X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fnotification-thread-events.cpp;h=586a9900b641419b39a0729dfa16e00459a7fed0;hb=139a8d250fb18f8ffc95b0936f7285f7b484b72f;hp=6f449fc638cef0df78f88c8abcc5d575f6376ad6;hpb=7966af5763c4aaca39df9bbfa9277ff15715c720;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/notification-thread-events.cpp b/src/bin/lttng-sessiond/notification-thread-events.cpp index 6f449fc63..586a9900b 100644 --- a/src/bin/lttng-sessiond/notification-thread-events.cpp +++ b/src/bin/lttng-sessiond/notification-thread-events.cpp @@ -6,49 +6,49 @@ */ #include "lttng/action/action.h" -#include "lttng/trigger/trigger-internal.h" +#include "lttng/trigger/trigger-internal.hpp" #define _LGPL_SOURCE #include #include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include #include #include -#include "condition-internal.h" -#include "event-notifier-error-accounting.h" -#include "notification-thread.h" -#include "notification-thread-events.h" -#include "notification-thread-commands.h" -#include "lttng-sessiond.h" -#include "kernel.h" +#include "condition-internal.hpp" +#include "event-notifier-error-accounting.hpp" +#include "notification-thread.hpp" +#include "notification-thread-events.hpp" +#include "notification-thread-commands.hpp" +#include "lttng-sessiond.hpp" +#include "kernel.hpp" -#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP) -#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT) +#define CLIENT_POLL_EVENTS_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP) +#define CLIENT_POLL_EVENTS_IN_OUT (CLIENT_POLL_EVENTS_IN | LPOLLOUT) /* The tracers currently limit the capture size to PIPE_BUF (4kb on linux). */ #define MAX_CAPTURE_SIZE (PIPE_BUF) @@ -60,12 +60,6 @@ enum lttng_object_type { LTTNG_OBJECT_TYPE_SESSION, }; -struct lttng_trigger_list_element { - /* No ownership of the trigger object is assumed. */ - struct lttng_trigger *trigger; - struct cds_list_head node; -}; - struct lttng_channel_trigger_list { struct channel_key channel_key; /* List of struct lttng_trigger_list_element. */ @@ -117,6 +111,13 @@ struct lttng_session_trigger_list { struct rcu_head rcu_node; }; +namespace { +struct lttng_trigger_list_element { + /* No ownership of the trigger object is assumed. */ + struct lttng_trigger *trigger; + struct cds_list_head node; +}; + struct lttng_trigger_ht_element { struct lttng_trigger *trigger; struct cds_lfht_node node; @@ -140,6 +141,7 @@ struct channel_state_sample { /* call_rcu delayed reclaim. */ struct rcu_head rcu_node; }; +} /* namespace */ static unsigned long hash_channel_key(struct channel_key *key); static int evaluate_buffer_condition(const struct lttng_condition *condition, @@ -166,13 +168,14 @@ void session_info_get(struct session_info *session_info); static void session_info_put(struct session_info *session_info); static -struct session_info *session_info_create(const char *name, - uid_t uid, gid_t gid, +struct session_info *session_info_create(uint64_t id, + const char *name, + uid_t uid, + gid_t gid, struct lttng_session_trigger_list *trigger_list, struct cds_lfht *sessions_ht); -static -void session_info_add_channel(struct session_info *session_info, - struct channel_info *channel_info); +static void session_info_add_channel( + struct session_info *session_info, struct channel_info *channel_info); static void session_info_remove_channel(struct session_info *session_info, struct channel_info *channel_info); @@ -223,8 +226,8 @@ int match_client_id(struct cds_lfht_node *node, const void *key) { /* This double-cast is intended to supress pointer-to-cast warning. */ const notification_client_id id = *((notification_client_id *) key); - const struct notification_client *client = caa_container_of( - node, struct notification_client, client_id_ht_node); + const struct notification_client *client = lttng::utils::container_of( + node, ¬ification_client::client_id_ht_node); return client->id == id; } @@ -320,13 +323,60 @@ int match_client_list_condition(struct cds_lfht_node *node, const void *key) } static -int match_session(struct cds_lfht_node *node, const void *key) +int match_session_info(struct cds_lfht_node *node, const void *key) +{ + const auto session_id = *((uint64_t *) key); + const auto *session_info = lttng::utils::container_of( + node, &session_info::sessions_ht_node); + + return session_id == session_info->id; +} + +static +unsigned long hash_session_info_id(uint64_t id) +{ + return hash_key_u64(&id, lttng_ht_seed); +} + +static +unsigned long hash_session_info(const struct session_info *session_info) +{ + return hash_session_info_id(session_info->id); +} + +static +struct session_info *get_session_info_by_id( + const struct notification_thread_state *state, uint64_t id) +{ + struct cds_lfht_iter iter; + struct cds_lfht_node *node; + lttng::urcu::read_lock_guard read_lock_guard; + + cds_lfht_lookup(state->sessions_ht, + hash_session_info_id(id), + match_session_info, + &id, + &iter); + node = cds_lfht_iter_get_node(&iter); + + if (node) { + auto session_info = lttng::utils::container_of(node, &session_info::sessions_ht_node); + + session_info_get(session_info); + return session_info; + } + + return NULL; +} + +static +struct session_info *get_session_info_by_name( + const struct notification_thread_state *state, const char *name) { - const char *name = (const char *) key; - struct session_info *session_info = caa_container_of( - node, struct session_info, sessions_ht_node); + uint64_t session_id; + const auto found = sample_session_id_by_name(name, &session_id); - return !strcmp(session_info->name, name); + return found ? get_session_info_by_id(state, session_id) : NULL; } static @@ -342,6 +392,10 @@ const char *notification_command_type_str( return "ADD_CHANNEL"; case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL: return "REMOVE_CHANNEL"; + case NOTIFICATION_COMMAND_TYPE_ADD_SESSION: + return "ADD_SESSION"; + case NOTIFICATION_COMMAND_TYPE_REMOVE_SESSION: + return "REMOVE_SESSION"; case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING: return "SESSION_ROTATION_ONGOING"; case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED: @@ -491,7 +545,7 @@ enum lttng_object_type get_condition_binding_object( static void free_channel_info_rcu(struct rcu_head *node) { - free(caa_container_of(node, struct channel_info, rcu_node)); + free(lttng::utils::container_of(node, &channel_info::rcu_node)); } static @@ -515,7 +569,7 @@ void channel_info_destroy(struct channel_info *channel_info) static void free_session_info_rcu(struct rcu_head *node) { - free(caa_container_of(node, struct session_info, rcu_node)); + free(lttng::utils::container_of(node, &session_info::rcu_node)); } /* Don't call directly, use the ref-counting mechanism. */ @@ -561,7 +615,10 @@ void session_info_put(struct session_info *session_info) } static -struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid, +struct session_info *session_info_create(uint64_t id, + const char *name, + uid_t uid, + gid_t gid, struct lttng_session_trigger_list *trigger_list, struct cds_lfht *sessions_ht) { @@ -569,10 +626,11 @@ struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid, LTTNG_ASSERT(name); - session_info = (struct session_info *) zmalloc(sizeof(*session_info)); + session_info = zmalloc(); if (!session_info) { goto end; } + lttng_ref_init(&session_info->ref, session_info_destroy); session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE, @@ -582,10 +640,12 @@ struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid, } cds_lfht_node_init(&session_info->sessions_ht_node); + session_info->id = id; session_info->name = strdup(name); if (!session_info->name) { goto error; } + session_info->uid = uid; session_info->gid = gid; session_info->trigger_list = trigger_list; @@ -623,7 +683,7 @@ struct channel_info *channel_info_create(const char *channel_name, struct channel_key *channel_key, uint64_t channel_capacity, struct session_info *session_info) { - struct channel_info *channel_info = (struct channel_info *) zmalloc(sizeof(*channel_info)); + struct channel_info *channel_info = zmalloc(); if (!channel_info) { goto end; @@ -670,7 +730,7 @@ static void notification_client_list_release(struct urcu_ref *list_ref) { struct notification_client_list *list = - container_of(list_ref, typeof(*list), ref); + lttng::utils::container_of(list_ref, ¬ification_client_list::ref); struct notification_client_list_element *client_list_element, *tmp; lttng_condition_put(list->condition); @@ -723,7 +783,7 @@ struct notification_client_list *notification_client_list_create( struct cds_lfht_iter iter; struct notification_client_list *client_list; - client_list = (notification_client_list *) zmalloc(sizeof(*client_list)); + client_list = zmalloc(); if (!client_list) { PERROR("Failed to allocate notification client list"); goto end; @@ -755,7 +815,7 @@ struct notification_client_list *notification_client_list_create( continue; } - client_list_element = (notification_client_list_element *) zmalloc(sizeof(*client_list_element)); + client_list_element = zmalloc(); if (!client_list_element) { goto error_put_client_list; } @@ -814,8 +874,8 @@ struct notification_client_list *get_client_list_from_condition( &iter); node = cds_lfht_iter_get_node(&iter); if (node) { - list = container_of(node, struct notification_client_list, - notification_trigger_clients_ht_node); + list = lttng::utils::container_of(node, + ¬ification_client_list::notification_trigger_clients_ht_node); list = notification_client_list_get(list) ? list : NULL; } @@ -958,32 +1018,21 @@ int evaluate_session_condition_for_client( uid_t *session_uid, gid_t *session_gid) { int ret; - struct cds_lfht_iter iter; - struct cds_lfht_node *node; const char *session_name; struct session_info *session_info = NULL; rcu_read_lock(); session_name = get_condition_session_name(condition); - /* Find the session associated with the trigger. */ - cds_lfht_lookup(state->sessions_ht, - hash_key_str(session_name, lttng_ht_seed), - match_session, - session_name, - &iter); - node = cds_lfht_iter_get_node(&iter); - if (!node) { - DBG("No known session matching name \"%s\"", + /* Find the session associated with the condition. */ + session_info = get_session_info_by_name(state, session_name); + if (!session_info) { + DBG("Unknown session while evaluating session condition for client: name = `%s`", session_name); ret = 0; goto end; } - session_info = caa_container_of(node, struct session_info, - sessions_ht_node); - session_info_get(session_info); - /* * Evaluation is performed in-line here since only one type of * session-bound condition is handled for the moment. @@ -1031,8 +1080,15 @@ int evaluate_condition_for_client(const struct lttng_trigger *trigger, struct lttng_evaluation *evaluation = NULL; struct notification_client_list client_list = { .lock = PTHREAD_MUTEX_INITIALIZER, + .ref = {}, + .condition = NULL, + .triggers_list = {}, + .clients_list = {}, + .notification_trigger_clients_ht = NULL, + .notification_trigger_clients_ht_node = {}, + .rcu_node = {}, }; - struct notification_client_list_element client_list_element = { 0 }; + struct notification_client_list_element client_list_element = {}; uid_t object_uid = 0; gid_t object_gid = 0; @@ -1116,12 +1172,12 @@ int notification_thread_client_subscribe(struct notification_client *client, } } - condition_list_element = (lttng_condition_list_element *) zmalloc(sizeof(*condition_list_element)); + condition_list_element = zmalloc(); if (!condition_list_element) { ret = -1; goto error; } - client_list_element = (notification_client_list_element *) zmalloc(sizeof(*client_list_element)); + client_list_element = zmalloc(); if (!client_list_element) { ret = -1; goto error; @@ -1275,12 +1331,11 @@ end: static void free_notification_client_rcu(struct rcu_head *node) { - free(caa_container_of(node, struct notification_client, rcu_node)); + free(lttng::utils::container_of(node, ¬ification_client::rcu_node)); } static -void notification_client_destroy(struct notification_client *client, - struct notification_thread_state *state) +void notification_client_destroy(struct notification_client *client) { if (!client) { return; @@ -1313,6 +1368,8 @@ struct notification_client *get_client_from_socket(int socket, struct cds_lfht_node *node; struct notification_client *client = NULL; + ASSERT_RCU_READ_LOCKED(); + cds_lfht_lookup(state->client_socket_ht, hash_client_socket(socket), match_client_socket, @@ -1341,6 +1398,8 @@ struct notification_client *get_client_from_id(notification_client_id id, struct cds_lfht_node *node; struct notification_client *client = NULL; + ASSERT_RCU_READ_LOCKED(); + cds_lfht_lookup(state->client_id_ht, hash_client_id(id), match_client_id, @@ -1456,6 +1515,8 @@ struct lttng_session_trigger_list *get_session_trigger_list( struct cds_lfht_node *node; struct cds_lfht_iter iter; + ASSERT_RCU_READ_LOCKED(); + cds_lfht_lookup(state->session_triggers_ht, hash_key_str(session_name, lttng_ht_seed), match_session_trigger_list, @@ -1494,7 +1555,7 @@ struct lttng_session_trigger_list *lttng_session_trigger_list_create( { struct lttng_session_trigger_list *list; - list = (lttng_session_trigger_list *) zmalloc(sizeof(*list)); + list = zmalloc(); if (!list) { goto end; } @@ -1545,7 +1606,7 @@ int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list, { int ret = 0; struct lttng_trigger_list_element *new_element = - (lttng_trigger_list_element *) zmalloc(sizeof(*new_element)); + zmalloc(); if (!new_element) { ret = -1; @@ -1640,39 +1701,22 @@ error: } static -struct session_info *find_or_create_session_info( - struct notification_thread_state *state, - const char *name, uid_t uid, gid_t gid) +struct session_info *create_and_publish_session_info(struct notification_thread_state *state, + uint64_t id, + const char *name, + uid_t uid, + gid_t gid) { struct session_info *session = NULL; - struct cds_lfht_node *node; - struct cds_lfht_iter iter; struct lttng_session_trigger_list *trigger_list; rcu_read_lock(); - cds_lfht_lookup(state->sessions_ht, - hash_key_str(name, lttng_ht_seed), - match_session, - name, - &iter); - node = cds_lfht_iter_get_node(&iter); - if (node) { - DBG("Found session info of session \"%s\" (uid = %i, gid = %i)", - name, uid, gid); - session = caa_container_of(node, struct session_info, - sessions_ht_node); - LTTNG_ASSERT(session->uid == uid); - LTTNG_ASSERT(session->gid == gid); - session_info_get(session); - goto end; - } - trigger_list = lttng_session_trigger_list_build(state, name); if (!trigger_list) { goto error; } - session = session_info_create(name, uid, gid, trigger_list, + session = session_info_create(id, name, uid, gid, trigger_list, state->sessions_ht); if (!session) { ERR("Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)", @@ -1680,11 +1724,16 @@ struct session_info *find_or_create_session_info( lttng_session_trigger_list_destroy(trigger_list); goto error; } + + /* Transferred ownership to the new session. */ trigger_list = NULL; - cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed), - &session->sessions_ht_node); -end: + if (cds_lfht_add_unique(state->sessions_ht, hash_session_info(session), match_session_info, + &id, &session->sessions_ht_node) != &session->sessions_ht_node) { + ERR("Duplicate session found: name = `%s`, id = %" PRIu64, name, id); + goto error; + } + rcu_read_unlock(); return session; error: @@ -1694,11 +1743,12 @@ error: } static -int handle_notification_thread_command_add_channel( - struct notification_thread_state *state, - const char *session_name, uid_t session_uid, gid_t session_gid, - const char *channel_name, enum lttng_domain_type channel_domain, - uint64_t channel_key_int, uint64_t channel_capacity, +int handle_notification_thread_command_add_channel(struct notification_thread_state *state, + uint64_t session_id, + const char *channel_name, + enum lttng_domain_type channel_domain, + uint64_t channel_key_int, + uint64_t channel_capacity, enum lttng_error_code *cmd_result) { struct cds_list_head trigger_list; @@ -1713,16 +1763,17 @@ int handle_notification_thread_command_add_channel( struct cds_lfht_iter iter; struct session_info *session_info = NULL; - DBG("Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain", - channel_name, session_name, channel_key_int, + DBG("Adding channel: channel name = `%s`, session id = %" PRIu64 ", channel key = %" PRIu64 ", domain = %s", + channel_name, session_id, channel_key_int, lttng_domain_type_str(channel_domain)); CDS_INIT_LIST_HEAD(&trigger_list); - session_info = find_or_create_session_info(state, session_name, - session_uid, session_gid); + session_info = get_session_info_by_id(state, session_id); if (!session_info) { - /* Allocation error or an internal error occurred. */ + /* Fatal logic error. */ + ERR("Failed to find session while adding channel: session id = %" PRIu64, + session_id); goto error; } @@ -1743,7 +1794,7 @@ int handle_notification_thread_command_add_channel( continue; } - new_element = (lttng_trigger_list_element *) zmalloc(sizeof(*new_element)); + new_element = zmalloc(); if (!new_element) { rcu_read_unlock(); goto error; @@ -1757,7 +1808,7 @@ int handle_notification_thread_command_add_channel( DBG("Found %i triggers that apply to newly added channel", trigger_count); - channel_trigger_list = (lttng_channel_trigger_list *) zmalloc(sizeof(*channel_trigger_list)); + channel_trigger_list = zmalloc(); if (!channel_trigger_list) { goto error; } @@ -1788,6 +1839,67 @@ error: return 1; } +static +int handle_notification_thread_command_add_session(struct notification_thread_state *state, + uint64_t session_id, + const char *session_name, + uid_t session_uid, + gid_t session_gid, + enum lttng_error_code *cmd_result) +{ + int ret; + + DBG("Adding session: session name = `%s`, session id = %" PRIu64 ", session uid = %d, session gid = %d", + session_name, session_id, session_uid, session_gid); + + auto session = create_and_publish_session_info(state, session_id, session_name, session_uid, session_gid); + if (!session) { + PERROR("Failed to add session: session name = `%s`, session id = %" PRIu64 ", session uid = %d, session gid = %d", + session_name, session_id, session_uid, session_gid); + ret = -1; + *cmd_result = LTTNG_ERR_NOMEM; + goto end; + } + + /* + * Note that the reference to `session` is not released; this reference is + * the "global" reference that is used to allow look-ups. This reference will + * only be released when the session is removed. See + * handle_notification_thread_command_remove_session. + */ + ret = 0; + *cmd_result = LTTNG_OK; +end: + return ret; +} + +static +int handle_notification_thread_command_remove_session( + struct notification_thread_state *state, + uint64_t session_id, + enum lttng_error_code *cmd_result) +{ + int ret; + + DBG("Removing session: session id = %" PRIu64, session_id); + + auto session = get_session_info_by_id(state, session_id); + if (!session) { + ERR("Failed to remove session: session id = %" PRIu64, session_id); + ret = -1; + *cmd_result = LTTNG_ERR_NO_SESSION; + goto end; + } + + /* Release the reference returned by the look-up, and then release the global reference. */ + session_info_put(session); + session_info_put(session); + ret = 0; + *cmd_result = LTTNG_OK; +end: + return ret; +} + static void free_channel_trigger_list_rcu(struct rcu_head *node) { @@ -1888,7 +2000,7 @@ static int handle_notification_thread_command_session_rotation( struct notification_thread_state *state, enum notification_thread_command_type cmd_type, - const char *session_name, uid_t session_uid, gid_t session_gid, + uint64_t session_id, uint64_t trace_archive_chunk_id, struct lttng_trace_archive_location *location, enum lttng_error_code *_cmd_result) @@ -1898,29 +2010,32 @@ int handle_notification_thread_command_session_rotation( struct lttng_session_trigger_list *trigger_list; struct lttng_trigger_list_element *trigger_list_element; struct session_info *session_info; - const struct lttng_credentials session_creds = { - .uid = LTTNG_OPTIONAL_INIT_VALUE(session_uid), - .gid = LTTNG_OPTIONAL_INIT_VALUE(session_gid), - }; + struct lttng_credentials session_creds; rcu_read_lock(); - session_info = find_or_create_session_info(state, session_name, - session_uid, session_gid); + session_info = get_session_info_by_id(state, session_id); if (!session_info) { - /* Allocation error or an internal error occurred. */ + /* Fatal logic error. */ + ERR("Failed to find session while handling rotation state change: session id = %" PRIu64, + session_id); ret = -1; - cmd_result = LTTNG_ERR_NOMEM; + cmd_result = LTTNG_ERR_FATAL; goto end; } + session_creds = { + .uid = LTTNG_OPTIONAL_INIT_VALUE(session_info->uid), + .gid = LTTNG_OPTIONAL_INIT_VALUE(session_info->gid), + }; + session_info->rotation.ongoing = cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING; session_info->rotation.id = trace_archive_chunk_id; - trigger_list = get_session_trigger_list(state, session_name); + trigger_list = get_session_trigger_list(state, session_info->name); if (!trigger_list) { - DBG("No triggers applying to session \"%s\" found", - session_name); + DBG("No triggers apply to session: session name = `%s` ", + session_info->name); goto end; } @@ -2020,7 +2135,7 @@ int handle_notification_thread_command_add_tracer_event_source( enum lttng_error_code cmd_result = LTTNG_OK; struct notification_event_tracer_event_source_element *element = NULL; - element = (notification_event_tracer_event_source_element *) zmalloc(sizeof(*element)); + element = zmalloc(); if (!element) { cmd_result = LTTNG_ERR_NOMEM; ret = -1; @@ -2037,7 +2152,7 @@ int handle_notification_thread_command_add_tracer_event_source( lttng_domain_type_str(domain_type)); /* Adding the read side pipe to the event poll. */ - ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLIN | LPOLLERR); + ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLPRI | LPOLLIN | LPOLLERR); if (ret < 0) { ERR("Failed to add tracer event source to poll set: tracer_event_source_fd = %d, domain = '%s'", tracer_event_source_fd, @@ -2059,7 +2174,7 @@ int drain_event_notifier_notification_pipe( struct notification_thread_state *state, int pipe, enum lttng_domain_type domain) { - struct lttng_poll_event events = {0}; + struct lttng_poll_event events = {}; int ret; ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC); @@ -2227,7 +2342,7 @@ end: } static int handle_notification_thread_command_list_triggers( - struct notification_thread_handle *handle, + struct notification_thread_handle *handle __attribute__((unused)), struct notification_thread_state *state, uid_t client_uid, struct lttng_triggers **triggers, @@ -2418,6 +2533,8 @@ int bind_trigger_to_matching_session(struct lttng_trigger *trigger, const char *session_name; struct lttng_session_trigger_list *trigger_list; + ASSERT_RCU_READ_LOCKED(); + condition = lttng_trigger_get_const_condition(trigger); switch (lttng_condition_get_type(condition)) { case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING: @@ -2464,6 +2581,8 @@ int bind_trigger_to_matching_channels(struct lttng_trigger *trigger, struct cds_lfht_iter iter; struct channel_info *channel; + ASSERT_RCU_READ_LOCKED(); + cds_lfht_for_each_entry(state->channels_ht, &iter, channel, channels_ht_node) { struct lttng_trigger_list_element *trigger_list_element; @@ -2485,7 +2604,7 @@ int bind_trigger_to_matching_channels(struct lttng_trigger *trigger, struct lttng_channel_trigger_list, channel_triggers_ht_node); - trigger_list_element = (lttng_trigger_list_element *) zmalloc(sizeof(*trigger_list_element)); + trigger_list_element = zmalloc(); if (!trigger_list_element) { ret = -1; goto end; @@ -2606,7 +2725,7 @@ enum lttng_error_code setup_tracer_notifier( struct lttng_condition *condition = lttng_trigger_get_condition(trigger); struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element = NULL; - trigger_tokens_ht_element = (notification_trigger_tokens_ht_element *) zmalloc(sizeof(*trigger_tokens_ht_element)); + trigger_tokens_ht_element = zmalloc(); if (!trigger_tokens_ht_element) { ret = LTTNG_ERR_NOMEM; goto end; @@ -2729,7 +2848,7 @@ int handle_notification_thread_command_register_trigger( goto error; } - trigger_ht_element = (lttng_trigger_ht_element *) zmalloc(sizeof(*trigger_ht_element)); + trigger_ht_element = zmalloc(); if (!trigger_ht_element) { ret = -1; goto error; @@ -3107,28 +3226,43 @@ end: return 0; } +static +int pop_cmd_queue(struct notification_thread_handle *handle, + struct notification_thread_command **cmd) +{ + int ret; + uint64_t counter; + + pthread_mutex_lock(&handle->cmd_queue.lock); + ret = lttng_read(handle->cmd_queue.event_fd, &counter, sizeof(counter)); + if (ret != sizeof(counter)) { + ret = -1; + goto error_unlock; + } + + *cmd = cds_list_first_entry(&handle->cmd_queue.list, + struct notification_thread_command, cmd_list_node); + cds_list_del(&((*cmd)->cmd_list_node)); + ret = 0; + +error_unlock: + pthread_mutex_unlock(&handle->cmd_queue.lock); + return ret; +} + /* Returns 0 on success, 1 on exit requested, negative value on error. */ int handle_notification_thread_command( struct notification_thread_handle *handle, struct notification_thread_state *state) { int ret; - uint64_t counter; struct notification_thread_command *cmd; - /* Read the event pipe to put it back into a quiescent state. */ - ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter, - sizeof(counter)); - if (ret != sizeof(counter)) { + ret = pop_cmd_queue(handle, &cmd); + if (ret) { goto error; } - pthread_mutex_lock(&handle->cmd_queue.lock); - cmd = cds_list_first_entry(&handle->cmd_queue.list, - struct notification_thread_command, cmd_list_node); - cds_list_del(&cmd->cmd_list_node); - pthread_mutex_unlock(&handle->cmd_queue.lock); - DBG("Received `%s` command", notification_command_type_str(cmd->type)); switch (cmd->type) { @@ -3147,9 +3281,7 @@ int handle_notification_thread_command( case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL: ret = handle_notification_thread_command_add_channel( state, - cmd->parameters.add_channel.session.name, - cmd->parameters.add_channel.session.uid, - cmd->parameters.add_channel.session.gid, + cmd->parameters.add_channel.session.id, cmd->parameters.add_channel.channel.name, cmd->parameters.add_channel.channel.domain, cmd->parameters.add_channel.channel.key, @@ -3162,14 +3294,23 @@ int handle_notification_thread_command( cmd->parameters.remove_channel.domain, &cmd->reply_code); break; + case NOTIFICATION_COMMAND_TYPE_ADD_SESSION: + ret = handle_notification_thread_command_add_session(state, + cmd->parameters.add_session.session_id, + cmd->parameters.add_session.session_name, + cmd->parameters.add_session.session_uid, + cmd->parameters.add_session.session_gid, &cmd->reply_code); + break; + case NOTIFICATION_COMMAND_TYPE_REMOVE_SESSION: + ret = handle_notification_thread_command_remove_session( + state, cmd->parameters.remove_session.session_id, &cmd->reply_code); + break; case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING: case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED: ret = handle_notification_thread_command_session_rotation( state, cmd->type, - cmd->parameters.session_rotation.session_name, - cmd->parameters.session_rotation.uid, - cmd->parameters.session_rotation.gid, + cmd->parameters.session_rotation.session_id, cmd->parameters.session_rotation.trace_archive_chunk_id, cmd->parameters.session_rotation.location, &cmd->reply_code); @@ -3320,7 +3461,7 @@ int handle_notification_thread_client_connect( DBG("Handling new notification channel client connection"); - client = (notification_client *) zmalloc(sizeof(*client)); + client = zmalloc(); if (!client) { /* Fatal error. */ ret = -1; @@ -3363,9 +3504,9 @@ int handle_notification_thread_client_connect( goto error; } + client->communication.current_poll_events = CLIENT_POLL_EVENTS_IN; ret = lttng_poll_add(&state->events, client->socket, - LPOLLIN | LPOLLERR | - LPOLLHUP | LPOLLRDHUP); + client->communication.current_poll_events); if (ret < 0) { ERR("Failed to add notification channel client socket to poll set"); ret = 0; @@ -3386,7 +3527,7 @@ int handle_notification_thread_client_connect( return ret; error: - notification_client_destroy(client, state); + notification_client_destroy(client); return ret; } @@ -3402,6 +3543,8 @@ int notification_thread_client_disconnect( int ret; struct lttng_condition_list_element *condition_list_element, *tmp; + ASSERT_RCU_READ_LOCKED(); + /* Acquire the client lock to disable its communication atomically. */ pthread_mutex_lock(&client->lock); client->communication.active = false; @@ -3426,7 +3569,7 @@ int notification_thread_client_disconnect( * Client no longer accessible to other threads (through the * client lists). */ - notification_client_destroy(client, state); + notification_client_destroy(client); return ret; } @@ -3497,6 +3640,18 @@ int handle_notification_thread_trigger_unregister_all( return error_occurred ? -1 : 0; } +static +bool client_has_outbound_data_left( + const struct notification_client *client) +{ + const struct lttng_payload_view pv = lttng_payload_view_from_payload( + &client->communication.outbound.payload, 0, -1); + const bool has_data = pv.buffer.size != 0; + const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv); + + return has_data || has_fds; +} + static int client_handle_transmission_status( struct notification_client *client, @@ -3507,24 +3662,51 @@ int client_handle_transmission_status( switch (transmission_status) { case CLIENT_TRANSMISSION_STATUS_COMPLETE: - ret = lttng_poll_mod(&state->events, client->socket, - CLIENT_POLL_MASK_IN); - if (ret) { - goto end; - } - - break; case CLIENT_TRANSMISSION_STATUS_QUEUED: + { + int current_poll_events; + int new_poll_events; /* * We want to be notified whenever there is buffer space - * available to send the rest of the payload. + * available to send the rest of the payload if we are + * waiting to send data to the client. + * + * The state of the outbound queue being sampled here is + * fine since: + * - it is okay to wake-up "for nothing" in case we see + * that data is left, but another thread succeeds in + * flushing it before us when handling the client "out" + * event. We will simply stop monitoring that event the next + * time it wakes us up and we see no data left to be sent, + * - if another thread fails to flush the entire client + * outgoing queue, it will issue a "communication update" + * command and cause the client's (e)poll mask to be + * re-evaluated. + * + * The situation we seek to avoid would be to disable the + * monitoring of "out" client events indefinitely when there is + * data to be sent, which can't happen because of the + * aforementioned "communication update" mechanism. */ - ret = lttng_poll_mod(&state->events, client->socket, - CLIENT_POLL_MASK_IN_OUT); - if (ret) { - goto end; + pthread_mutex_lock(&client->lock); + current_poll_events = client->communication.current_poll_events; + new_poll_events = client_has_outbound_data_left(client) ? + CLIENT_POLL_EVENTS_IN_OUT : + CLIENT_POLL_EVENTS_IN; + client->communication.current_poll_events = new_poll_events; + pthread_mutex_unlock(&client->lock); + + /* Update the monitored event set only if it changed. */ + if (current_poll_events != new_poll_events) { + ret = lttng_poll_mod(&state->events, client->socket, + new_poll_events); + if (ret) { + goto end; + } } + break; + } case CLIENT_TRANSMISSION_STATUS_FAIL: ret = notification_thread_client_disconnect(client, state); if (ret) { @@ -3664,18 +3846,6 @@ error: return CLIENT_TRANSMISSION_STATUS_ERROR; } -static -bool client_has_outbound_data_left( - const struct notification_client *client) -{ - const struct lttng_payload_view pv = lttng_payload_view_from_payload( - &client->communication.outbound.payload, 0, -1); - const bool has_data = pv.buffer.size != 0; - const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv); - - return has_data || has_fds; -} - /* Client lock must _not_ be held by the caller. */ static int client_send_command_reply(struct notification_client *client, @@ -3686,13 +3856,14 @@ int client_send_command_reply(struct notification_client *client, struct lttng_notification_channel_command_reply reply = { .status = (int8_t) status, }; - struct lttng_notification_channel_message msg = { - .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY, - .size = sizeof(reply), - }; + struct lttng_notification_channel_message msg; char buffer[sizeof(msg) + sizeof(reply)]; enum client_transmission_status transmission_status; + msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY; + msg.size = sizeof(reply); + msg.fds = 0; + memcpy(buffer, &msg, sizeof(msg)); memcpy(buffer + sizeof(msg), &reply, sizeof(reply)); DBG("Send command reply (%i)", (int) status); @@ -3734,7 +3905,7 @@ error: static int client_handle_message_unknown(struct notification_client *client, - struct notification_thread_state *state) + struct notification_thread_state *state __attribute__((unused))) { int ret; /* @@ -3790,14 +3961,15 @@ int client_handle_message_handshake(struct notification_client *client, .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR, .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR, }; - const struct lttng_notification_channel_message msg_header = { - .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE, - .size = sizeof(handshake_reply), - }; + struct lttng_notification_channel_message msg_header; enum lttng_notification_channel_status status = LTTNG_NOTIFICATION_CHANNEL_STATUS_OK; char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)]; + msg_header.type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE; + msg_header.size = sizeof(handshake_reply); + msg_header.fds = 0; + memcpy(send_buffer, &msg_header, sizeof(msg_header)); memcpy(send_buffer + sizeof(msg_header), &handshake_reply, sizeof(handshake_reply)); @@ -4082,7 +4254,47 @@ int handle_notification_thread_client_out( } pthread_mutex_lock(&client->lock); - transmission_status = client_flush_outgoing_queue(client); + if (!client_has_outbound_data_left(client)) { + /* + * A client "out" event can be received when no payload is left + * to send under some circumstances. + * + * Many threads can flush a client's outgoing queue and, if they + * had to queue their message (socket was full), will use the + * "communication update" command to signal the (e)poll thread + * to monitor for space being made available in the socket. + * + * Commands are sent over an internal pipe serviced by the same + * thread as the client sockets. + * + * When space is made available in the socket, there is a race + * between the (e)poll thread and the other threads that may + * wish to use the client's socket to flush its outgoing queue. + * + * A non-(e)poll thread may attempt (and succeed) in flushing + * the queue before the (e)poll thread gets a chance to service + * the client's "out" event. + * + * In this situation, the (e)poll thread processing the client + * out event will see an empty payload: there is nothing to do + * except unsubscribing (e)poll "out" events. + * + * Note that this thread is the (e)poll thread so it can modify + * the (e)poll mask directly without using a communication + * update command. Other threads that flush the outgoing queue + * will use the "communication update" command to wake up this + * thread and force it to monitor "out" events. + * + * When other threads succeed in emptying the outgoing queue, + * they don't need to update the (e)poll mask: if the "out" + * event is monitored, it will fire once and the (e)poll + * thread will reach this condition, causing the event to + * stop being monitored. + */ + transmission_status = CLIENT_TRANSMISSION_STATUS_COMPLETE; + } else { + transmission_status = client_flush_outgoing_queue(client); + } pthread_mutex_unlock(&client->lock); ret = client_handle_transmission_status( @@ -4103,9 +4315,8 @@ bool evaluate_buffer_usage_condition(const struct lttng_condition *condition, bool result = false; uint64_t threshold; enum lttng_condition_type condition_type; - const struct lttng_condition_buffer_usage *use_condition = container_of( - condition, struct lttng_condition_buffer_usage, - parent); + const struct lttng_condition_buffer_usage *use_condition = lttng::utils::container_of( + condition, <tng_condition_buffer_usage::parent); if (use_condition->threshold_bytes.set) { threshold = use_condition->threshold_bytes.value; @@ -4161,9 +4372,8 @@ bool evaluate_session_consumed_size_condition( { uint64_t threshold; const struct lttng_condition_session_consumed_size *size_condition = - container_of(condition, - struct lttng_condition_session_consumed_size, - parent); + lttng::utils::container_of(condition, + <tng_condition_session_consumed_size::parent); threshold = size_condition->consumed_threshold_bytes.value; DBG("Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64, @@ -4174,7 +4384,7 @@ bool evaluate_session_consumed_size_condition( static int evaluate_buffer_condition(const struct lttng_condition *condition, struct lttng_evaluation **evaluation, - const struct notification_thread_state *state, + const struct notification_thread_state *state __attribute__((unused)), const struct channel_state_sample *previous_sample, const struct channel_state_sample *latest_sample, uint64_t previous_session_consumed_total, @@ -4261,9 +4471,11 @@ static int client_notification_overflow(struct notification_client *client) { int ret = 0; - const struct lttng_notification_channel_message msg = { - .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED, - }; + struct lttng_notification_channel_message msg; + + msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED; + msg.size = 0; + msg.fds = 0; ASSERT_LOCKED(client->lock); @@ -4365,14 +4577,16 @@ int notification_client_list_send_evaluation( .trigger = (struct lttng_trigger *) trigger, .evaluation = (struct lttng_evaluation *) evaluation, }; - struct lttng_notification_channel_message msg_header = { - .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION, - }; + struct lttng_notification_channel_message msg_header; const struct lttng_credentials *trigger_creds = lttng_trigger_get_credentials(trigger); lttng_payload_init(&msg_payload); + msg_header.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION; + msg_header.size = 0; + msg_header.fds = 0; + ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header, sizeof(msg_header)); if (ret) { @@ -4562,7 +4776,7 @@ struct lttng_event_notifier_notification *recv_one_event_notifier_notification( goto end; } - capture_buffer = (char *) zmalloc(capture_buffer_size); + capture_buffer = calloc(capture_buffer_size); if (!capture_buffer) { ERR("Failed to allocate capture buffer"); goto end; @@ -4642,10 +4856,9 @@ int dispatch_one_event_notifier_notification(struct notification_thread_state *s } evaluation = lttng_evaluation_event_rule_matches_create( - container_of(lttng_trigger_get_const_condition( + lttng::utils::container_of(lttng_trigger_get_const_condition( element->trigger), - struct lttng_condition_event_rule_matches, - parent), + <tng_condition_event_rule_matches::parent), notification->capture_buffer, notification->capture_buf_size, false); @@ -4865,7 +5078,7 @@ int handle_notification_thread_channel_sample( */ struct channel_state_sample *stored_sample; - stored_sample = (channel_state_sample *) zmalloc(sizeof(*stored_sample)); + stored_sample = zmalloc(); if (!stored_sample) { ret = -1; goto end_unlock;