Session consumed size notification
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
index df801be59d77f1b50561cae48705177277f133cb..6a7477aff44a165d1cb5d763526aea8498fde2f8 100644 (file)
@@ -32,6 +32,7 @@
 #include <lttng/notification/notification-internal.h>
 #include <lttng/condition/condition-internal.h>
 #include <lttng/condition/buffer-usage-internal.h>
+#include <lttng/condition/session-consumed-size-internal.h>
 #include <lttng/notification/channel-internal.h>
 
 #include <time.h>
@@ -152,15 +153,18 @@ struct channel_state_sample {
        struct cds_lfht_node channel_state_ht_node;
        uint64_t highest_usage;
        uint64_t lowest_usage;
+       uint64_t channel_total_consumed;
 };
 
 static unsigned long hash_channel_key(struct channel_key *key);
-static int evaluate_condition(struct lttng_condition *condition,
+static int evaluate_condition(const struct lttng_condition *condition,
                struct lttng_evaluation **evaluation,
-               struct notification_thread_state *state,
-               struct channel_state_sample *previous_sample,
-               struct channel_state_sample *latest_sample,
-               uint64_t buffer_capacity);
+               const struct notification_thread_state *state,
+               const struct channel_state_sample *previous_sample,
+               const struct channel_state_sample *latest_sample,
+               uint64_t previous_session_consumed_total,
+               uint64_t latest_session_consumed_total,
+               struct channel_info *channel_info);
 static
 int send_evaluation_to_clients(struct lttng_trigger *trigger,
                struct lttng_evaluation *evaluation,
@@ -321,6 +325,25 @@ unsigned long lttng_condition_buffer_usage_hash(
        return hash;
 }
 
+static
+unsigned long lttng_condition_session_consumed_size_hash(
+       struct lttng_condition *_condition)
+{
+       unsigned long hash = 0;
+       struct lttng_condition_session_consumed_size *condition;
+       uint64_t val;
+
+       condition = container_of(_condition,
+                       struct lttng_condition_session_consumed_size, parent);
+
+       if (condition->session_name) {
+               hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
+       }
+       val = condition->consumed_threshold_bytes.value;
+       hash ^= hash_key_u64(&val, lttng_ht_seed);
+       return hash;
+}
+
 /*
  * The lttng_condition hashing code is kept in this file (rather than
  * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
@@ -333,6 +356,8 @@ unsigned long lttng_condition_hash(struct lttng_condition *condition)
        case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
        case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
                return lttng_condition_buffer_usage_hash(condition);
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+               return lttng_condition_session_consumed_size_hash(condition);
        default:
                ERR("[notification-thread] Unexpected condition type caught");
                abort();
@@ -574,8 +599,10 @@ int evaluate_condition_for_client(struct lttng_trigger *trigger,
                goto end;
        }
 
-       ret = evaluate_condition(condition, &evaluation, state, NULL,
-                       last_sample, channel_info->capacity);
+       ret = evaluate_condition(condition, &evaluation, state,
+                       NULL, last_sample,
+                       0, channel_info->session_info->consumed_data_size,
+                       channel_info);
        if (ret) {
                WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
                goto end;
@@ -844,51 +871,90 @@ end:
 }
 
 static
-bool trigger_applies_to_channel(struct lttng_trigger *trigger,
+bool buffer_usage_condition_applies_to_channel(
+               struct lttng_condition *condition,
                struct channel_info *channel_info)
 {
        enum lttng_condition_status status;
-       struct lttng_condition *condition;
-       const char *trigger_session_name = NULL;
-       const char *trigger_channel_name = NULL;
-       enum lttng_domain_type trigger_domain;
+       enum lttng_domain_type condition_domain;
+       const char *condition_session_name = NULL;
+       const char *condition_channel_name = NULL;
 
-       condition = lttng_trigger_get_condition(trigger);
-       if (!condition) {
+       status = lttng_condition_buffer_usage_get_domain_type(condition,
+                       &condition_domain);
+       assert(status == LTTNG_CONDITION_STATUS_OK);
+       if (channel_info->key.domain != condition_domain) {
                goto fail;
        }
 
-       switch (lttng_condition_get_type(condition)) {
-       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
-       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
-               break;
-       default:
+       status = lttng_condition_buffer_usage_get_session_name(
+                       condition, &condition_session_name);
+       assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
+
+       status = lttng_condition_buffer_usage_get_channel_name(
+                       condition, &condition_channel_name);
+       assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
+
+       if (strcmp(channel_info->session_info->name, condition_session_name)) {
+               goto fail;
+       }
+       if (strcmp(channel_info->name, condition_channel_name)) {
                goto fail;
        }
 
-       status = lttng_condition_buffer_usage_get_domain_type(condition,
-                       &trigger_domain);
-       assert(status == LTTNG_CONDITION_STATUS_OK);
-       if (channel_info->key.domain != trigger_domain) {
+       return true;
+fail:
+       return false;
+}
+
+static
+bool session_consumed_size_condition_applies_to_channel(
+               struct lttng_condition *condition,
+               struct channel_info *channel_info)
+{
+       enum lttng_condition_status status;
+       const char *condition_session_name = NULL;
+
+       status = lttng_condition_session_consumed_size_get_session_name(
+                       condition, &condition_session_name);
+       assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
+
+       if (strcmp(channel_info->session_info->name, condition_session_name)) {
                goto fail;
        }
 
-       status = lttng_condition_buffer_usage_get_session_name(
-                       condition, &trigger_session_name);
-       assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name);
+       return true;
+fail:
+       return false;
+}
 
-       status = lttng_condition_buffer_usage_get_channel_name(
-                       condition, &trigger_channel_name);
-       assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name);
+static
+bool trigger_applies_to_channel(struct lttng_trigger *trigger,
+               struct channel_info *channel_info)
+{
+       struct lttng_condition *condition;
+       bool trigger_applies;
 
-       if (strcmp(channel_info->session_info->name, trigger_session_name)) {
+       condition = lttng_trigger_get_condition(trigger);
+       if (!condition) {
                goto fail;
        }
-       if (strcmp(channel_info->name, trigger_channel_name)) {
+
+       switch (lttng_condition_get_type(condition)) {
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
+               trigger_applies = buffer_usage_condition_applies_to_channel(
+                               condition, channel_info);
+               break;
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+               trigger_applies = session_consumed_size_condition_applies_to_channel(
+                               condition, channel_info);
+               break;
+       default:
                goto fail;
        }
 
-       return true;
+       return trigger_applies;
 fail:
        return false;
 }
@@ -1315,9 +1381,6 @@ int handle_notification_thread_command_register_trigger(
                CDS_INIT_LIST_HEAD(&trigger_list_element->node);
                trigger_list_element->trigger = trigger;
                cds_list_add(&trigger_list_element->node, &trigger_list->list);
-
-               /* A trigger can only apply to one channel. */
-               break;
        }
 
        /*
@@ -2129,20 +2192,17 @@ end:
 }
 
 static
-bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
-               struct channel_state_sample *sample, uint64_t buffer_capacity)
+bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
+               const struct channel_state_sample *sample,
+               uint64_t buffer_capacity)
 {
        bool result = false;
        uint64_t threshold;
        enum lttng_condition_type condition_type;
-       struct lttng_condition_buffer_usage *use_condition = container_of(
+       const struct lttng_condition_buffer_usage *use_condition = container_of(
                        condition, struct lttng_condition_buffer_usage,
                        parent);
 
-       if (!sample) {
-               goto end;
-       }
-
        if (use_condition->threshold_bytes.set) {
                threshold = use_condition->threshold_bytes.value;
        } else {
@@ -2186,32 +2246,73 @@ bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
                        result = true;
                }
        }
-end:
+
        return result;
 }
 
 static
-int evaluate_condition(struct lttng_condition *condition,
+bool evaluate_session_consumed_size_condition(
+               const struct lttng_condition *condition,
+               uint64_t session_consumed_size)
+{
+       uint64_t threshold;
+       const struct lttng_condition_session_consumed_size *size_condition =
+                       container_of(condition,
+                               struct lttng_condition_session_consumed_size,
+                               parent);
+
+       threshold = size_condition->consumed_threshold_bytes.value;
+       DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
+                       threshold, session_consumed_size);
+       return session_consumed_size >= threshold;
+}
+
+static
+int evaluate_condition(const struct lttng_condition *condition,
                struct lttng_evaluation **evaluation,
-               struct notification_thread_state *state,
-               struct channel_state_sample *previous_sample,
-               struct channel_state_sample *latest_sample,
-               uint64_t buffer_capacity)
+               const struct notification_thread_state *state,
+               const struct channel_state_sample *previous_sample,
+               const struct channel_state_sample *latest_sample,
+               uint64_t previous_session_consumed_total,
+               uint64_t latest_session_consumed_total,
+               struct channel_info *channel_info)
 {
        int ret = 0;
        enum lttng_condition_type condition_type;
-       bool previous_sample_result;
+       const bool previous_sample_available = !!previous_sample;
+       bool previous_sample_result = false;
        bool latest_sample_result;
 
        condition_type = lttng_condition_get_type(condition);
-       /* No other condition type supported for the moment. */
-       assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
-                       condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH);
 
