#include <lttng/notification/notification-internal.h>
#include <lttng/condition/condition-internal.h>
#include <lttng/condition/buffer-usage-internal.h>
+#include <lttng/condition/session-consumed-size-internal.h>
#include <lttng/notification/channel-internal.h>
#include <time.h>
struct cds_lfht_node channel_state_ht_node;
uint64_t highest_usage;
uint64_t lowest_usage;
+ uint64_t channel_total_consumed;
};
static unsigned long hash_channel_key(struct channel_key *key);
-static int evaluate_condition(struct lttng_condition *condition,
+static int evaluate_condition(const struct lttng_condition *condition,
struct lttng_evaluation **evaluation,
- struct notification_thread_state *state,
- struct channel_state_sample *previous_sample,
- struct channel_state_sample *latest_sample,
- uint64_t buffer_capacity);
+ const struct notification_thread_state *state,
+ const struct channel_state_sample *previous_sample,
+ const struct channel_state_sample *latest_sample,
+ uint64_t previous_session_consumed_total,
+ uint64_t latest_session_consumed_total,
+ struct channel_info *channel_info);
static
int send_evaluation_to_clients(struct lttng_trigger *trigger,
struct lttng_evaluation *evaluation,
struct notification_thread_state *state,
uid_t channel_uid, gid_t channel_gid);
+
+static
+void session_info_destroy(void *_data);
+static
+void session_info_get(struct session_info *session_info);
+static
+void session_info_put(struct session_info *session_info);
+static
+struct session_info *session_info_create(const char *name,
+ uid_t uid, gid_t gid);
+static
+void session_info_add_channel(struct session_info *session_info,
+ struct channel_info *channel_info);
+static
+void session_info_remove_channel(struct session_info *session_info,
+ struct channel_info *channel_info);
+
static
int match_client(struct cds_lfht_node *node, const void *key)
{
val = condition->threshold_ratio.value * (double) UINT32_MAX;
hash ^= hash_key_u64(&val, lttng_ht_seed);
- } else if (condition->threshold_ratio.set) {
+ } else if (condition->threshold_bytes.set) {
uint64_t val;
val = condition->threshold_bytes.value;
return hash;
}
+static
+unsigned long lttng_condition_session_consumed_size_hash(
+ struct lttng_condition *_condition)
+{
+ unsigned long hash = 0;
+ struct lttng_condition_session_consumed_size *condition;
+ uint64_t val;
+
+ condition = container_of(_condition,
+ struct lttng_condition_session_consumed_size, parent);
+
+ if (condition->session_name) {
+ hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
+ }
+ val = condition->consumed_threshold_bytes.value;
+ hash ^= hash_key_u64(&val, lttng_ht_seed);
+ return hash;
+}
+
/*
* The lttng_condition hashing code is kept in this file (rather than
* condition.c) since it makes use of GPLv2 code (hashtable utils), which we
case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
return lttng_condition_buffer_usage_hash(condition);
+ case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+ return lttng_condition_session_consumed_size_hash(condition);
default:
ERR("[notification-thread] Unexpected condition type caught");
abort();
return;
}
- if (channel_info->session_name) {
- free(channel_info->session_name);
+ if (channel_info->session_info) {
+ session_info_remove_channel(channel_info->session_info,
+ channel_info);
+ session_info_put(channel_info->session_info);
}
- if (channel_info->channel_name) {
- free(channel_info->channel_name);
+ if (channel_info->name) {
+ free(channel_info->name);
}
free(channel_info);
}
+/* Don't call directly, use the ref-counting mechanism. */
static
-struct channel_info *channel_info_copy(struct channel_info *channel_info)
+void session_info_destroy(void *_data)
{
- struct channel_info *copy = zmalloc(sizeof(*channel_info));
+ struct session_info *session_info = _data;
+ int ret;
- assert(channel_info);
- assert(channel_info->session_name);
- assert(channel_info->channel_name);
+ assert(session_info);
+ if (session_info->channel_infos_ht) {
+ ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
+ if (ret) {
+ ERR("[notification-thread] Failed to destroy channel information hash table");
+ }
+ }
+ free(session_info->name);
+ free(session_info);
+}
- if (!copy) {
- goto end;
+static
+void session_info_get(struct session_info *session_info)
+{
+ if (!session_info) {
+ return;
}
+ lttng_ref_get(&session_info->ref);
+}
- memcpy(copy, channel_info, sizeof(*channel_info));
- copy->session_name = NULL;
- copy->channel_name = NULL;
+static
+void session_info_put(struct session_info *session_info)
+{
+ if (!session_info) {
+ return;
+ }
+ lttng_ref_put(&session_info->ref);
+}
- copy->session_name = strdup(channel_info->session_name);
- if (!copy->session_name) {
+static
+struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid)
+{
+ struct session_info *session_info;
+
+ assert(name);
+
+ session_info = zmalloc(sizeof(*session_info));
+ if (!session_info) {
+ goto end;
+ }
+ lttng_ref_init(&session_info->ref, session_info_destroy);
+
+ session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
+ 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
+ if (!session_info->channel_infos_ht) {
goto error;
}
- copy->channel_name = strdup(channel_info->channel_name);
- if (!copy->channel_name) {
+
+ cds_lfht_node_init(&session_info->sessions_ht_node);
+ session_info->name = strdup(name);
+ if (!session_info->name) {
goto error;
}
+ session_info->uid = uid;
+ session_info->gid = gid;
+end:
+ return session_info;
+error:
+ session_info_put(session_info);
+ return NULL;
+}
+
+static
+void session_info_add_channel(struct session_info *session_info,
+ struct channel_info *channel_info)
+{
+ rcu_read_lock();
+ cds_lfht_add(session_info->channel_infos_ht,
+ hash_channel_key(&channel_info->key),
+ &channel_info->session_info_channels_ht_node);
+ rcu_read_unlock();
+}
+
+static
+void session_info_remove_channel(struct session_info *session_info,
+ struct channel_info *channel_info)
+{
+ rcu_read_lock();
+ cds_lfht_del(session_info->channel_infos_ht,
+ &channel_info->session_info_channels_ht_node);
+ rcu_read_unlock();
+}
+
+static
+struct channel_info *channel_info_create(const char *channel_name,
+ struct channel_key *channel_key, uint64_t channel_capacity,
+ struct session_info *session_info)
+{
+ struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
+
+ if (!channel_info) {
+ goto end;
+ }
+
cds_lfht_node_init(&channel_info->channels_ht_node);
+ cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
+ memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
+ channel_info->capacity = channel_capacity;
+
+ channel_info->name = strdup(channel_name);
+ if (!channel_info->name) {
+ goto error;
+ }
+
+ /*
+ * Set the references between session and channel infos:
+ * - channel_info holds a strong reference to session_info
+ * - session_info holds a weak reference to channel_info
+ */
+ session_info_get(session_info);
+ session_info_add_channel(session_info, channel_info);
+ channel_info->session_info = session_info;
end:
- return copy;
+ return channel_info;
error:
- channel_info_destroy(copy);
+ channel_info_destroy(channel_info);
return NULL;
}
assert(current_condition);
if (!lttng_condition_is_equal(condition,
- current_condition)) {
+ current_condition)) {
continue;
}
goto end;
}
- ret = evaluate_condition(condition, &evaluation, state, NULL,
- last_sample, channel_info->capacity);
+ ret = evaluate_condition(condition, &evaluation, state,
+ NULL, last_sample,
+ 0, channel_info->session_info->consumed_data_size,
+ channel_info);
if (ret) {
- WARN("[notification-thread] Fatal error occured while evaluating a newly subscribed-to condition");
+ WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
goto end;
}
/* Send evaluation result to the newly-subscribed client. */
DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
- state, channel_info->uid, channel_info->gid);
+ state, channel_info->session_info->uid,
+ channel_info->session_info->gid);
end:
return ret;
&iter);
node = cds_lfht_iter_get_node(&iter);
if (!node) {
+ /*
+ * No notification-emiting trigger registered with this
+ * condition. We don't evaluate the condition right away
+ * since this trigger is not registered yet.
+ */
free(client_list_element);
goto end_unlock;
}
client_list = caa_container_of(node, struct notification_client_list,
notification_trigger_ht_node);
+ /*
+ * The condition to which the client just subscribed is evaluated
+ * at this point so that conditions that are already TRUE result
+ * in a notification being sent out.
+ */
if (evaluate_condition_for_client(client_list->trigger, condition,
client, state)) {
WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
}
static
-bool trigger_applies_to_channel(struct lttng_trigger *trigger,
- struct channel_info *info)
+bool buffer_usage_condition_applies_to_channel(
+ struct lttng_condition *condition,
+ struct channel_info *channel_info)
{
enum lttng_condition_status status;
- struct lttng_condition *condition;
- const char *trigger_session_name = NULL;
- const char *trigger_channel_name = NULL;
- enum lttng_domain_type trigger_domain;
+ enum lttng_domain_type condition_domain;
+ const char *condition_session_name = NULL;
+ const char *condition_channel_name = NULL;
- condition = lttng_trigger_get_condition(trigger);
- if (!condition) {
+ status = lttng_condition_buffer_usage_get_domain_type(condition,
+ &condition_domain);
+ assert(status == LTTNG_CONDITION_STATUS_OK);
+ if (channel_info->key.domain != condition_domain) {
goto fail;
}
- switch (lttng_condition_get_type(condition)) {
- case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
- case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
- break;
- default:
+ status = lttng_condition_buffer_usage_get_session_name(
+ condition, &condition_session_name);
+ assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
+
+ status = lttng_condition_buffer_usage_get_channel_name(
+ condition, &condition_channel_name);
+ assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
+
+ if (strcmp(channel_info->session_info->name, condition_session_name)) {
+ goto fail;
+ }
+ if (strcmp(channel_info->name, condition_channel_name)) {
goto fail;
}
- status = lttng_condition_buffer_usage_get_domain_type(condition,
- &trigger_domain);
- assert(status == LTTNG_CONDITION_STATUS_OK);
- if (info->key.domain != trigger_domain) {
+ return true;
+fail:
+ return false;
+}
+
+static
+bool session_consumed_size_condition_applies_to_channel(
+ struct lttng_condition *condition,
+ struct channel_info *channel_info)
+{
+ enum lttng_condition_status status;
+ const char *condition_session_name = NULL;
+
+ status = lttng_condition_session_consumed_size_get_session_name(
+ condition, &condition_session_name);
+ assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
+
+ if (strcmp(channel_info->session_info->name, condition_session_name)) {
goto fail;
}
- status = lttng_condition_buffer_usage_get_session_name(
- condition, &trigger_session_name);
- assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name);
+ return true;
+fail:
+ return false;
+}
- status = lttng_condition_buffer_usage_get_channel_name(
- condition, &trigger_channel_name);
- assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name);
+static
+bool trigger_applies_to_channel(struct lttng_trigger *trigger,
+ struct channel_info *channel_info)
+{
+ struct lttng_condition *condition;
+ bool trigger_applies;
- if (strcmp(info->session_name, trigger_session_name)) {
+ condition = lttng_trigger_get_condition(trigger);
+ if (!condition) {
goto fail;
}
- if (strcmp(info->channel_name, trigger_channel_name)) {
+
+ switch (lttng_condition_get_type(condition)) {
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
+ trigger_applies = buffer_usage_condition_applies_to_channel(
+ condition, channel_info);
+ break;
+ case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+ trigger_applies = session_consumed_size_condition_applies_to_channel(
+ condition, channel_info);
+ break;
+ default:
goto fail;
}
- return true;
+ return trigger_applies;
fail:
return false;
}
return applies;
}
+static
+int match_session(struct cds_lfht_node *node, const void *key)
+{
+ const char *name = key;
+ struct session_info *session_info = caa_container_of(
+ node, struct session_info, sessions_ht_node);
+
+ return !strcmp(session_info->name, name);
+}
+
+static
+struct session_info *find_or_create_session_info(
+ struct notification_thread_state *state,
+ const char *name, uid_t uid, gid_t gid)
+{
+ struct session_info *session = NULL;
+ struct cds_lfht_node *node;
+ struct cds_lfht_iter iter;
+
+ rcu_read_lock();
+ cds_lfht_lookup(state->sessions_ht,
+ hash_key_str(name, lttng_ht_seed),
+ match_session,
+ name,
+ &iter);
+ node = cds_lfht_iter_get_node(&iter);
+ if (node) {
+ DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
+ name, uid, gid);
+ session = caa_container_of(node, struct session_info,
+ sessions_ht_node);
+ assert(session->uid == uid);
+ assert(session->gid == gid);
+ goto end;
+ }
+
+ session = session_info_create(name, uid, gid);
+ if (!session) {
+ ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
+ name, uid, gid);
+ goto end;
+ }
+end:
+ rcu_read_unlock();
+ return session;
+}
+
static
int handle_notification_thread_command_add_channel(
- struct notification_thread_state *state,
- struct channel_info *channel_info,
- enum lttng_error_code *cmd_result)
+ struct notification_thread_state *state,
+ const char *session_name, uid_t session_uid, gid_t session_gid,
+ const char *channel_name, enum lttng_domain_type channel_domain,
+ uint64_t channel_key_int, uint64_t channel_capacity,
+ enum lttng_error_code *cmd_result)
{
struct cds_list_head trigger_list;
- struct channel_info *new_channel_info;
- struct channel_key *channel_key;
+ struct channel_info *new_channel_info = NULL;
+ struct channel_key channel_key = {
+ .key = channel_key_int,
+ .domain = channel_domain,
+ };
struct lttng_channel_trigger_list *channel_trigger_list = NULL;
struct lttng_trigger_ht_element *trigger_ht_element = NULL;
int trigger_count = 0;
struct cds_lfht_iter iter;
+ struct session_info *session_info = NULL;
DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
- channel_info->channel_name, channel_info->session_name,
- channel_info->key.key, channel_info->key.domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
+ channel_name, session_name, channel_key_int,
+ channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
CDS_INIT_LIST_HEAD(&trigger_list);
- new_channel_info = channel_info_copy(channel_info);
- if (!new_channel_info) {
+ session_info = find_or_create_session_info(state, session_name,
+ session_uid, session_gid);
+ if (!session_info) {
+ /* Allocation error or an internal error occured. */
goto error;
}
- channel_key = &new_channel_info->key;
+ new_channel_info = channel_info_create(channel_name, &channel_key,
+ channel_capacity, session_info);
+ if (!new_channel_info) {
+ goto error;
+ }
/* Build a list of all triggers applying to the new channel. */
cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
struct lttng_trigger_list_element *new_element;
if (!trigger_applies_to_channel(trigger_ht_element->trigger,
- channel_info)) {
+ new_channel_info)) {
continue;
}
if (!channel_trigger_list) {
goto error;
}
- channel_trigger_list->channel_key = *channel_key;
+ channel_trigger_list->channel_key = new_channel_info->key;
CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
cds_list_splice(&trigger_list, &channel_trigger_list->list);
rcu_read_lock();
/* Add channel to the channel_ht which owns the channel_infos. */
cds_lfht_add(state->channels_ht,
- hash_channel_key(channel_key),
+ hash_channel_key(&new_channel_info->key),
&new_channel_info->channels_ht_node);
/*
* Add the list of triggers associated with this channel to the
* channel_triggers_ht.
*/
cds_lfht_add(state->channel_triggers_ht,
- hash_channel_key(channel_key),
+ hash_channel_key(&new_channel_info->key),
&channel_trigger_list->channel_triggers_ht_node);
rcu_read_unlock();
*cmd_result = LTTNG_OK;
return 0;
error:
- /* Empty trigger list */
channel_info_destroy(new_channel_info);
+ session_info_put(session_info);
return 1;
}
* are checked against the channel at that moment.
*
* If this function returns a non-zero value, it means something is
- * fundamentally and the whole subsystem/thread will be torn down.
+ * fundamentally broken and the whole subsystem/thread will be torn down.
*
* If a non-fatal error occurs, just set the cmd_result to the appropriate
* error code.
*/
static
int handle_notification_thread_command_register_trigger(
- struct notification_thread_state *state,
- struct lttng_trigger *trigger,
- enum lttng_error_code *cmd_result)
+ struct notification_thread_state *state,
+ struct lttng_trigger *trigger,
+ enum lttng_error_code *cmd_result)
{
int ret = 0;
struct lttng_condition *condition;
cds_lfht_add(state->notification_trigger_clients_ht,
lttng_condition_hash(condition),
&client_list->notification_trigger_ht_node);
- /*
- * Client list ownership transferred to the
- * notification_trigger_clients_ht.
- */
- client_list = NULL;
/*
* Add the trigger to list of triggers bound to the channels currently
CDS_INIT_LIST_HEAD(&trigger_list_element->node);
trigger_list_element->trigger = trigger;
cds_list_add(&trigger_list_element->node, &trigger_list->list);
+ }
- /* A trigger can only apply to one channel. */
- break;
+ /*
+ * Since there is nothing preventing clients from subscribing to a
+ * condition before the corresponding trigger is registered, we have
+ * to evaluate this new condition right away.
+ *
+ * At some point, we were waiting for the next "evaluation" (e.g. on
+ * reception of a channel sample) to evaluate this new condition, but
+ * that was broken.
+ *
+ * The reason it was broken is that waiting for the next sample
+ * does not allow us to properly handle transitions for edge-triggered
+ * conditions.
+ *
+ * Consider this example: when we handle a new channel sample, we
+ * evaluate each conditions twice: once with the previous state, and
+ * again with the newest state. We then use those two results to
+ * determine whether a state change happened: a condition was false and
+ * became true. If a state change happened, we have to notify clients.
+ *
+ * Now, if a client subscribes to a given notification and registers
+ * a trigger *after* that subscription, we have to make sure the
+ * condition is evaluated at this point while considering only the
+ * current state. Otherwise, the next evaluation cycle may only see
+ * that the evaluations remain the same (true for samples n-1 and n) and
+ * the client will never know that the condition has been met.
+ */
+ cds_list_for_each_entry_safe(client_list_element, tmp,
+ &client_list->list, node) {
+ ret = evaluate_condition_for_client(trigger, condition,
+ client_list_element->client, state);
+ if (ret) {
+ goto error_free_client_list;
+ }
}
+ /*
+ * Client list ownership transferred to the
+ * notification_trigger_clients_ht.
+ */
+ client_list = NULL;
+
*cmd_result = LTTNG_OK;
error_free_client_list:
if (client_list) {
uint64_t counter;
struct notification_thread_command *cmd;
- /* Read event_fd to put it back into a quiescent state. */
- ret = read(handle->cmd_queue.event_fd, &counter, sizeof(counter));
+ /* Read the event pipe to put it back into a quiescent state. */
+ ret = read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
+ sizeof(counter));
if (ret == -1) {
goto error;
}
case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
DBG("[notification-thread] Received add channel command");
ret = handle_notification_thread_command_add_channel(
- state, &cmd->parameters.add_channel,
+ state,
+ cmd->parameters.add_channel.session.name,
+ cmd->parameters.add_channel.session.uid,
+ cmd->parameters.add_channel.session.gid,
+ cmd->parameters.add_channel.channel.name,
+ cmd->parameters.add_channel.channel.domain,
+ cmd->parameters.add_channel.channel.key,
+ cmd->parameters.add_channel.channel.capacity,
&cmd->reply_code);
break;
case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
if (client->communication.inbound.msg_type ==
LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
- /*
- * FIXME The current state should be evaluated on
- * subscription.
- */
ret = notification_thread_client_subscribe(client,
condition, state, &status);
} else {
}
static
-bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
- struct channel_state_sample *sample, uint64_t buffer_capacity)
+bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
+ const struct channel_state_sample *sample,
+ uint64_t buffer_capacity)
{
bool result = false;
uint64_t threshold;
enum lttng_condition_type condition_type;
- struct lttng_condition_buffer_usage *use_condition = container_of(
+ const struct lttng_condition_buffer_usage *use_condition = container_of(
condition, struct lttng_condition_buffer_usage,
parent);
- if (!sample) {
- goto end;
- }
-
if (use_condition->threshold_bytes.set) {
threshold = use_condition->threshold_bytes.value;
} else {
result = true;
}
}
-end:
+
return result;
}
static
-int evaluate_condition(struct lttng_condition *condition,
+bool evaluate_session_consumed_size_condition(
+ const struct lttng_condition *condition,
+ uint64_t session_consumed_size)
+{
+ uint64_t threshold;
+ const struct lttng_condition_session_consumed_size *size_condition =
+ container_of(condition,
+ struct lttng_condition_session_consumed_size,
+ parent);
+
+ threshold = size_condition->consumed_threshold_bytes.value;
+ DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
+ threshold, session_consumed_size);
+ return session_consumed_size >= threshold;
+}
+
+static
+int evaluate_condition(const struct lttng_condition *condition,
struct lttng_evaluation **evaluation,
- struct notification_thread_state *state,
- struct channel_state_sample *previous_sample,
- struct channel_state_sample *latest_sample,
- uint64_t buffer_capacity)
+ const struct notification_thread_state *state,
+ const struct channel_state_sample *previous_sample,
+ const struct channel_state_sample *latest_sample,
+ uint64_t previous_session_consumed_total,
+ uint64_t latest_session_consumed_total,
+ struct channel_info *channel_info)
{
int ret = 0;
enum lttng_condition_type condition_type;
- bool previous_sample_result;
+ const bool previous_sample_available = !!previous_sample;
+ bool previous_sample_result = false;
bool latest_sample_result;
condition_type = lttng_condition_get_type(condition);
- /* No other condition type supported for the moment. */
- assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
- condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH);
- previous_sample_result = evaluate_buffer_usage_condition(condition,
- previous_sample, buffer_capacity);
- latest_sample_result = evaluate_buffer_usage_condition(condition,
- latest_sample, buffer_capacity);
+ switch (condition_type) {
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
+ if (caa_likely(previous_sample_available)) {
+ previous_sample_result =
+ evaluate_buffer_usage_condition(condition,
+ previous_sample, channel_info->capacity);
+ }
+ latest_sample_result = evaluate_buffer_usage_condition(
+ condition, latest_sample,
+ channel_info->capacity);
+ break;
+ case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+ if (caa_likely(previous_sample_available)) {
+ previous_sample_result =
+ evaluate_session_consumed_size_condition(
+ condition,
+ previous_session_consumed_total);
+ }
+ latest_sample_result =
+ evaluate_session_consumed_size_condition(
+ condition,
+ latest_session_consumed_total);
+ break;
+ default:
+ /* Unknown condition type; internal error. */
+ abort();
+ }
if (!latest_sample_result ||
(previous_sample_result == latest_sample_result)) {
goto end;
}
- if (evaluation && latest_sample_result) {
+ if (!evaluation || !latest_sample_result) {
+ goto end;
+ }
+
+ switch (condition_type) {
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
+ case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
*evaluation = lttng_evaluation_buffer_usage_create(
condition_type,
latest_sample->highest_usage,
- buffer_capacity);
- if (!*evaluation) {
- ret = -1;
- goto end;
- }
+ channel_info->capacity);
+ break;
+ case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+ *evaluation = lttng_evaluation_session_consumed_size_create(
+ condition_type,
+ latest_session_consumed_total);
+ break;
+ default:
+ abort();
+ }
+
+ if (!*evaluation) {
+ ret = -1;
+ goto end;
}
end:
return ret;
{
int ret = 0;
struct lttcomm_consumer_channel_monitor_msg sample_msg;
- struct channel_state_sample previous_sample, latest_sample;
struct channel_info *channel_info;
struct cds_lfht_node *node;
struct cds_lfht_iter iter;
struct lttng_channel_trigger_list *trigger_list;
struct lttng_trigger_list_element *trigger_list_element;
bool previous_sample_available = false;
+ struct channel_state_sample previous_sample, latest_sample;
+ uint64_t previous_session_consumed_total, latest_session_consumed_total;
/*
* The monitoring pipe only holds messages smaller than PIPE_BUF,
latest_sample.key.domain = domain;
latest_sample.highest_usage = sample_msg.highest;
latest_sample.lowest_usage = sample_msg.lowest;
+ latest_sample.channel_total_consumed = sample_msg.total_consumed;
rcu_read_lock();
&latest_sample.key,
&iter);
node = cds_lfht_iter_get_node(&iter);
- if (!node) {
+ if (caa_unlikely(!node)) {
/*
* Not an error since the consumer can push a sample to the pipe
* and the rest of the session daemon could notify us of the
}
channel_info = caa_container_of(node, struct channel_info,
channels_ht_node);
- DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")",
- channel_info->channel_name,
+ DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
+ channel_info->name,
latest_sample.key.key,
- channel_info->session_name,
+ channel_info->session_info->name,
latest_sample.highest_usage,
- latest_sample.lowest_usage);
+ latest_sample.lowest_usage,
+ latest_sample.channel_total_consumed);
+
+ previous_session_consumed_total =
+ channel_info->session_info->consumed_data_size;
/* Retrieve the channel's last sample, if it exists, and update it. */
cds_lfht_lookup(state->channel_state_ht,
&latest_sample.key,
&iter);
node = cds_lfht_iter_get_node(&iter);
- if (node) {
+ if (caa_likely(node)) {
struct channel_state_sample *stored_sample;
/* Update the sample stored. */
stored_sample = caa_container_of(node,
struct channel_state_sample,
channel_state_ht_node);
+
memcpy(&previous_sample, stored_sample,
sizeof(previous_sample));
stored_sample->highest_usage = latest_sample.highest_usage;
stored_sample->lowest_usage = latest_sample.lowest_usage;
+ stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
previous_sample_available = true;
+
+ latest_session_consumed_total =
+ previous_session_consumed_total +
+ (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
} else {
/*
* This is the channel's first sample, allocate space for and
cds_lfht_add(state->channel_state_ht,
hash_channel_key(&stored_sample->key),
&stored_sample->channel_state_ht_node);
+
+ latest_session_consumed_total =
+ previous_session_consumed_total +
+ latest_sample.channel_total_consumed;
}
+ channel_info->session_info->consumed_data_size =
+ latest_session_consumed_total;
+
/* Find triggers associated with this channel. */
cds_lfht_lookup(state->channel_triggers_ht,
hash_channel_key(&latest_sample.key),
&latest_sample.key,
&iter);
node = cds_lfht_iter_get_node(&iter);
- if (!node) {
+ if (caa_likely(!node)) {
goto end_unlock;
}
ret = evaluate_condition(condition, &evaluation, state,
previous_sample_available ? &previous_sample : NULL,
- &latest_sample, channel_info->capacity);
- if (ret) {
+ &latest_sample,
+ previous_session_consumed_total,
+ latest_session_consumed_total,
+ channel_info);
+ if (caa_unlikely(ret)) {
goto end_unlock;
}
- if (!evaluation) {
+ if (caa_likely(!evaluation)) {
continue;
}
/* Dispatch evaluation result to all clients. */
ret = send_evaluation_to_clients(trigger_list_element->trigger,
evaluation, client_list, state,
- channel_info->uid, channel_info->gid);
- if (ret) {
+ channel_info->session_info->uid,
+ channel_info->session_info->gid);
+ lttng_evaluation_destroy(evaluation);
+ if (caa_unlikely(ret)) {
goto end_unlock;
}
}