Fix: sessiond: report client list allocation failure as a fatal error
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.cpp
index 67fca2a357bb3fcac0814d150313e49ae38532a2..43879c99fe8957d021e2dacbb2218997cd3c73d8 100644 (file)
@@ -33,6 +33,7 @@
 #include <lttng/notification/channel-internal.hpp>
 #include <lttng/trigger/trigger-internal.hpp>
 #include <lttng/event-rule/event-rule-internal.hpp>
+#include <lttng/location-internal.hpp>
 
 #include <time.h>
 #include <unistd.h>
@@ -47,8 +48,8 @@
 #include "lttng-sessiond.hpp"
 #include "kernel.hpp"
 
-#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
-#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
+#define CLIENT_POLL_EVENTS_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+#define CLIENT_POLL_EVENTS_IN_OUT (CLIENT_POLL_EVENTS_IN | LPOLLOUT)
 
 /* The tracers currently limit the capture size to PIPE_BUF (4kb on linux). */
 #define MAX_CAPTURE_SIZE (PIPE_BUF)
@@ -60,12 +61,6 @@ enum lttng_object_type {
        LTTNG_OBJECT_TYPE_SESSION,
 };
 
-struct lttng_trigger_list_element {
-       /* No ownership of the trigger object is assumed. */
-       struct lttng_trigger *trigger;
-       struct cds_list_head node;
-};
-
 struct lttng_channel_trigger_list {
        struct channel_key channel_key;
        /* List of struct lttng_trigger_list_element. */
@@ -86,11 +81,7 @@ struct lttng_channel_trigger_list {
  *   - lttng_session_trigger_list_add()
  */
 struct lttng_session_trigger_list {
-       /*
-        * Not owned by this; points to the session_info structure's
-        * session name.
-        */
-       const char *session_name;
+       char *session_name;
        /* List of struct lttng_trigger_list_element. */
        struct cds_list_head list;
        /* Node in the session_triggers_ht */
@@ -117,6 +108,13 @@ struct lttng_session_trigger_list {
        struct rcu_head rcu_node;
 };
 
+namespace {
+struct lttng_trigger_list_element {
+       /* No ownership of the trigger object is assumed. */
+       struct lttng_trigger *trigger;
+       struct cds_list_head node;
+};
+
 struct lttng_trigger_ht_element {
        struct lttng_trigger *trigger;
        struct cds_lfht_node node;
@@ -136,10 +134,10 @@ struct channel_state_sample {
        struct cds_lfht_node channel_state_ht_node;
        uint64_t highest_usage;
        uint64_t lowest_usage;
-       uint64_t channel_total_consumed;
        /* call_rcu delayed reclaim. */
        struct rcu_head rcu_node;
 };
+} /* namespace */
 
 static unsigned long hash_channel_key(struct channel_key *key);
 static int evaluate_buffer_condition(const struct lttng_condition *condition,
@@ -147,8 +145,6 @@ static int evaluate_buffer_condition(const struct lttng_condition *condition,
                const struct notification_thread_state *state,
                const struct channel_state_sample *previous_sample,
                const struct channel_state_sample *latest_sample,
-               uint64_t previous_session_consumed_total,
-               uint64_t latest_session_consumed_total,
                struct channel_info *channel_info);
 static
 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
@@ -166,13 +162,14 @@ void session_info_get(struct session_info *session_info);
 static
 void session_info_put(struct session_info *session_info);
 static
-struct session_info *session_info_create(const char *name,
-               uid_t uid, gid_t gid,
+struct session_info *session_info_create(uint64_t id,
+               const char *name,
+               uid_t uid,
+               gid_t gid,
                struct lttng_session_trigger_list *trigger_list,
                struct cds_lfht *sessions_ht);
-static
-void session_info_add_channel(struct session_info *session_info,
-               struct channel_info *channel_info);
+static void session_info_add_channel(
+               struct session_info *session_info, struct channel_info *channel_info);
 static
 void session_info_remove_channel(struct session_info *session_info,
                struct channel_info *channel_info);
@@ -223,8 +220,8 @@ int match_client_id(struct cds_lfht_node *node, const void *key)
 {
        /* This double-cast is intended to supress pointer-to-cast warning. */
        const notification_client_id id = *((notification_client_id *) key);
-       const struct notification_client *client = caa_container_of(
-                       node, struct notification_client, client_id_ht_node);
+       const struct notification_client *client = lttng::utils::container_of(
+                       node, &notification_client::client_id_ht_node);
 
        return client->id == id;
 }
@@ -320,13 +317,60 @@ int match_client_list_condition(struct cds_lfht_node *node, const void *key)
 }
 
 static
-int match_session(struct cds_lfht_node *node, const void *key)
+int match_session_info(struct cds_lfht_node *node, const void *key)
+{
+       const auto session_id = *((uint64_t *) key);
+       const auto *session_info = lttng::utils::container_of(
+               node, &session_info::sessions_ht_node);
+
+       return session_id == session_info->id;
+}
+
+static
+unsigned long hash_session_info_id(uint64_t id)
 {
-       const char *name = (const char *) key;
-       struct session_info *session_info = caa_container_of(
-               node, struct session_info, sessions_ht_node);
+       return hash_key_u64(&id, lttng_ht_seed);
+}
 
-       return !strcmp(session_info->name, name);
+static
+unsigned long hash_session_info(const struct session_info *session_info)
+{
+       return hash_session_info_id(session_info->id);
+}
+
+static
+struct session_info *get_session_info_by_id(
+               const struct notification_thread_state *state, uint64_t id)
+{
+       struct cds_lfht_iter iter;
+       struct cds_lfht_node *node;
+       lttng::urcu::read_lock_guard read_lock_guard;
+
+       cds_lfht_lookup(state->sessions_ht,
+                       hash_session_info_id(id),
+                       match_session_info,
+                       &id,
+                       &iter);
+       node = cds_lfht_iter_get_node(&iter);
+
+       if (node) {
+               auto session_info = lttng::utils::container_of(node, &session_info::sessions_ht_node);
+
+               session_info_get(session_info);
+               return session_info;
+       }
+
+       return NULL;
+}
+
+static
+struct session_info *get_session_info_by_name(
+               const struct notification_thread_state *state, const char *name)
+{
+       uint64_t session_id;
+       const auto found = sample_session_id_by_name(name, &session_id);
+
+       return found ? get_session_info_by_id(state, session_id) : NULL;
 }
 
 static
@@ -342,6 +386,10 @@ const char *notification_command_type_str(
                return "ADD_CHANNEL";
        case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
                return "REMOVE_CHANNEL";
+       case NOTIFICATION_COMMAND_TYPE_ADD_SESSION:
+               return "ADD_SESSION";
+       case NOTIFICATION_COMMAND_TYPE_REMOVE_SESSION:
+               return "REMOVE_SESSION";
        case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
                return "SESSION_ROTATION_ONGOING";
        case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
@@ -476,10 +524,10 @@ enum lttng_object_type get_condition_binding_object(
        switch (lttng_condition_get_type(condition)) {
        case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
        case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
-       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
                return LTTNG_OBJECT_TYPE_CHANNEL;
        case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
        case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
                return LTTNG_OBJECT_TYPE_SESSION;
        case LTTNG_CONDITION_TYPE_EVENT_RULE_MATCHES:
                return LTTNG_OBJECT_TYPE_NONE;
@@ -491,7 +539,7 @@ enum lttng_object_type get_condition_binding_object(
 static
 void free_channel_info_rcu(struct rcu_head *node)
 {
-       free(caa_container_of(node, struct channel_info, rcu_node));
+       free(lttng::utils::container_of(node, &channel_info::rcu_node));
 }
 
 static
@@ -515,7 +563,7 @@ void channel_info_destroy(struct channel_info *channel_info)
 static
 void free_session_info_rcu(struct rcu_head *node)
 {
-       free(caa_container_of(node, struct session_info, rcu_node));
+       free(lttng::utils::container_of(node, &session_info::rcu_node));
 }
 
 /* Don't call directly, use the ref-counting mechanism. */
@@ -539,6 +587,7 @@ void session_info_destroy(void *_data)
                        &session_info->sessions_ht_node);
        rcu_read_unlock();
        free(session_info->name);
+       lttng_trace_archive_location_put(session_info->last_state_sample.rotation.location);
        call_rcu(&session_info->rcu_node, free_session_info_rcu);
 }
 
@@ -561,7 +610,10 @@ void session_info_put(struct session_info *session_info)
 }
 
 static
-struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
+struct session_info *session_info_create(uint64_t id,
+               const char *name,
+               uid_t uid,
+               gid_t gid,
                struct lttng_session_trigger_list *trigger_list,
                struct cds_lfht *sessions_ht)
 {
@@ -569,10 +621,11 @@ struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
 
        LTTNG_ASSERT(name);
 
-       session_info = (struct session_info *) zmalloc(sizeof(*session_info));
+       session_info = zmalloc<struct session_info>();
        if (!session_info) {
                goto end;
        }
+
        lttng_ref_init(&session_info->ref, session_info_destroy);
 
        session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
@@ -582,10 +635,12 @@ struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
        }
 
        cds_lfht_node_init(&session_info->sessions_ht_node);
+       session_info->id = id;
        session_info->name = strdup(name);
        if (!session_info->name) {
                goto error;
        }
+
        session_info->uid = uid;
        session_info->gid = gid;
        session_info->trigger_list = trigger_list;
@@ -623,7 +678,7 @@ struct channel_info *channel_info_create(const char *channel_name,
                struct channel_key *channel_key, uint64_t channel_capacity,
                struct session_info *session_info)
 {
-       struct channel_info *channel_info = (struct channel_info *) zmalloc(sizeof(*channel_info));
+       struct channel_info *channel_info = zmalloc<struct channel_info>();
 
        if (!channel_info) {
                goto end;
@@ -670,7 +725,7 @@ static
 void notification_client_list_release(struct urcu_ref *list_ref)
 {
        struct notification_client_list *list =
-                       container_of(list_ref, typeof(*list), ref);
+                       lttng::utils::container_of(list_ref, &notification_client_list::ref);
        struct notification_client_list_element *client_list_element, *tmp;
 
        lttng_condition_put(list->condition);
@@ -723,7 +778,7 @@ struct notification_client_list *notification_client_list_create(
        struct cds_lfht_iter iter;
        struct notification_client_list *client_list;
 
-       client_list = (notification_client_list *) zmalloc(sizeof(*client_list));
+       client_list = zmalloc<notification_client_list>();
        if (!client_list) {
                PERROR("Failed to allocate notification client list");
                goto end;
@@ -755,7 +810,7 @@ struct notification_client_list *notification_client_list_create(
                        continue;
                }
 
-               client_list_element = (notification_client_list_element *) zmalloc(sizeof(*client_list_element));
+               client_list_element = zmalloc<notification_client_list_element>();
                if (!client_list_element) {
                        goto error_put_client_list;
                }
@@ -814,8 +869,8 @@ struct notification_client_list *get_client_list_from_condition(
                        &iter);
        node = cds_lfht_iter_get_node(&iter);
        if (node) {
-               list = container_of(node, struct notification_client_list,
-                               notification_trigger_clients_ht_node);
+               list = lttng::utils::container_of(node,
+                               &notification_client_list::notification_trigger_clients_ht_node);
                list = notification_client_list_get(list) ? list : NULL;
        }
 
@@ -904,7 +959,6 @@ int evaluate_channel_condition_for_client(
 
        ret = evaluate_buffer_condition(condition, evaluation, state,
                        NULL, last_sample,
-                       0, channel_info->session_info->consumed_data_size,
                        channel_info);
        if (ret) {
                WARN("Fatal error occurred while evaluating a newly subscribed-to condition");
@@ -951,73 +1005,141 @@ end:
 }
 
 static
-int evaluate_session_condition_for_client(
+bool evaluate_session_rotation_ongoing_condition(const struct lttng_condition *condition
+               __attribute__((unused)),
+               const struct session_state_sample *sample)
+{
+       return sample->rotation.ongoing;
+}
+
+static
+bool evaluate_session_consumed_size_condition(
                const struct lttng_condition *condition,
-               struct notification_thread_state *state,
-               struct lttng_evaluation **evaluation,
-               uid_t *session_uid, gid_t *session_gid)
+               const struct session_state_sample *sample)
+{
+       uint64_t threshold;
+       const struct lttng_condition_session_consumed_size *size_condition =
+                       lttng::utils::container_of(condition,
+                               &lttng_condition_session_consumed_size::parent);
+
+       threshold = size_condition->consumed_threshold_bytes.value;
+       DBG("Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
+                       threshold, sample->consumed_data_size);
+       return sample->consumed_data_size >= threshold;
+}
+
+/*
+ * `new_state` can be NULL to indicate that we are not evaluating a
+ * state transition. A client subscribed or a trigger was registered and
+ * we wish to perform an initial evaluation.
+ */
+static
+int evaluate_session_condition(
+               const struct lttng_condition *condition,
+               const struct session_info *session_info,
+               const struct session_state_sample *new_state,
+               struct lttng_evaluation **evaluation)
 {
        int ret;
-       struct cds_lfht_iter iter;
-       struct cds_lfht_node *node;
-       const char *session_name;
-       struct session_info *session_info = NULL;
+       bool previous_result, newest_result;
 
-       rcu_read_lock();
-       session_name = get_condition_session_name(condition);
+       switch (lttng_condition_get_type(condition)) {
+       case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
+               if (new_state) {
+                       previous_result = evaluate_session_rotation_ongoing_condition(
+                                       condition, &session_info->last_state_sample);
+                       newest_result = evaluate_session_rotation_ongoing_condition(
+                                       condition, new_state);
+               } else {
+                       previous_result = false;
+                       newest_result = evaluate_session_rotation_ongoing_condition(
+                                       condition, &session_info->last_state_sample);
+               }
+               break;
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+               if (new_state) {
+                       previous_result = evaluate_session_consumed_size_condition(
+                                       condition, &session_info->last_state_sample);
+                       newest_result = evaluate_session_consumed_size_condition(
+                                       condition, new_state);
+               } else {
+                       previous_result = false;
+                       newest_result = evaluate_session_consumed_size_condition(
+                                       condition, &session_info->last_state_sample);
+               }
+               break;
+       case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
+               /*
+                * Note that LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED is
+                * evaluated differently to only consider state transitions without regard for the
+                * initial state. This is a deliberate choice as it is unlikely that a user would
+                * expect an action to occur for a rotation that occurred long before the trigger or
+                * subscription occurred.
+                */
+               if (!new_state) {
+                       ret = 0;
+                       goto end;
+               }
 
-       /* Find the session associated with the trigger. */
-       cds_lfht_lookup(state->sessions_ht,
-                       hash_key_str(session_name, lttng_ht_seed),
-                       match_session,
-                       session_name,
-                       &iter);
-       node = cds_lfht_iter_get_node(&iter);
-       if (!node) {
-               DBG("No known session matching name \"%s\"",
-                               session_name);
+               previous_result = !session_info->last_state_sample.rotation.ongoing;
+               newest_result = !new_state->rotation.ongoing;
+               break;
+       default:
                ret = 0;
                goto end;
        }
 
-       session_info = caa_container_of(node, struct session_info,
-                       sessions_ht_node);
-       session_info_get(session_info);
+       if (!newest_result || (previous_result == newest_result)) {
+               /* Not a state transition, evaluate to false. */
+               ret = 0;
+               goto end;
+       }
 
-       /*
-        * Evaluation is performed in-line here since only one type of
-        * session-bound condition is handled for the moment.
-        */
        switch (lttng_condition_get_type(condition)) {
        case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
-               if (!session_info->rotation.ongoing) {
-                       ret = 0;
-                       goto end_session_put;
-               }
+       {
+               const auto rotation_id = new_state ?
+                               new_state->rotation.id :
+                                     session_info->last_state_sample.rotation.id;
 
-               *evaluation = lttng_evaluation_session_rotation_ongoing_create(
-                               session_info->rotation.id);
-               if (!*evaluation) {
-                       /* Fatal error. */
-                       ERR("Failed to create session rotation ongoing evaluation for session \"%s\"",
-                                       session_info->name);
-                       ret = -1;
-                       goto end_session_put;
-               }
-               ret = 0;
+               *evaluation = lttng_evaluation_session_rotation_ongoing_create(rotation_id);
+               break;
+       }
+       case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
+       {
+               const auto rotation_id = new_state ?
+                               new_state->rotation.id :
+                                     session_info->last_state_sample.rotation.id;
+
+               /* Callee acquires a reference to location. */
+               *evaluation = lttng_evaluation_session_rotation_completed_create(
+                               rotation_id, new_state->rotation.location);
                break;
+       }
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+       {
+               const auto latest_session_consumed_total = new_state ?
+                               new_state->consumed_data_size :
+                                     session_info->last_state_sample.consumed_data_size;
+
+               *evaluation = lttng_evaluation_session_consumed_size_create(
+                               latest_session_consumed_total);
+               break;
+       }
        default:
-               ret = 0;
-               goto end_session_put;
+               abort();
        }
 
-       *session_uid = session_info->uid;
-       *session_gid = session_info->gid;
+       if (!*evaluation) {
+               /* Fatal error. */
+               ERR("Failed to create session condition evaluation: session name = `%s`",
+                               session_info->name);
+               ret = -1;
+               goto end;
+       }
 
-end_session_put:
-       session_info_put(session_info);
+       ret = 0;
 end:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -1050,9 +1172,25 @@ int evaluate_condition_for_client(const struct lttng_trigger *trigger,
 
        switch (get_condition_binding_object(condition)) {
        case LTTNG_OBJECT_TYPE_SESSION:
-               ret = evaluate_session_condition_for_client(condition, state,
-                               &evaluation, &object_uid, &object_gid);
+       {
+               /* Find the session associated with the condition. */
+               const auto *session_name = get_condition_session_name(condition);
+               auto session_info = get_session_info_by_name(state, session_name);
+               if (!session_info) {
+                       /* Not an error, the session doesn't exist yet. */
+                       DBG("Session not found while evaluating session condition for client: session name = `%s`",
+                                       session_name);
+                       ret = 0;
+                       goto end;
+               }
+
+               object_uid = session_info->uid;
+               object_gid = session_info->gid;
+
+               ret = evaluate_session_condition(condition, session_info, NULL, &evaluation);
+               session_info_put(session_info);
                break;
+       }
        case LTTNG_OBJECT_TYPE_CHANNEL:
                ret = evaluate_channel_condition_for_client(condition, state,
                                &evaluation, &object_uid, &object_gid);
@@ -1123,12 +1261,12 @@ int notification_thread_client_subscribe(struct notification_client *client,
                }
        }
 
-       condition_list_element = (lttng_condition_list_element *) zmalloc(sizeof(*condition_list_element));
+       condition_list_element = zmalloc<lttng_condition_list_element>();
        if (!condition_list_element) {
                ret = -1;
                goto error;
        }
-       client_list_element = (notification_client_list_element *) zmalloc(sizeof(*client_list_element));
+       client_list_element = zmalloc<notification_client_list_element>();
        if (!client_list_element) {
                ret = -1;
                goto error;
@@ -1282,7 +1420,7 @@ end:
 static
 void free_notification_client_rcu(struct rcu_head *node)
 {
-       free(caa_container_of(node, struct notification_client, rcu_node));
+       free(lttng::utils::container_of(node, &notification_client::rcu_node));
 }
 
 static
@@ -1404,27 +1542,6 @@ fail:
        return false;
 }
 
-static
-bool session_consumed_size_condition_applies_to_channel(
-               const struct lttng_condition *condition,
-               const struct channel_info *channel_info)
-{
-       enum lttng_condition_status status;
-       const char *condition_session_name = NULL;
-
-       status = lttng_condition_session_consumed_size_get_session_name(
-                       condition, &condition_session_name);
-       LTTNG_ASSERT((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
-
-       if (strcmp(channel_info->session_info->name, condition_session_name)) {
-               goto fail;
-       }
-
-       return true;
-fail:
-       return false;
-}
-
 static
 bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
                const struct channel_info *channel_info)
@@ -1443,10 +1560,6 @@ bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
                trigger_applies = buffer_usage_condition_applies_to_channel(
                                condition, channel_info);
                break;
-       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
-               trigger_applies = session_consumed_size_condition_applies_to_channel(
-                               condition, channel_info);
-               break;
        default:
                goto fail;
        }
@@ -1494,23 +1607,27 @@ end:
 /*
  * Allocate an empty lttng_session_trigger_list for the session named
  * 'session_name'.
- *
- * No ownership of 'session_name' is assumed by the session trigger list.
- * It is the caller's responsability to ensure the session name is alive
- * for as long as this list is.
  */
 static
 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
                const char *session_name,
                struct cds_lfht *session_triggers_ht)
 {
-       struct lttng_session_trigger_list *list;
+       struct lttng_session_trigger_list *list = NULL;
+       char *session_name_copy = strdup(session_name);
+
+       if (!session_name_copy) {
+               PERROR("Failed to allocate session name while building trigger list");
+               goto end;
+       }
 
-       list = (lttng_session_trigger_list *) zmalloc(sizeof(*list));
+       list = zmalloc<lttng_session_trigger_list>();
        if (!list) {
+               PERROR("Failed to allocate session trigger list while building trigger list");
                goto end;
        }
-       list->session_name = session_name;
+
+       list->session_name = session_name_copy;
        CDS_INIT_LIST_HEAD(&list->list);
        cds_lfht_node_init(&list->session_triggers_ht_node);
        list->session_triggers_ht = session_triggers_ht;
@@ -1528,8 +1645,11 @@ end:
 static
 void free_session_trigger_list_rcu(struct rcu_head *node)
 {
-       free(caa_container_of(node, struct lttng_session_trigger_list,
-                       rcu_node));
+       struct lttng_session_trigger_list *list =
+                       caa_container_of(node, struct lttng_session_trigger_list, rcu_node);
+
+       free(list->session_name);
+       free(list);
 }
 
 static
@@ -1557,7 +1677,7 @@ int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
 {
        int ret = 0;
        struct lttng_trigger_list_element *new_element =
-                       (lttng_trigger_list_element *) zmalloc(sizeof(*new_element));
+                       zmalloc<lttng_trigger_list_element>();
 
        if (!new_element) {
                ret = -1;
@@ -1581,17 +1701,11 @@ bool trigger_applies_to_session(const struct lttng_trigger *trigger,
        switch (lttng_condition_get_type(condition)) {
        case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
        case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
        {
-               enum lttng_condition_status condition_status;
                const char *condition_session_name;
 
-               condition_status = lttng_condition_session_rotation_get_session_name(
-                       condition, &condition_session_name);
-               if (condition_status != LTTNG_CONDITION_STATUS_OK) {
-                       ERR("Failed to retrieve session rotation condition's session name");
-                       goto end;
-               }
-
+               condition_session_name = get_condition_session_name(condition);
                LTTNG_ASSERT(condition_session_name);
                applies = !strcmp(condition_session_name, session_name);
                break;
@@ -1606,10 +1720,6 @@ end:
 /*
  * Allocate and initialize an lttng_session_trigger_list which contains
  * all triggers that apply to the session named 'session_name'.
- *
- * No ownership of 'session_name' is assumed by the session trigger list.
- * It is the caller's responsability to ensure the session name is alive
- * for as long as this list is.
  */
 static
 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
@@ -1652,39 +1762,22 @@ error:
 }
 
 static
-struct session_info *find_or_create_session_info(
-               struct notification_thread_state *state,
-               const char *name, uid_t uid, gid_t gid)
+struct session_info *create_and_publish_session_info(struct notification_thread_state *state,
+               uint64_t id,
+               const char *name,
+               uid_t uid,
+               gid_t gid)
 {
        struct session_info *session = NULL;
-       struct cds_lfht_node *node;
-       struct cds_lfht_iter iter;
        struct lttng_session_trigger_list *trigger_list;
 
        rcu_read_lock();
-       cds_lfht_lookup(state->sessions_ht,
-                       hash_key_str(name, lttng_ht_seed),
-                       match_session,
-                       name,
-                       &iter);
-       node = cds_lfht_iter_get_node(&iter);
-       if (node) {
-               DBG("Found session info of session \"%s\" (uid = %i, gid = %i)",
-                               name, uid, gid);
-               session = caa_container_of(node, struct session_info,
-                               sessions_ht_node);
-               LTTNG_ASSERT(session->uid == uid);
-               LTTNG_ASSERT(session->gid == gid);
-               session_info_get(session);
-               goto end;
-       }
-
        trigger_list = lttng_session_trigger_list_build(state, name);
        if (!trigger_list) {
                goto error;
        }
 
-       session = session_info_create(name, uid, gid, trigger_list,
+       session = session_info_create(id, name, uid, gid, trigger_list,
                        state->sessions_ht);
        if (!session) {
                ERR("Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
@@ -1692,11 +1785,16 @@ struct session_info *find_or_create_session_info(
                lttng_session_trigger_list_destroy(trigger_list);
                goto error;
        }
+
+       /* Transferred ownership to the new session. */
        trigger_list = NULL;
 
-       cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
-                       &session->sessions_ht_node);
-end:
+       if (cds_lfht_add_unique(state->sessions_ht, hash_session_info(session), match_session_info,
+                           &id, &session->sessions_ht_node) != &session->sessions_ht_node) {
+               ERR("Duplicate session found: name = `%s`, id = %" PRIu64, name, id);
+               goto error;
+       }
+
        rcu_read_unlock();
        return session;
 error:
@@ -1706,11 +1804,12 @@ error:
 }
 
 static
-int handle_notification_thread_command_add_channel(
-               struct notification_thread_state *state,
-               const char *session_name, uid_t session_uid, gid_t session_gid,
-               const char *channel_name, enum lttng_domain_type channel_domain,
-               uint64_t channel_key_int, uint64_t channel_capacity,
+int handle_notification_thread_command_add_channel(struct notification_thread_state *state,
+               uint64_t session_id,
+               const char *channel_name,
+               enum lttng_domain_type channel_domain,
+               uint64_t channel_key_int,
+               uint64_t channel_capacity,
                enum lttng_error_code *cmd_result)
 {
        struct cds_list_head trigger_list;
@@ -1725,16 +1824,17 @@ int handle_notification_thread_command_add_channel(
        struct cds_lfht_iter iter;
        struct session_info *session_info = NULL;
 
-       DBG("Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
-                       channel_name, session_name, channel_key_int,
+       DBG("Adding channel: channel name = `%s`, session id = %" PRIu64 ", channel key = %" PRIu64 ", domain = %s",
+                       channel_name, session_id, channel_key_int,
                        lttng_domain_type_str(channel_domain));
 
        CDS_INIT_LIST_HEAD(&trigger_list);
 
-       session_info = find_or_create_session_info(state, session_name,
-                       session_uid, session_gid);
+       session_info = get_session_info_by_id(state, session_id);
        if (!session_info) {
-               /* Allocation error or an internal error occurred. */
+               /* Fatal logic error. */
+               ERR("Failed to find session while adding channel: session id = %" PRIu64,
+                               session_id);
                goto error;
        }
 
@@ -1755,7 +1855,7 @@ int handle_notification_thread_command_add_channel(
                        continue;
                }
 
-               new_element = (lttng_trigger_list_element *) zmalloc(sizeof(*new_element));
+               new_element = zmalloc<lttng_trigger_list_element>();
                if (!new_element) {
                        rcu_read_unlock();
                        goto error;
@@ -1769,7 +1869,7 @@ int handle_notification_thread_command_add_channel(
 
        DBG("Found %i triggers that apply to newly added channel",
                        trigger_count);
-       channel_trigger_list = (lttng_channel_trigger_list *) zmalloc(sizeof(*channel_trigger_list));
+       channel_trigger_list = zmalloc<lttng_channel_trigger_list>();
        if (!channel_trigger_list) {
                goto error;
        }
@@ -1800,6 +1900,67 @@ error:
        return 1;
 }
 
+static
+int handle_notification_thread_command_add_session(struct notification_thread_state *state,
+               uint64_t session_id,
+               const char *session_name,
+               uid_t session_uid,
+               gid_t session_gid,
+               enum lttng_error_code *cmd_result)
+{
+       int ret;
+
+       DBG("Adding session: session name = `%s`, session id = %" PRIu64 ", session uid = %d, session gid = %d",
+                       session_name, session_id, session_uid, session_gid);
+
+       auto session = create_and_publish_session_info(state, session_id, session_name, session_uid, session_gid);
+       if (!session) {
+               PERROR("Failed to add session: session name = `%s`, session id = %" PRIu64 ", session uid = %d, session gid = %d",
+                               session_name, session_id, session_uid, session_gid);
+               ret = -1;
+               *cmd_result = LTTNG_ERR_NOMEM;
+               goto end;
+       }
+
+       /*
+        * Note that the reference to `session` is not released; this reference is
+        * the "global" reference that is used to allow look-ups. This reference will
+        * only be released when the session is removed. See
+        * handle_notification_thread_command_remove_session.
+        */
+       ret = 0;
+       *cmd_result = LTTNG_OK;
+end:
+       return ret;
+}
+
+static
+int handle_notification_thread_command_remove_session(
+               struct notification_thread_state *state,
+               uint64_t session_id,
+               enum lttng_error_code *cmd_result)
+{
+       int ret;
+
+       DBG("Removing session: session id = %" PRIu64, session_id);
+
+       auto session = get_session_info_by_id(state, session_id);
+       if (!session) {
+               ERR("Failed to remove session: session id = %" PRIu64, session_id);
+               ret = -1;
+               *cmd_result = LTTNG_ERR_NO_SESSION;
+               goto end;
+       }
+
+       /* Release the reference returned by the look-up, and then release the global reference. */
+       session_info_put(session);
+       session_info_put(session);
+       ret = 0;
+       *cmd_result = LTTNG_OK;
+end:
+       return ret;
+}
+
 static
 void free_channel_trigger_list_rcu(struct rcu_head *node)
 {
@@ -1900,7 +2061,7 @@ static
 int handle_notification_thread_command_session_rotation(
        struct notification_thread_state *state,
        enum notification_thread_command_type cmd_type,
-       const char *session_name, uid_t session_uid, gid_t session_gid,
+       uint64_t session_id,
        uint64_t trace_archive_chunk_id,
        struct lttng_trace_archive_location *location,
        enum lttng_error_code *_cmd_result)
@@ -1910,77 +2071,75 @@ int handle_notification_thread_command_session_rotation(
        struct lttng_session_trigger_list *trigger_list;
        struct lttng_trigger_list_element *trigger_list_element;
        struct session_info *session_info;
-       const struct lttng_credentials session_creds = {
-               .uid = LTTNG_OPTIONAL_INIT_VALUE(session_uid),
-               .gid = LTTNG_OPTIONAL_INIT_VALUE(session_gid),
-       };
+       struct lttng_credentials session_creds;
+       struct session_state_sample new_session_state;
 
        rcu_read_lock();
 
-       session_info = find_or_create_session_info(state, session_name,
-                       session_uid, session_gid);
+       session_info = get_session_info_by_id(state, session_id);
        if (!session_info) {
-               /* Allocation error or an internal error occurred. */
+               /* Fatal logic error. */
+               ERR("Failed to find session while handling rotation state change: session id = %" PRIu64,
+                               session_id);
                ret = -1;
-               cmd_result = LTTNG_ERR_NOMEM;
+               cmd_result = LTTNG_ERR_FATAL;
                goto end;
        }
 
-       session_info->rotation.ongoing =
-                       cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
-       session_info->rotation.id = trace_archive_chunk_id;
-       trigger_list = get_session_trigger_list(state, session_name);
-       if (!trigger_list) {
-               DBG("No triggers applying to session \"%s\" found",
-                               session_name);
-               goto end;
+       new_session_state = session_info->last_state_sample;
+       if (location) {
+               lttng_trace_archive_location_get(location);
+               new_session_state.rotation.location = location;
+       } else {
+               new_session_state.rotation.location = NULL;
        }
 
+       session_creds = {
+               .uid = LTTNG_OPTIONAL_INIT_VALUE(session_info->uid),
+               .gid = LTTNG_OPTIONAL_INIT_VALUE(session_info->gid),
+       };
+
+       new_session_state.rotation.ongoing = cmd_type ==
+                       NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
+       new_session_state.rotation.id = trace_archive_chunk_id;
+
+       trigger_list = get_session_trigger_list(state, session_info->name);
+       LTTNG_ASSERT(trigger_list);
+
        cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
                        node) {
                const struct lttng_condition *condition;
                struct lttng_trigger *trigger;
                struct notification_client_list *client_list;
                struct lttng_evaluation *evaluation = NULL;
-               enum lttng_condition_type condition_type;
                enum action_executor_status executor_status;
 
                trigger = trigger_list_element->trigger;
                condition = lttng_trigger_get_const_condition(trigger);
                LTTNG_ASSERT(condition);
-               condition_type = lttng_condition_get_type(condition);
-
-               if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
-                               cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
-                       continue;
-               } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
-                               cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
-                       continue;
-               }
 
-               client_list = get_client_list_from_condition(state, condition);
-               if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
-                       evaluation = lttng_evaluation_session_rotation_ongoing_create(
-                                       trace_archive_chunk_id);
-               } else {
-                       evaluation = lttng_evaluation_session_rotation_completed_create(
-                                       trace_archive_chunk_id, location);
+               ret = evaluate_session_condition(
+                               condition, session_info, &new_session_state, &evaluation);
+               if (ret) {
+                       ret = -1;
+                       cmd_result = LTTNG_ERR_NOMEM;
+                       goto end;
                }
 
                if (!evaluation) {
-                       /* Internal error */
-                       ret = -1;
-                       cmd_result = LTTNG_ERR_UNK;
-                       goto put_list;
+                       continue;
                }
 
                /*
                 * Ownership of `evaluation` transferred to the action executor
-                * no matter the result.
+                * no matter the result. The callee acquires a reference to the
+                * client list: we can release our own.
                 */
+               client_list = get_client_list_from_condition(state, condition);
                executor_status = action_executor_enqueue_trigger(
                                state->executor, trigger, evaluation,
                                &session_creds, client_list);
+               notification_client_list_put(client_list);
                evaluation = NULL;
                switch (executor_status) {
                case ACTION_EXECUTOR_STATUS_OK:
@@ -1993,7 +2152,7 @@ int handle_notification_thread_command_session_rotation(
                         */
                        ERR("Fatal error occurred while enqueuing action associated with session rotation trigger");
                        ret = -1;
-                       goto put_list;
+                       goto end;
                case ACTION_EXECUTOR_STATUS_OVERFLOW:
                        /*
                         * TODO Add trigger identification (name/id) when
@@ -2003,18 +2162,19 @@ int handle_notification_thread_command_session_rotation(
                         */
                        WARN("No space left when enqueuing action associated with session rotation trigger");
                        ret = 0;
-                       goto put_list;
+                       goto end;
                default:
                        abort();
                }
-
-put_list:
-               notification_client_list_put(client_list);
-               if (caa_unlikely(ret)) {
-                       break;
-               }
        }
+
 end:
+       if (session_info) {
+               /* Ownership of new_session_state::location is transferred. */
+               lttng_trace_archive_location_put(session_info->last_state_sample.rotation.location);
+               session_info->last_state_sample = new_session_state;
+       }
+
        session_info_put(session_info);
        *_cmd_result = cmd_result;
        rcu_read_unlock();
@@ -2032,7 +2192,7 @@ int handle_notification_thread_command_add_tracer_event_source(
        enum lttng_error_code cmd_result = LTTNG_OK;
        struct notification_event_tracer_event_source_element *element = NULL;
 
-       element = (notification_event_tracer_event_source_element *) zmalloc(sizeof(*element));
+       element = zmalloc<notification_event_tracer_event_source_element>();
        if (!element) {
                cmd_result = LTTNG_ERR_NOMEM;
                ret = -1;
@@ -2049,7 +2209,7 @@ int handle_notification_thread_command_add_tracer_event_source(
                        lttng_domain_type_str(domain_type));
 
        /* Adding the read side pipe to the event poll. */
-       ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLIN | LPOLLERR);
+       ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLPRI | LPOLLIN | LPOLLERR);
        if (ret < 0) {
                ERR("Failed to add tracer event source to poll set: tracer_event_source_fd = %d, domain = '%s'",
                                tracer_event_source_fd,
@@ -2426,32 +2586,14 @@ int bind_trigger_to_matching_session(struct lttng_trigger *trigger,
                struct notification_thread_state *state)
 {
        int ret = 0;
-       const struct lttng_condition *condition;
-       const char *session_name;
-       struct lttng_session_trigger_list *trigger_list;
-
-       ASSERT_RCU_READ_LOCKED();
-
-       condition = lttng_trigger_get_const_condition(trigger);
-       switch (lttng_condition_get_type(condition)) {
-       case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
-       case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
-       {
-               enum lttng_condition_status status;
-
-               status = lttng_condition_session_rotation_get_session_name(
-                               condition, &session_name);
-               if (status != LTTNG_CONDITION_STATUS_OK) {
-                       ERR("Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
-                       ret = -1;
-                       goto end;
-               }
-               break;
-       }
-       default:
-               ret = -1;
-               goto end;
-       }
+       const struct lttng_condition *condition;
+       const char *session_name;
+       struct lttng_session_trigger_list *trigger_list;
+
+       ASSERT_RCU_READ_LOCKED();
+
+       condition = lttng_trigger_get_const_condition(trigger);
+       session_name = get_condition_session_name(condition);
 
        trigger_list = get_session_trigger_list(state, session_name);
        if (!trigger_list) {
@@ -2501,7 +2643,7 @@ int bind_trigger_to_matching_channels(struct lttng_trigger *trigger,
                                struct lttng_channel_trigger_list,
                                channel_triggers_ht_node);
 
-               trigger_list_element = (lttng_trigger_list_element *) zmalloc(sizeof(*trigger_list_element));
+               trigger_list_element = zmalloc<lttng_trigger_list_element>();
                if (!trigger_list_element) {
                        ret = -1;
                        goto end;
@@ -2622,7 +2764,7 @@ enum lttng_error_code setup_tracer_notifier(
        struct lttng_condition *condition = lttng_trigger_get_condition(trigger);
        struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element = NULL;
 
-       trigger_tokens_ht_element = (notification_trigger_tokens_ht_element *) zmalloc(sizeof(*trigger_tokens_ht_element));
+       trigger_tokens_ht_element = zmalloc<notification_trigger_tokens_ht_element>();
        if (!trigger_tokens_ht_element) {
                ret = LTTNG_ERR_NOMEM;
                goto end;
@@ -2745,7 +2887,7 @@ int handle_notification_thread_command_register_trigger(
                goto error;
        }
 
-       trigger_ht_element = (lttng_trigger_ht_element *) zmalloc(sizeof(*trigger_ht_element));
+       trigger_ht_element = zmalloc<lttng_trigger_ht_element>();
        if (!trigger_ht_element) {
                ret = -1;
                goto error;
@@ -2823,6 +2965,7 @@ int handle_notification_thread_command_register_trigger(
                        client_list = notification_client_list_create(state, condition);
                        if (!client_list) {
                                ERR("Error creating notification client list for trigger %s", trigger->name);
+                               ret = -1;
                                goto error_free_ht_element;
                        }
                }
@@ -2898,12 +3041,25 @@ int handle_notification_thread_command_register_trigger(
         */
        switch (get_condition_binding_object(condition)) {
        case LTTNG_OBJECT_TYPE_SESSION:
-               ret = evaluate_session_condition_for_client(condition, state,
-                               &evaluation, &object_uid,
-                               &object_gid);
-               LTTNG_OPTIONAL_SET(&object_creds.uid, object_uid);
-               LTTNG_OPTIONAL_SET(&object_creds.gid, object_gid);
+       {
+               /* Find the session associated with the condition. */
+               const auto *session_name = get_condition_session_name(condition);
+               auto session_info = get_session_info_by_name(state, session_name);
+               if (!session_info) {
+                       /* Not an error, the session doesn't exist yet. */
+                       DBG("Session not found while evaluating session condition during registration of trigger: session name = `%s`",
+                                       session_name);
+                       ret = 0;
+                       goto success;
+               }
+
+               LTTNG_OPTIONAL_SET(&object_creds.uid, session_info->uid);
+               LTTNG_OPTIONAL_SET(&object_creds.gid, session_info->gid);
+
+               ret = evaluate_session_condition(condition, session_info, NULL, &evaluation);
+               session_info_put(session_info);
                break;
+       }
        case LTTNG_OBJECT_TYPE_CHANNEL:
                ret = evaluate_channel_condition_for_client(condition, state,
                                &evaluation, &object_uid,
@@ -3036,6 +3192,35 @@ void teardown_tracer_notifier(struct notification_thread_state *state,
        }
 }
 
+static
+void remove_trigger_from_session_trigger_list(
+       struct lttng_session_trigger_list *trigger_list,
+       const struct lttng_trigger *trigger)
+{
+       bool found = false;
+       struct lttng_trigger_list_element *trigger_element, *tmp;
+
+       cds_list_for_each_entry_safe (trigger_element, tmp, &trigger_list->list, node) {
+               if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
+                       continue;
+               }
+
+               DBG("Removed trigger from session_triggers_ht");
+               cds_list_del(&trigger_element->node);
+               free(trigger_element);
+               /* A trigger can only appear once per session. */
+               found = true;
+               break;
+       }
+
+       if (!found) {
+               ERR("Failed to find trigger associated with session: session name = `%s`",
+                               trigger_list->session_name);
+       }
+
+       LTTNG_ASSERT(found);
+}
+
 static
 int handle_notification_thread_command_unregister_trigger(
                struct notification_thread_state *state,
@@ -3044,7 +3229,6 @@ int handle_notification_thread_command_unregister_trigger(
 {
        struct cds_lfht_iter iter;
        struct cds_lfht_node *triggers_ht_node;
-       struct lttng_channel_trigger_list *trigger_list;
        struct notification_client_list *client_list;
        struct lttng_trigger_ht_element *trigger_ht_element = NULL;
        const struct lttng_condition *condition = lttng_trigger_get_const_condition(
@@ -3069,22 +3253,57 @@ int handle_notification_thread_command_unregister_trigger(
        trigger_ht_element = caa_container_of(triggers_ht_node,
                        struct lttng_trigger_ht_element, node);
 
-       /* Remove trigger from channel_triggers_ht. */
-       cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
-                       channel_triggers_ht_node) {
-               struct lttng_trigger_list_element *trigger_element, *tmp;
+       switch (get_condition_binding_object(condition)) {
+       case LTTNG_OBJECT_TYPE_CHANNEL:
+       {
+               struct lttng_channel_trigger_list *trigger_list;
 
-               cds_list_for_each_entry_safe(trigger_element, tmp,
-                               &trigger_list->list, node) {
-                       if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
-                               continue;
+               /*
+                * Remove trigger from channel_triggers_ht.
+                *
+                * Note that multiple channels may have matched the trigger's
+                * condition (e.g. all instances of a given channel in per-pid buffering
+                * mode).
+                *
+                * Iterate on all lists since we don't know the target channels' keys.
+                */
+               cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
+                               channel_triggers_ht_node) {
+                       struct lttng_trigger_list_element *trigger_element, *tmp;
+
+                       cds_list_for_each_entry_safe(
+                                       trigger_element, tmp, &trigger_list->list, node) {
+                               if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
+                                       continue;
+                               }
+
+                               DBG("Removed trigger from channel_triggers_ht");
+                               cds_list_del(&trigger_element->node);
+                               free(trigger_element);
+                               /* A trigger can only appear once per channel */
+                               break;
                        }
+               }
+               break;
+       }
+       case LTTNG_OBJECT_TYPE_SESSION:
+       {
+               auto session = get_session_info_by_name(
+                               state, get_condition_session_name(condition));
 
-                       DBG("Removed trigger from channel_triggers_ht");
-                       cds_list_del(&trigger_element->node);
-                       /* A trigger can only appear once per channel */
+               /* Session doesn't exist, no trigger to remove. */
+               if (!session) {
                        break;
                }
+
+               auto session_trigger_list = get_session_trigger_list(state, session->name);
+               remove_trigger_from_session_trigger_list(session_trigger_list, trigger);
+               session_info_put(session);
+       }
+       case LTTNG_OBJECT_TYPE_NONE:
+               break;
+       default:
+               abort();
        }
 
        if (lttng_trigger_needs_tracer_notifier(trigger)) {
@@ -3123,28 +3342,43 @@ end:
        return 0;
 }
 
+static
+int pop_cmd_queue(struct notification_thread_handle *handle,
+               struct notification_thread_command **cmd)
+{
+       int ret;
+       uint64_t counter;
+
+       pthread_mutex_lock(&handle->cmd_queue.lock);
+       ret = lttng_read(handle->cmd_queue.event_fd, &counter, sizeof(counter));
+       if (ret != sizeof(counter)) {
+               ret = -1;
+               goto error_unlock;
+       }
+
+       *cmd = cds_list_first_entry(&handle->cmd_queue.list,
+                       struct notification_thread_command, cmd_list_node);
+       cds_list_del(&((*cmd)->cmd_list_node));
+       ret = 0;
+
+error_unlock:
+       pthread_mutex_unlock(&handle->cmd_queue.lock);
+       return ret;
+}
+
 /* Returns 0 on success, 1 on exit requested, negative value on error. */
 int handle_notification_thread_command(
                struct notification_thread_handle *handle,
                struct notification_thread_state *state)
 {
        int ret;
-       uint64_t counter;
        struct notification_thread_command *cmd;
 
-       /* Read the event pipe to put it back into a quiescent state. */
-       ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
-                       sizeof(counter));
-       if (ret != sizeof(counter)) {
+       ret = pop_cmd_queue(handle, &cmd);
+       if (ret) {
                goto error;
        }
 
-       pthread_mutex_lock(&handle->cmd_queue.lock);
-       cmd = cds_list_first_entry(&handle->cmd_queue.list,
-                       struct notification_thread_command, cmd_list_node);
-       cds_list_del(&cmd->cmd_list_node);
-       pthread_mutex_unlock(&handle->cmd_queue.lock);
-
        DBG("Received `%s` command",
                        notification_command_type_str(cmd->type));
        switch (cmd->type) {
@@ -3163,9 +3397,7 @@ int handle_notification_thread_command(
        case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
                ret = handle_notification_thread_command_add_channel(
                                state,
-                               cmd->parameters.add_channel.session.name,
-                               cmd->parameters.add_channel.session.uid,
-                               cmd->parameters.add_channel.session.gid,
+                               cmd->parameters.add_channel.session.id,
                                cmd->parameters.add_channel.channel.name,
                                cmd->parameters.add_channel.channel.domain,
                                cmd->parameters.add_channel.channel.key,
@@ -3178,14 +3410,23 @@ int handle_notification_thread_command(
                                cmd->parameters.remove_channel.domain,
                                &cmd->reply_code);
                break;
+       case NOTIFICATION_COMMAND_TYPE_ADD_SESSION:
+               ret = handle_notification_thread_command_add_session(state,
+                               cmd->parameters.add_session.session_id,
+                               cmd->parameters.add_session.session_name,
+                               cmd->parameters.add_session.session_uid,
+                               cmd->parameters.add_session.session_gid, &cmd->reply_code);
+               break;
+       case NOTIFICATION_COMMAND_TYPE_REMOVE_SESSION:
+               ret = handle_notification_thread_command_remove_session(
+                               state, cmd->parameters.remove_session.session_id, &cmd->reply_code);
+               break;
        case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
        case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
                ret = handle_notification_thread_command_session_rotation(
                                state,
                                cmd->type,
-                               cmd->parameters.session_rotation.session_name,
-                               cmd->parameters.session_rotation.uid,
-                               cmd->parameters.session_rotation.gid,
+                               cmd->parameters.session_rotation.session_id,
                                cmd->parameters.session_rotation.trace_archive_chunk_id,
                                cmd->parameters.session_rotation.location,
                                &cmd->reply_code);
@@ -3336,7 +3577,7 @@ int handle_notification_thread_client_connect(
 
        DBG("Handling new notification channel client connection");
 
-       client = (notification_client *) zmalloc(sizeof(*client));
+       client = zmalloc<notification_client>();
        if (!client) {
                /* Fatal error. */
                ret = -1;
@@ -3379,9 +3620,9 @@ int handle_notification_thread_client_connect(
                goto error;
        }
 
+       client->communication.current_poll_events = CLIENT_POLL_EVENTS_IN;
        ret = lttng_poll_add(&state->events, client->socket,
-                       LPOLLIN | LPOLLERR |
-                       LPOLLHUP | LPOLLRDHUP);
+                       client->communication.current_poll_events);
        if (ret < 0) {
                ERR("Failed to add notification channel client socket to poll set");
                ret = 0;
@@ -3515,6 +3756,18 @@ int handle_notification_thread_trigger_unregister_all(
        return error_occurred ? -1 : 0;
 }
 
+static
+bool client_has_outbound_data_left(
+               const struct notification_client *client)
+{
+       const struct lttng_payload_view pv = lttng_payload_view_from_payload(
+                       &client->communication.outbound.payload, 0, -1);
+       const bool has_data = pv.buffer.size != 0;
+       const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
+
+       return has_data || has_fds;
+}
+
 static
 int client_handle_transmission_status(
                struct notification_client *client,
@@ -3525,24 +3778,51 @@ int client_handle_transmission_status(
 
        switch (transmission_status) {
        case CLIENT_TRANSMISSION_STATUS_COMPLETE:
-               ret = lttng_poll_mod(&state->events, client->socket,
-                               CLIENT_POLL_MASK_IN);
-               if (ret) {
-                       goto end;
-               }
-
-               break;
        case CLIENT_TRANSMISSION_STATUS_QUEUED:
+       {
+               int current_poll_events;
+               int new_poll_events;
                /*
                 * We want to be notified whenever there is buffer space
-                * available to send the rest of the payload.
+                * available to send the rest of the payload if we are
+                * waiting to send data to the client.
+                *
+                * The state of the outbound queue being sampled here is
+                * fine since:
+                *   - it is okay to wake-up "for nothing" in case we see
+                *     that data is left, but another thread succeeds in
+                *     flushing it before us when handling the client "out"
+                *     event. We will simply stop monitoring that event the next
+                *     time it wakes us up and we see no data left to be sent,
+                *   - if another thread fails to flush the entire client
+                *     outgoing queue, it will issue a "communication update"
+                *     command and cause the client's (e)poll mask to be
+                *     re-evaluated.
+                *
+                * The situation we seek to avoid would be to disable the
+                * monitoring of "out" client events indefinitely when there is
+                * data to be sent, which can't happen because of the
+                * aforementioned "communication update" mechanism.
                 */
-               ret = lttng_poll_mod(&state->events, client->socket,
-                               CLIENT_POLL_MASK_IN_OUT);
-               if (ret) {
-                       goto end;
+               pthread_mutex_lock(&client->lock);
+               current_poll_events = client->communication.current_poll_events;
+               new_poll_events = client_has_outbound_data_left(client) ?
+                               CLIENT_POLL_EVENTS_IN_OUT :
+                                     CLIENT_POLL_EVENTS_IN;
+               client->communication.current_poll_events = new_poll_events;
+               pthread_mutex_unlock(&client->lock);
+
+               /* Update the monitored event set only if it changed. */
+               if (current_poll_events != new_poll_events) {
+                       ret = lttng_poll_mod(&state->events, client->socket,
+                                       new_poll_events);
+                       if (ret) {
+                               goto end;
+                       }
                }
+
                break;
+       }
        case CLIENT_TRANSMISSION_STATUS_FAIL:
                ret = notification_thread_client_disconnect(client, state);
                if (ret) {
@@ -3682,18 +3962,6 @@ error:
        return CLIENT_TRANSMISSION_STATUS_ERROR;
 }
 
-static
-bool client_has_outbound_data_left(
-               const struct notification_client *client)
-{
-       const struct lttng_payload_view pv = lttng_payload_view_from_payload(
-                       &client->communication.outbound.payload, 0, -1);
-       const bool has_data = pv.buffer.size != 0;
-       const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
-
-       return has_data || has_fds;
-}
-
 /* Client lock must _not_ be held by the caller. */
 static
 int client_send_command_reply(struct notification_client *client,
@@ -3704,14 +3972,14 @@ int client_send_command_reply(struct notification_client *client,
        struct lttng_notification_channel_command_reply reply = {
                .status = (int8_t) status,
        };
-       struct lttng_notification_channel_message msg = {
-               .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
-               .size = sizeof(reply),
-               .fds = 0,
-       };
+       struct lttng_notification_channel_message msg;
        char buffer[sizeof(msg) + sizeof(reply)];
        enum client_transmission_status transmission_status;
 
+       msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY;
+       msg.size = sizeof(reply);
+       msg.fds = 0;
+
        memcpy(buffer, &msg, sizeof(msg));
        memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
        DBG("Send command reply (%i)", (int) status);
@@ -3809,15 +4077,15 @@ int client_handle_message_handshake(struct notification_client *client,
                        .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
                        .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
        };
-       const struct lttng_notification_channel_message msg_header = {
-                       .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
-                       .size = sizeof(handshake_reply),
-                       .fds = 0,
-       };
+       struct lttng_notification_channel_message msg_header;
        enum lttng_notification_channel_status status =
                        LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
        char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
 
+       msg_header.type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE;
+       msg_header.size = sizeof(handshake_reply);
+       msg_header.fds = 0;
+
        memcpy(send_buffer, &msg_header, sizeof(msg_header));
        memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
                        sizeof(handshake_reply));
@@ -4102,7 +4370,47 @@ int handle_notification_thread_client_out(
        }
 
        pthread_mutex_lock(&client->lock);
-       transmission_status = client_flush_outgoing_queue(client);
+       if (!client_has_outbound_data_left(client)) {
+               /*
+                * A client "out" event can be received when no payload is left
+                * to send under some circumstances.
+                *
+                * Many threads can flush a client's outgoing queue and, if they
+                * had to queue their message (socket was full), will use the
+                * "communication update" command to signal the (e)poll thread
+                * to monitor for space being made available in the socket.
+                *
+                * Commands are sent over an internal pipe serviced by the same
+                * thread as the client sockets.
+                *
+                * When space is made available in the socket, there is a race
+                * between the (e)poll thread and the other threads that may
+                * wish to use the client's socket to flush its outgoing queue.
+                *
+                * A non-(e)poll thread may attempt (and succeed) in flushing
+                * the queue before the (e)poll thread gets a chance to service
+                * the client's "out" event.
+                *
+                * In this situation, the (e)poll thread processing the client
+                * out event will see an empty payload: there is nothing to do
+                * except unsubscribing (e)poll "out" events.
+                *
+                * Note that this thread is the (e)poll thread so it can modify
+                * the (e)poll mask directly without using a communication
+                * update command. Other threads that flush the outgoing queue
+                * will use the "communication update" command to wake up this
+                * thread and force it to monitor "out" events.
+                *
+                * When other threads succeed in emptying the outgoing queue,
+                * they don't need to update the (e)poll mask: if the "out"
+                * event is monitored, it will fire once and the (e)poll
+                * thread will reach this condition, causing the event to
+                * stop being monitored.
+                */
+               transmission_status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
+       } else {
+               transmission_status = client_flush_outgoing_queue(client);
+       }
        pthread_mutex_unlock(&client->lock);
 
        ret = client_handle_transmission_status(
@@ -4123,9 +4431,8 @@ bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
        bool result = false;
        uint64_t threshold;
        enum lttng_condition_type condition_type;
-       const struct lttng_condition_buffer_usage *use_condition = container_of(
-                       condition, struct lttng_condition_buffer_usage,
-                       parent);
+       const struct lttng_condition_buffer_usage *use_condition = lttng::utils::container_of(
+                       condition, &lttng_condition_buffer_usage::parent);
 
        if (use_condition->threshold_bytes.set) {
                threshold = use_condition->threshold_bytes.value;
@@ -4174,31 +4481,12 @@ bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
        return result;
 }
 
-static
-bool evaluate_session_consumed_size_condition(
-               const struct lttng_condition *condition,
-               uint64_t session_consumed_size)
-{
-       uint64_t threshold;
-       const struct lttng_condition_session_consumed_size *size_condition =
-                       container_of(condition,
-                               struct lttng_condition_session_consumed_size,
-                               parent);
-
-       threshold = size_condition->consumed_threshold_bytes.value;
-       DBG("Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
-                       threshold, session_consumed_size);
-       return session_consumed_size >= threshold;
-}
-
 static
 int evaluate_buffer_condition(const struct lttng_condition *condition,
                struct lttng_evaluation **evaluation,
                const struct notification_thread_state *state __attribute__((unused)),
                const struct channel_state_sample *previous_sample,
                const struct channel_state_sample *latest_sample,
-               uint64_t previous_session_consumed_total,
-               uint64_t latest_session_consumed_total,
                struct channel_info *channel_info)
 {
        int ret = 0;
@@ -4221,18 +4509,6 @@ int evaluate_buffer_condition(const struct lttng_condition *condition,
                                condition, latest_sample,
                                channel_info->capacity);
                break;
-       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
-               if (caa_likely(previous_sample_available)) {
-                       previous_sample_result =
-                               evaluate_session_consumed_size_condition(
-                                       condition,
-                                       previous_session_consumed_total);
-               }
-               latest_sample_result =
-                               evaluate_session_consumed_size_condition(
-                                       condition,
-                                       latest_session_consumed_total);
-               break;
        default:
                /* Unknown condition type; internal error. */
                abort();
@@ -4261,10 +4537,6 @@ int evaluate_buffer_condition(const struct lttng_condition *condition,
                                latest_sample->highest_usage,
                                channel_info->capacity);
                break;
-       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
-               *evaluation = lttng_evaluation_session_consumed_size_create(
-                               latest_session_consumed_total);
-               break;
        default:
                abort();
        }
@@ -4281,11 +4553,11 @@ static
 int client_notification_overflow(struct notification_client *client)
 {
        int ret = 0;
-       const struct lttng_notification_channel_message msg = {
-               .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
-               .size = 0,
-               .fds = 0,
-       };
+       struct lttng_notification_channel_message msg;
+
+       msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED;
+       msg.size = 0;
+       msg.fds = 0;
 
        ASSERT_LOCKED(client->lock);
 
@@ -4387,16 +4659,16 @@ int notification_client_list_send_evaluation(
                .trigger = (struct lttng_trigger *) trigger,
                .evaluation = (struct lttng_evaluation *) evaluation,
        };
-       struct lttng_notification_channel_message msg_header = {
-               .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
-               .size = 0,
-               .fds = 0,
-       };
+       struct lttng_notification_channel_message msg_header;
        const struct lttng_credentials *trigger_creds =
                        lttng_trigger_get_credentials(trigger);
 
        lttng_payload_init(&msg_payload);
 
+       msg_header.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION;
+       msg_header.size = 0;
+       msg_header.fds = 0;
+
        ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
                        sizeof(msg_header));
        if (ret) {
@@ -4586,7 +4858,7 @@ struct lttng_event_notifier_notification *recv_one_event_notifier_notification(
                goto end;
        }
 
-       capture_buffer = (char *) zmalloc(capture_buffer_size);
+       capture_buffer = calloc<char>(capture_buffer_size);
        if (!capture_buffer) {
                ERR("Failed to allocate capture buffer");
                goto end;
@@ -4666,10 +4938,9 @@ int dispatch_one_event_notifier_notification(struct notification_thread_state *s
        }
 
        evaluation = lttng_evaluation_event_rule_matches_create(
-                       container_of(lttng_trigger_get_const_condition(
+                       lttng::utils::container_of(lttng_trigger_get_const_condition(
                                                     element->trigger),
-                                       struct lttng_condition_event_rule_matches,
-                                       parent),
+                                       &lttng_condition_event_rule_matches::parent),
                        notification->capture_buffer,
                        notification->capture_buf_size, false);
 
@@ -4794,15 +5065,18 @@ int handle_notification_thread_channel_sample(
 {
        int ret = 0;
        struct lttcomm_consumer_channel_monitor_msg sample_msg;
-       struct channel_info *channel_info;
+       struct channel_info *channel_info = NULL;
        struct cds_lfht_node *node;
        struct cds_lfht_iter iter;
-       struct lttng_channel_trigger_list *trigger_list;
+       struct lttng_channel_trigger_list *channel_trigger_list;
+       struct lttng_session_trigger_list *session_trigger_list;
        struct lttng_trigger_list_element *trigger_list_element;
        bool previous_sample_available = false;
-       struct channel_state_sample previous_sample, latest_sample;
-       uint64_t previous_session_consumed_total, latest_session_consumed_total;
-       struct lttng_credentials channel_creds;
+       struct channel_state_sample channel_previous_sample, channel_new_sample;
+       struct session_state_sample session_new_sample;
+       struct lttng_credentials channel_creds = {};
+       struct lttng_credentials session_creds = {};
+       struct session_info *session;
 
        /*
         * The monitoring pipe only holds messages smaller than PIPE_BUF,
@@ -4817,19 +5091,95 @@ int handle_notification_thread_channel_sample(
        }
 
        ret = 0;
-       latest_sample.key.key = sample_msg.key;
-       latest_sample.key.domain = domain;
-       latest_sample.highest_usage = sample_msg.highest;
-       latest_sample.lowest_usage = sample_msg.lowest;
-       latest_sample.channel_total_consumed = sample_msg.total_consumed;
+       channel_new_sample.key.key = sample_msg.key;
+       channel_new_sample.key.domain = domain;
+       channel_new_sample.highest_usage = sample_msg.highest;
+       channel_new_sample.lowest_usage = sample_msg.lowest;
 
        rcu_read_lock();
 
+       session = get_session_info_by_id(state, sample_msg.session_id);
+       if (!session) {
+               DBG("Received a sample for an unknown session from consumerd: session id = %" PRIu64,
+                               sample_msg.session_id);
+               goto end_unlock;
+       }
+
+       session_new_sample = session->last_state_sample;
+       session_new_sample.consumed_data_size += sample_msg.consumed_since_last_sample;
+       session_creds = {
+               .uid = LTTNG_OPTIONAL_INIT_VALUE(session->uid),
+               .gid = LTTNG_OPTIONAL_INIT_VALUE(session->gid),
+       };
+
+       session_trigger_list = get_session_trigger_list(state, session->name);
+       LTTNG_ASSERT(session_trigger_list);
+       cds_list_for_each_entry(trigger_list_element, &session_trigger_list->list,
+                       node) {
+               const struct lttng_condition *condition;
+               struct lttng_trigger *trigger;
+               struct notification_client_list *client_list = NULL;
+               struct lttng_evaluation *evaluation = NULL;
+               enum action_executor_status executor_status;
+
+               ret = 0;
+               trigger = trigger_list_element->trigger;
+               condition = lttng_trigger_get_const_condition(trigger);
+               LTTNG_ASSERT(condition);
+
+               ret = evaluate_session_condition(
+                               condition, session, &session_new_sample, &evaluation);
+               if (caa_unlikely(ret)) {
+                       break;
+               }
+
+               if (caa_likely(!evaluation)) {
+                       continue;
+               }
+
+               /*
+                * Ownership of `evaluation` transferred to the action executor
+                * no matter the result. The callee acquires a reference to the
+                * client list: we can release our own.
+                */
+               client_list = get_client_list_from_condition(state, condition);
+               executor_status = action_executor_enqueue_trigger(
+                               state->executor, trigger, evaluation,
+                               &session_creds, client_list);
+               notification_client_list_put(client_list);
+               evaluation = NULL;
+               switch (executor_status) {
+               case ACTION_EXECUTOR_STATUS_OK:
+                       break;
+               case ACTION_EXECUTOR_STATUS_ERROR:
+               case ACTION_EXECUTOR_STATUS_INVALID:
+                       /*
+                        * TODO Add trigger identification (name/id) when
+                        * it is added to the API.
+                        */
+                       ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger");
+                       ret = -1;
+                       goto end_unlock;
+               case ACTION_EXECUTOR_STATUS_OVERFLOW:
+                       /*
+                        * TODO Add trigger identification (name/id) when
+                        * it is added to the API.
+                        *
+                        * Not a fatal error.
+                        */
+                       WARN("No space left when enqueuing action associated with buffer-condition trigger");
+                       ret = 0;
+                       goto end_unlock;
+               default:
+                       abort();
+               }
+       }
+
        /* Retrieve the channel's informations */
        cds_lfht_lookup(state->channels_ht,
-                       hash_channel_key(&latest_sample.key),
+                       hash_channel_key(&channel_new_sample.key),
                        match_channel_info,
-                       &latest_sample.key,
+                       &channel_new_sample.key,
                        &iter);
        node = cds_lfht_iter_get_node(&iter);
        if (caa_unlikely(!node)) {
@@ -4840,28 +5190,26 @@ int handle_notification_thread_channel_sample(
                 * sample.
                 */
                DBG("Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
-                               latest_sample.key.key,
+                               channel_new_sample.key.key,
                                lttng_domain_type_str(domain));
                goto end_unlock;
        }
+
        channel_info = caa_container_of(node, struct channel_info,
                        channels_ht_node);
-       DBG("Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
+       DBG("Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", consumed since last sample = %" PRIu64")",
                        channel_info->name,
-                       latest_sample.key.key,
+                       channel_new_sample.key.key,
                        channel_info->session_info->name,
-                       latest_sample.highest_usage,
-                       latest_sample.lowest_usage,
-                       latest_sample.channel_total_consumed);
-
-       previous_session_consumed_total =
-                       channel_info->session_info->consumed_data_size;
+                       channel_new_sample.highest_usage,
+                       channel_new_sample.lowest_usage,
+                       sample_msg.consumed_since_last_sample);
 
        /* Retrieve the channel's last sample, if it exists, and update it. */
        cds_lfht_lookup(state->channel_state_ht,
-                       hash_channel_key(&latest_sample.key),
+                       hash_channel_key(&channel_new_sample.key),
                        match_channel_state_sample,
-                       &latest_sample.key,
+                       &channel_new_sample.key,
                        &iter);
        node = cds_lfht_iter_get_node(&iter);
        if (caa_likely(node)) {
@@ -4872,16 +5220,11 @@ int handle_notification_thread_channel_sample(
                                struct channel_state_sample,
                                channel_state_ht_node);
 
-               memcpy(&previous_sample, stored_sample,
-                               sizeof(previous_sample));
-               stored_sample->highest_usage = latest_sample.highest_usage;
-               stored_sample->lowest_usage = latest_sample.lowest_usage;
-               stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
+               memcpy(&channel_previous_sample, stored_sample,
+                               sizeof(channel_previous_sample));
+               stored_sample->highest_usage = channel_new_sample.highest_usage;
+               stored_sample->lowest_usage = channel_new_sample.lowest_usage;
                previous_sample_available = true;
-
-               latest_session_consumed_total =
-                               previous_session_consumed_total +
-                               (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
        } else {
                /*
                 * This is the channel's first sample, allocate space for and
@@ -4889,45 +5232,36 @@ int handle_notification_thread_channel_sample(
                 */
                struct channel_state_sample *stored_sample;
 
-               stored_sample = (channel_state_sample *) zmalloc(sizeof(*stored_sample));
+               stored_sample = zmalloc<channel_state_sample>();
                if (!stored_sample) {
                        ret = -1;
                        goto end_unlock;
                }
 
-               memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
+               memcpy(stored_sample, &channel_new_sample, sizeof(*stored_sample));
                cds_lfht_node_init(&stored_sample->channel_state_ht_node);
                cds_lfht_add(state->channel_state_ht,
                                hash_channel_key(&stored_sample->key),
                                &stored_sample->channel_state_ht_node);
-
-               latest_session_consumed_total =
-                               previous_session_consumed_total +
-                               latest_sample.channel_total_consumed;
        }
 
-       channel_info->session_info->consumed_data_size =
-                       latest_session_consumed_total;
-
        /* Find triggers associated with this channel. */
        cds_lfht_lookup(state->channel_triggers_ht,
-                       hash_channel_key(&latest_sample.key),
+                       hash_channel_key(&channel_new_sample.key),
                        match_channel_trigger_list,
-                       &latest_sample.key,
+                       &channel_new_sample.key,
                        &iter);
        node = cds_lfht_iter_get_node(&iter);
-       if (caa_likely(!node)) {
-               goto end_unlock;
-       }
+       LTTNG_ASSERT(node);
 
        channel_creds = (typeof(channel_creds)) {
                .uid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->uid),
                .gid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->gid),
        };
 
-       trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
+       channel_trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
                        channel_triggers_ht_node);
-       cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
+       cds_list_for_each_entry(trigger_list_element, &channel_trigger_list->list,
                        node) {
                const struct lttng_condition *condition;
                struct lttng_trigger *trigger;
@@ -4940,33 +5274,28 @@ int handle_notification_thread_channel_sample(
                condition = lttng_trigger_get_const_condition(trigger);
                LTTNG_ASSERT(condition);
 
-               /*
-                * Check if any client is subscribed to the result of this
-                * evaluation.
-                */
-               client_list = get_client_list_from_condition(state, condition);
-
                ret = evaluate_buffer_condition(condition, &evaluation, state,
-                               previous_sample_available ? &previous_sample : NULL,
-                               &latest_sample,
-                               previous_session_consumed_total,
-                               latest_session_consumed_total,
+                               previous_sample_available ? &channel_previous_sample : NULL,
+                               &channel_new_sample,
                                channel_info);
                if (caa_unlikely(ret)) {
-                       goto put_list;
+                       break;
                }
 
                if (caa_likely(!evaluation)) {
-                       goto put_list;
+                       continue;
                }
 
                /*
                 * Ownership of `evaluation` transferred to the action executor
-                * no matter the result.
+                * no matter the result. The callee acquires a reference to the
+                * client list: we can release our own.
                 */
+               client_list = get_client_list_from_condition(state, condition);
                executor_status = action_executor_enqueue_trigger(
                                state->executor, trigger, evaluation,
                                &channel_creds, client_list);
+               notification_client_list_put(client_list);
                evaluation = NULL;
                switch (executor_status) {
                case ACTION_EXECUTOR_STATUS_OK:
@@ -4979,7 +5308,7 @@ int handle_notification_thread_channel_sample(
                         */
                        ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger");
                        ret = -1;
-                       goto put_list;
+                       goto end_unlock;
                case ACTION_EXECUTOR_STATUS_OVERFLOW:
                        /*
                         * TODO Add trigger identification (name/id) when
@@ -4989,18 +5318,16 @@ int handle_notification_thread_channel_sample(
                         */
                        WARN("No space left when enqueuing action associated with buffer-condition trigger");
                        ret = 0;
-                       goto put_list;
+                       goto end_unlock;
                default:
                        abort();
                }
-
-put_list:
-               notification_client_list_put(client_list);
-               if (caa_unlikely(ret)) {
-                       break;
-               }
        }
 end_unlock:
+       if (session) {
+               session->last_state_sample = session_new_sample;
+       }
+       session_info_put(session);
        rcu_read_unlock();
 end:
        return ret;
This page took 0.054347 seconds and 4 git commands to generate.