-       previous_sample_result = evaluate_buffer_usage_condition(condition,
-                       previous_sample, buffer_capacity);
-       latest_sample_result = evaluate_buffer_usage_condition(condition,
-                       latest_sample, buffer_capacity);
+       switch (condition_type) {
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
+               if (caa_likely(previous_sample_available)) {
+                       previous_sample_result =
+                               evaluate_buffer_usage_condition(condition,
+                                       previous_sample, channel_info->capacity);
+               }
+               latest_sample_result = evaluate_buffer_usage_condition(
+                               condition, latest_sample,
+                               channel_info->capacity);
+               break;
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+               if (caa_likely(previous_sample_available)) {
+                       previous_sample_result =
+                               evaluate_session_consumed_size_condition(
+                                       condition,
+                                       previous_session_consumed_total);
+               }
+               latest_sample_result =
+                               evaluate_session_consumed_size_condition(
+                                       condition,
+                                       latest_session_consumed_total);
+               break;
+       default:
+               /* Unknown condition type; internal error. */
+               abort();
+       }
 
        if (!latest_sample_result ||
                        (previous_sample_result == latest_sample_result)) {
@@ -2224,15 +2325,30 @@ int evaluate_condition(struct lttng_condition *condition,
                goto end;
        }
 
-       if (evaluation && latest_sample_result) {
+       if (!evaluation || !latest_sample_result) {
+               goto end;
+       }
+
+       switch (condition_type) {
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+       case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
                *evaluation = lttng_evaluation_buffer_usage_create(
                                condition_type,
                                latest_sample->highest_usage,
-                               buffer_capacity);
-               if (!*evaluation) {
-                       ret = -1;
-                       goto end;
-               }
+                               channel_info->capacity);
+               break;
+       case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+               *evaluation = lttng_evaluation_session_consumed_size_create(
+                               condition_type,
+                               latest_session_consumed_total);
+               break;
+       default:
+               abort();
+       }
+
+       if (!*evaluation) {
+               ret = -1;
+               goto end;
        }
 end:
        return ret;
@@ -2369,13 +2485,14 @@ int handle_notification_thread_channel_sample(
 {
        int ret = 0;
        struct lttcomm_consumer_channel_monitor_msg sample_msg;
-       struct channel_state_sample previous_sample, latest_sample;
        struct channel_info *channel_info;
        struct cds_lfht_node *node;
        struct cds_lfht_iter iter;
        struct lttng_channel_trigger_list *trigger_list;
        struct lttng_trigger_list_element *trigger_list_element;
        bool previous_sample_available = false;
+       struct channel_state_sample previous_sample, latest_sample;
+       uint64_t previous_session_consumed_total, latest_session_consumed_total;
 
        /*
         * The monitoring pipe only holds messages smaller than PIPE_BUF,
@@ -2394,6 +2511,7 @@ int handle_notification_thread_channel_sample(
        latest_sample.key.domain = domain;
        latest_sample.highest_usage = sample_msg.highest;
        latest_sample.lowest_usage = sample_msg.lowest;
+       latest_sample.channel_total_consumed = sample_msg.total_consumed;
 
        rcu_read_lock();
 
@@ -2419,12 +2537,16 @@ int handle_notification_thread_channel_sample(
        }
        channel_info = caa_container_of(node, struct channel_info,
                        channels_ht_node);
-       DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")",
+       DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
                        channel_info->name,
                        latest_sample.key.key,
                        channel_info->session_info->name,
                        latest_sample.highest_usage,
-                       latest_sample.lowest_usage);
+                       latest_sample.lowest_usage,
+                       latest_sample.channel_total_consumed);
+
+       previous_session_consumed_total =
+                       channel_info->session_info->consumed_data_size;
 
        /* Retrieve the channel's last sample, if it exists, and update it. */
        cds_lfht_lookup(state->channel_state_ht,
@@ -2440,12 +2562,17 @@ int handle_notification_thread_channel_sample(
                stored_sample = caa_container_of(node,
                                struct channel_state_sample,
                                channel_state_ht_node);
+
                memcpy(&previous_sample, stored_sample,
                                sizeof(previous_sample));
                stored_sample->highest_usage = latest_sample.highest_usage;
                stored_sample->lowest_usage = latest_sample.lowest_usage;
                stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
                previous_sample_available = true;
+
+               latest_session_consumed_total =
+                               previous_session_consumed_total +
+                               (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
        } else {
                /*
                 * This is the channel's first sample, allocate space for and
@@ -2464,8 +2591,15 @@ int handle_notification_thread_channel_sample(
                cds_lfht_add(state->channel_state_ht,
                                hash_channel_key(&stored_sample->key),
                                &stored_sample->channel_state_ht_node);
+
+               latest_session_consumed_total =
+                               previous_session_consumed_total +
+                               latest_sample.channel_total_consumed;
        }
 
+       channel_info->session_info->consumed_data_size =
+                       latest_session_consumed_total;
+
        /* Find triggers associated with this channel. */
        cds_lfht_lookup(state->channel_triggers_ht,
                        hash_channel_key(&latest_sample.key),
@@ -2521,12 +2655,15 @@ int handle_notification_thread_channel_sample(
 
                ret = evaluate_condition(condition, &evaluation, state,
                                previous_sample_available ? &previous_sample : NULL,
-                               &latest_sample, channel_info->capacity);
-               if (ret) {
+                               &latest_sample,
+                               previous_session_consumed_total,
+                               latest_session_consumed_total,
+                               channel_info);
+               if (caa_unlikely(ret)) {
                        goto end_unlock;
                }
 
-               if (!evaluation) {
+               if (caa_likely(!evaluation)) {
                        continue;
                }
 
This page took 0.028139 seconds and 4 git commands to generate.