*/
#include "lttng/action/action.h"
-#include "lttng/trigger/trigger-internal.h"
+#include "lttng/trigger/trigger-internal.hpp"
#define _LGPL_SOURCE
#include <urcu.h>
#include <urcu/rculfhash.h>
-#include <common/defaults.h>
-#include <common/error.h>
-#include <common/futex.h>
-#include <common/unix.h>
-#include <common/dynamic-buffer.h>
-#include <common/hashtable/utils.h>
-#include <common/sessiond-comm/sessiond-comm.h>
-#include <common/macros.h>
+#include <common/defaults.hpp>
+#include <common/error.hpp>
+#include <common/futex.hpp>
+#include <common/unix.hpp>
+#include <common/dynamic-buffer.hpp>
+#include <common/hashtable/utils.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/macros.hpp>
#include <lttng/condition/condition.h>
-#include <lttng/action/action-internal.h>
-#include <lttng/action/list-internal.h>
-#include <lttng/domain-internal.h>
-#include <lttng/notification/notification-internal.h>
-#include <lttng/condition/condition-internal.h>
-#include <lttng/condition/buffer-usage-internal.h>
-#include <lttng/condition/session-consumed-size-internal.h>
-#include <lttng/condition/session-rotation-internal.h>
-#include <lttng/condition/event-rule-matches-internal.h>
-#include <lttng/domain-internal.h>
-#include <lttng/notification/channel-internal.h>
-#include <lttng/trigger/trigger-internal.h>
-#include <lttng/event-rule/event-rule-internal.h>
+#include <lttng/action/action-internal.hpp>
+#include <lttng/action/list-internal.hpp>
+#include <lttng/domain-internal.hpp>
+#include <lttng/notification/notification-internal.hpp>
+#include <lttng/condition/condition-internal.hpp>
+#include <lttng/condition/buffer-usage-internal.hpp>
+#include <lttng/condition/session-consumed-size-internal.hpp>
+#include <lttng/condition/session-rotation-internal.hpp>
+#include <lttng/condition/event-rule-matches-internal.hpp>
+#include <lttng/domain-internal.hpp>
+#include <lttng/notification/channel-internal.hpp>
+#include <lttng/trigger/trigger-internal.hpp>
+#include <lttng/event-rule/event-rule-internal.hpp>
+#include <lttng/location-internal.hpp>
#include <time.h>
#include <unistd.h>
#include <inttypes.h>
#include <fcntl.h>
-#include "condition-internal.h"
-#include "event-notifier-error-accounting.h"
-#include "notification-thread.h"
-#include "notification-thread-events.h"
-#include "notification-thread-commands.h"
-#include "lttng-sessiond.h"
-#include "kernel.h"
+#include "condition-internal.hpp"
+#include "event-notifier-error-accounting.hpp"
+#include "notification-thread.hpp"
+#include "notification-thread-events.hpp"
+#include "notification-thread-commands.hpp"
+#include "lttng-sessiond.hpp"
+#include "kernel.hpp"
-#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
-#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
+#define CLIENT_POLL_EVENTS_IN (LPOLLIN | LPOLLRDHUP)
+#define CLIENT_POLL_EVENTS_IN_OUT (CLIENT_POLL_EVENTS_IN | LPOLLOUT)
/* The tracers currently limit the capture size to PIPE_BUF (4kb on linux). */
#define MAX_CAPTURE_SIZE (PIPE_BUF)
LTTNG_OBJECT_TYPE_SESSION,
};
-struct lttng_trigger_list_element {
- /* No ownership of the trigger object is assumed. */
- struct lttng_trigger *trigger;
- struct cds_list_head node;
-};
-
struct lttng_channel_trigger_list {
struct channel_key channel_key;
/* List of struct lttng_trigger_list_element. */
* - lttng_session_trigger_list_add()
*/
struct lttng_session_trigger_list {
- /*
- * Not owned by this; points to the session_info structure's
- * session name.
- */
- const char *session_name;
+ char *session_name;
/* List of struct lttng_trigger_list_element. */
struct cds_list_head list;
/* Node in the session_triggers_ht */
struct rcu_head rcu_node;
};
+namespace {
+struct lttng_trigger_list_element {
+ /* No ownership of the trigger object is assumed. */
+ struct lttng_trigger *trigger;
+ struct cds_list_head node;
+};
+
struct lttng_trigger_ht_element {
struct lttng_trigger *trigger;
struct cds_lfht_node node;
struct cds_lfht_node channel_state_ht_node;
uint64_t highest_usage;
uint64_t lowest_usage;
- uint64_t channel_total_consumed;
/* call_rcu delayed reclaim. */
struct rcu_head rcu_node;
};
+} /* namespace */
static unsigned long hash_channel_key(struct channel_key *key);
static int evaluate_buffer_condition(const struct lttng_condition *condition,
const struct notification_thread_state *state,
const struct channel_state_sample *previous_sample,
const struct channel_state_sample *latest_sample,
- uint64_t previous_session_consumed_total,
- uint64_t latest_session_consumed_total,
struct channel_info *channel_info);
static
int send_evaluation_to_clients(const struct lttng_trigger *trigger,
static
void session_info_put(struct session_info *session_info);
static
-struct session_info *session_info_create(const char *name,
- uid_t uid, gid_t gid,
+struct session_info *session_info_create(uint64_t id,
+ const char *name,
+ uid_t uid,
+ gid_t gid,
struct lttng_session_trigger_list *trigger_list,
struct cds_lfht *sessions_ht);
-static
-void session_info_add_channel(struct session_info *session_info,
- struct channel_info *channel_info);
+static void session_info_add_channel(
+ struct session_info *session_info, struct channel_info *channel_info);
static
void session_info_remove_channel(struct session_info *session_info,
struct channel_info *channel_info);
{
/* This double-cast is intended to supress pointer-to-cast warning. */
const notification_client_id id = *((notification_client_id *) key);
- const struct notification_client *client = caa_container_of(
- node, struct notification_client, client_id_ht_node);
+ const struct notification_client *client = lttng::utils::container_of(
+ node, ¬ification_client::client_id_ht_node);
return client->id == id;
}
}
static
-int match_session(struct cds_lfht_node *node, const void *key)
+int match_session_info(struct cds_lfht_node *node, const void *key)
+{
+ const auto session_id = *((uint64_t *) key);
+ const auto *session_info = lttng::utils::container_of(
+ node, &session_info::sessions_ht_node);
+
+ return session_id == session_info->id;
+}
+
+static
+unsigned long hash_session_info_id(uint64_t id)
{
- const char *name = (const char *) key;
- struct session_info *session_info = caa_container_of(
- node, struct session_info, sessions_ht_node);
+ return hash_key_u64(&id, lttng_ht_seed);
+}
- return !strcmp(session_info->name, name);
+static
+unsigned long hash_session_info(const struct session_info *session_info)
+{
+ return hash_session_info_id(session_info->id);
+}
+
+static
+struct session_info *get_session_info_by_id(
+ const struct notification_thread_state *state, uint64_t id)
+{
+ struct cds_lfht_iter iter;
+ struct cds_lfht_node *node;
+ lttng::urcu::read_lock_guard read_lock_guard;
+
+ cds_lfht_lookup(state->sessions_ht,
+ hash_session_info_id(id),
+ match_session_info,
+ &id,
+ &iter);
+ node = cds_lfht_iter_get_node(&iter);
+
+ if (node) {
+ auto session_info = lttng::utils::container_of(node, &session_info::sessions_ht_node);
+
+ session_info_get(session_info);
+ return session_info;
+ }
+
+ return NULL;
+}
+
+static
+struct session_info *get_session_info_by_name(
+ const struct notification_thread_state *state, const char *name)
+{
+ uint64_t session_id;
+ const auto found = sample_session_id_by_name(name, &session_id);
+
+ return found ? get_session_info_by_id(state, session_id) : NULL;
}
static
return "ADD_CHANNEL";
case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
return "REMOVE_CHANNEL";
+ case NOTIFICATION_COMMAND_TYPE_ADD_SESSION:
+ return "ADD_SESSION";
+ case NOTIFICATION_COMMAND_TYPE_REMOVE_SESSION:
+ return "REMOVE_SESSION";
case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
return "SESSION_ROTATION_ONGOING";
case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
switch (lttng_condition_get_type(condition)) {
case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
- case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
return LTTNG_OBJECT_TYPE_CHANNEL;
case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
+ case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
return LTTNG_OBJECT_TYPE_SESSION;
case LTTNG_CONDITION_TYPE_EVENT_RULE_MATCHES:
return LTTNG_OBJECT_TYPE_NONE;
static
void free_channel_info_rcu(struct rcu_head *node)
{
- free(caa_container_of(node, struct channel_info, rcu_node));
+ free(lttng::utils::container_of(node, &channel_info::rcu_node));
}
static
static
void free_session_info_rcu(struct rcu_head *node)
{
- free(caa_container_of(node, struct session_info, rcu_node));
+ free(lttng::utils::container_of(node, &session_info::rcu_node));
}
/* Don't call directly, use the ref-counting mechanism. */
&session_info->sessions_ht_node);
rcu_read_unlock();
free(session_info->name);
+ lttng_trace_archive_location_put(session_info->last_state_sample.rotation.location);
call_rcu(&session_info->rcu_node, free_session_info_rcu);
}
}
static
-struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
+struct session_info *session_info_create(uint64_t id,
+ const char *name,
+ uid_t uid,
+ gid_t gid,
struct lttng_session_trigger_list *trigger_list,
struct cds_lfht *sessions_ht)
{
LTTNG_ASSERT(name);
- session_info = (struct session_info *) zmalloc(sizeof(*session_info));
+ session_info = zmalloc<struct session_info>();
if (!session_info) {
goto end;
}
+
lttng_ref_init(&session_info->ref, session_info_destroy);
session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
}
cds_lfht_node_init(&session_info->sessions_ht_node);
+ session_info->id = id;
session_info->name = strdup(name);
if (!session_info->name) {
goto error;
}
+
session_info->uid = uid;
session_info->gid = gid;
session_info->trigger_list = trigger_list;
struct channel_key *channel_key, uint64_t channel_capacity,
struct session_info *session_info)
{
- struct channel_info *channel_info = (struct channel_info *) zmalloc(sizeof(*channel_info));
+ struct channel_info *channel_info = zmalloc<struct channel_info>();
if (!channel_info) {
goto end;
void notification_client_list_release(struct urcu_ref *list_ref)
{
struct notification_client_list *list =
- container_of(list_ref, typeof(*list), ref);
+ lttng::utils::container_of(list_ref, ¬ification_client_list::ref);
struct notification_client_list_element *client_list_element, *tmp;
lttng_condition_put(list->condition);
struct cds_lfht_iter iter;
struct notification_client_list *client_list;
- client_list = (notification_client_list *) zmalloc(sizeof(*client_list));
+ client_list = zmalloc<notification_client_list>();
if (!client_list) {
PERROR("Failed to allocate notification client list");
goto end;
continue;
}
- client_list_element = (notification_client_list_element *) zmalloc(sizeof(*client_list_element));
+ client_list_element = zmalloc<notification_client_list_element>();
if (!client_list_element) {
goto error_put_client_list;
}
&iter);
node = cds_lfht_iter_get_node(&iter);
if (node) {
- list = container_of(node, struct notification_client_list,
- notification_trigger_clients_ht_node);
+ list = lttng::utils::container_of(node,
+ ¬ification_client_list::notification_trigger_clients_ht_node);
list = notification_client_list_get(list) ? list : NULL;
}
ret = evaluate_buffer_condition(condition, evaluation, state,
NULL, last_sample,
- 0, channel_info->session_info->consumed_data_size,
channel_info);
if (ret) {
WARN("Fatal error occurred while evaluating a newly subscribed-to condition");
}
static
-int evaluate_session_condition_for_client(
+bool evaluate_session_rotation_ongoing_condition(const struct lttng_condition *condition
+ __attribute__((unused)),
+ const struct session_state_sample *sample)
+{
+ return sample->rotation.ongoing;
+}
+
+static
+bool evaluate_session_consumed_size_condition(
const struct lttng_condition *condition,
- struct notification_thread_state *state,
- struct lttng_evaluation **evaluation,
- uid_t *session_uid, gid_t *session_gid)
+ const struct session_state_sample *sample)
+{
+ uint64_t threshold;
+ const struct lttng_condition_session_consumed_size *size_condition =
+ lttng::utils::container_of(condition,
+ <tng_condition_session_consumed_size::parent);
+
+ threshold = size_condition->consumed_threshold_bytes.value;
+ DBG("Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
+ threshold, sample->consumed_data_size);
+ return sample->consumed_data_size >= threshold;
+}
+
+/*
+ * `new_state` can be NULL to indicate that we are not evaluating a
+ * state transition. A client subscribed or a trigger was registered and
+ * we wish to perform an initial evaluation.
+ */
+static
+int evaluate_session_condition(
+ const struct lttng_condition *condition,
+ const struct session_info *session_info,
+ const struct session_state_sample *new_state,
+ struct lttng_evaluation **evaluation)
{
int ret;
- struct cds_lfht_iter iter;
- struct cds_lfht_node *node;
- const char *session_name;
- struct session_info *session_info = NULL;
+ bool previous_result, newest_result;
- rcu_read_lock();
- session_name = get_condition_session_name(condition);
+ switch (lttng_condition_get_type(condition)) {
+ case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
+ if (new_state) {
+ previous_result = evaluate_session_rotation_ongoing_condition(
+ condition, &session_info->last_state_sample);
+ newest_result = evaluate_session_rotation_ongoing_condition(
+ condition, new_state);
+ } else {
+ previous_result = false;
+ newest_result = evaluate_session_rotation_ongoing_condition(
+ condition, &session_info->last_state_sample);
+ }
+ break;
+ case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+ if (new_state) {
+ previous_result = evaluate_session_consumed_size_condition(
+ condition, &session_info->last_state_sample);
+ newest_result = evaluate_session_consumed_size_condition(
+ condition, new_state);
+ } else {
+ previous_result = false;
+ newest_result = evaluate_session_consumed_size_condition(
+ condition, &session_info->last_state_sample);
+ }
+ break;
+ case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
+ /*
+ * Note that LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED is
+ * evaluated differently to only consider state transitions without regard for the
+ * initial state. This is a deliberate choice as it is unlikely that a user would
+ * expect an action to occur for a rotation that occurred long before the trigger or
+ * subscription occurred.
+ */
+ if (!new_state) {
+ ret = 0;
+ goto end;
+ }
- /* Find the session associated with the trigger. */
- cds_lfht_lookup(state->sessions_ht,
- hash_key_str(session_name, lttng_ht_seed),
- match_session,
- session_name,
- &iter);
- node = cds_lfht_iter_get_node(&iter);
- if (!node) {
- DBG("No known session matching name \"%s\"",
- session_name);
+ previous_result = !session_info->last_state_sample.rotation.ongoing;
+ newest_result = !new_state->rotation.ongoing;
+ break;
+ default:
ret = 0;
goto end;
}
- session_info = caa_container_of(node, struct session_info,
- sessions_ht_node);
- session_info_get(session_info);
+ if (!newest_result || (previous_result == newest_result)) {
+ /* Not a state transition, evaluate to false. */
+ ret = 0;
+ goto end;
+ }
- /*
- * Evaluation is performed in-line here since only one type of
- * session-bound condition is handled for the moment.
- */
switch (lttng_condition_get_type(condition)) {
case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
- if (!session_info->rotation.ongoing) {
- ret = 0;
- goto end_session_put;
- }
+ {
+ const auto rotation_id = new_state ?
+ new_state->rotation.id :
+ session_info->last_state_sample.rotation.id;
- *evaluation = lttng_evaluation_session_rotation_ongoing_create(
- session_info->rotation.id);
- if (!*evaluation) {
- /* Fatal error. */
- ERR("Failed to create session rotation ongoing evaluation for session \"%s\"",
- session_info->name);
- ret = -1;
- goto end_session_put;
- }
- ret = 0;
+ *evaluation = lttng_evaluation_session_rotation_ongoing_create(rotation_id);
+ break;
+ }
+ case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
+ {
+ const auto& sample = new_state ? *new_state : session_info->last_state_sample;
+ const auto rotation_id = sample.rotation.id;
+
+ /* Callee acquires a reference to location. */
+ *evaluation = lttng_evaluation_session_rotation_completed_create(
+ rotation_id, sample.rotation.location);
break;
+ }
+ case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
+ {
+ const auto latest_session_consumed_total = new_state ?
+ new_state->consumed_data_size :
+ session_info->last_state_sample.consumed_data_size;
+
+ *evaluation = lttng_evaluation_session_consumed_size_create(
+ latest_session_consumed_total);
+ break;
+ }
default:
- ret = 0;
- goto end_session_put;
+ abort();
}
- *session_uid = session_info->uid;
- *session_gid = session_info->gid;
+ if (!*evaluation) {
+ /* Fatal error. */
+ ERR("Failed to create session condition evaluation: session name = `%s`",
+ session_info->name);
+ ret = -1;
+ goto end;
+ }
-end_session_put:
- session_info_put(session_info);
+ ret = 0;
end:
- rcu_read_unlock();
return ret;
}
struct lttng_evaluation *evaluation = NULL;
struct notification_client_list client_list = {
.lock = PTHREAD_MUTEX_INITIALIZER,
+ .ref = {},
+ .condition = NULL,
+ .triggers_list = {},
+ .clients_list = {},
+ .notification_trigger_clients_ht = NULL,
+ .notification_trigger_clients_ht_node = {},
+ .rcu_node = {},
};
- struct notification_client_list_element client_list_element = { 0 };
+ struct notification_client_list_element client_list_element = {};
uid_t object_uid = 0;
gid_t object_gid = 0;
switch (get_condition_binding_object(condition)) {
case LTTNG_OBJECT_TYPE_SESSION:
- ret = evaluate_session_condition_for_client(condition, state,
- &evaluation, &object_uid, &object_gid);
+ {
+ /* Find the session associated with the condition. */
+ const auto *session_name = get_condition_session_name(condition);
+ auto session_info = get_session_info_by_name(state, session_name);
+ if (!session_info) {
+ /* Not an error, the session doesn't exist yet. */
+ DBG("Session not found while evaluating session condition for client: session name = `%s`",
+ session_name);
+ ret = 0;
+ goto end;
+ }
+
+ object_uid = session_info->uid;
+ object_gid = session_info->gid;
+
+ ret = evaluate_session_condition(condition, session_info, NULL, &evaluation);
+ session_info_put(session_info);
break;
+ }
case LTTNG_OBJECT_TYPE_CHANNEL:
ret = evaluate_channel_condition_for_client(condition, state,
&evaluation, &object_uid, &object_gid);
}
}
- condition_list_element = (lttng_condition_list_element *) zmalloc(sizeof(*condition_list_element));
+ condition_list_element = zmalloc<lttng_condition_list_element>();
if (!condition_list_element) {
ret = -1;
goto error;
}
- client_list_element = (notification_client_list_element *) zmalloc(sizeof(*client_list_element));
+ client_list_element = zmalloc<notification_client_list_element>();
if (!client_list_element) {
ret = -1;
goto error;
static
void free_notification_client_rcu(struct rcu_head *node)
{
- free(caa_container_of(node, struct notification_client, rcu_node));
+ free(lttng::utils::container_of(node, ¬ification_client::rcu_node));
}
static
-void notification_client_destroy(struct notification_client *client,
- struct notification_thread_state *state)
+void notification_client_destroy(struct notification_client *client)
{
if (!client) {
return;
struct cds_lfht_node *node;
struct notification_client *client = NULL;
+ ASSERT_RCU_READ_LOCKED();
+
cds_lfht_lookup(state->client_socket_ht,
hash_client_socket(socket),
match_client_socket,
struct cds_lfht_node *node;
struct notification_client *client = NULL;
+ ASSERT_RCU_READ_LOCKED();
+
cds_lfht_lookup(state->client_id_ht,
hash_client_id(id),
match_client_id,
return false;
}
-static
-bool session_consumed_size_condition_applies_to_channel(
- const struct lttng_condition *condition,
- const struct channel_info *channel_info)
-{
- enum lttng_condition_status status;
- const char *condition_session_name = NULL;
-
- status = lttng_condition_session_consumed_size_get_session_name(
- condition, &condition_session_name);
- LTTNG_ASSERT((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
-
- if (strcmp(channel_info->session_info->name, condition_session_name)) {
- goto fail;
- }
-
- return true;
-fail:
- return false;
-}
-
static
bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
const struct channel_info *channel_info)
trigger_applies = buffer_usage_condition_applies_to_channel(
condition, channel_info);
break;
- case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
- trigger_applies = session_consumed_size_condition_applies_to_channel(
- condition, channel_info);
- break;
default:
goto fail;
}
struct cds_lfht_node *node;
struct cds_lfht_iter iter;
+ ASSERT_RCU_READ_LOCKED();
+
cds_lfht_lookup(state->session_triggers_ht,
hash_key_str(session_name, lttng_ht_seed),
match_session_trigger_list,
/*
* Allocate an empty lttng_session_trigger_list for the session named
* 'session_name'.
- *
- * No ownership of 'session_name' is assumed by the session trigger list.
- * It is the caller's responsability to ensure the session name is alive
- * for as long as this list is.
*/
static
struct lttng_session_trigger_list *lttng_session_trigger_list_create(
const char *session_name,
struct cds_lfht *session_triggers_ht)
{
- struct lttng_session_trigger_list *list;
+ struct lttng_session_trigger_list *list = NULL;
+ char *session_name_copy = strdup(session_name);
- list = (lttng_session_trigger_list *) zmalloc(sizeof(*list));
+ if (!session_name_copy) {
+ PERROR("Failed to allocate session name while building trigger list");
+ goto end;
+ }
+
+ list = zmalloc<lttng_session_trigger_list>();
if (!list) {
+ PERROR("Failed to allocate session trigger list while building trigger list");
goto end;
}
- list->session_name = session_name;
+
+ list->session_name = session_name_copy;
CDS_INIT_LIST_HEAD(&list->list);
cds_lfht_node_init(&list->session_triggers_ht_node);
list->session_triggers_ht = session_triggers_ht;
static
void free_session_trigger_list_rcu(struct rcu_head *node)
{
- free(caa_container_of(node, struct lttng_session_trigger_list,
- rcu_node));
+ struct lttng_session_trigger_list *list =
+ caa_container_of(node, struct lttng_session_trigger_list, rcu_node);
+
+ free(list->session_name);
+ free(list);
}
static
{
int ret = 0;
struct lttng_trigger_list_element *new_element =
- (lttng_trigger_list_element *) zmalloc(sizeof(*new_element));
+ zmalloc<lttng_trigger_list_element>();
if (!new_element) {
ret = -1;
switch (lttng_condition_get_type(condition)) {
case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
+ case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
{
- enum lttng_condition_status condition_status;
const char *condition_session_name;
- condition_status = lttng_condition_session_rotation_get_session_name(
- condition, &condition_session_name);
- if (condition_status != LTTNG_CONDITION_STATUS_OK) {
- ERR("Failed to retrieve session rotation condition's session name");
- goto end;
- }
-
+ condition_session_name = get_condition_session_name(condition);
LTTNG_ASSERT(condition_session_name);
applies = !strcmp(condition_session_name, session_name);
break;
/*
* Allocate and initialize an lttng_session_trigger_list which contains
* all triggers that apply to the session named 'session_name'.
- *
- * No ownership of 'session_name' is assumed by the session trigger list.
- * It is the caller's responsability to ensure the session name is alive
- * for as long as this list is.
*/
static
struct lttng_session_trigger_list *lttng_session_trigger_list_build(
}
static
-struct session_info *find_or_create_session_info(
- struct notification_thread_state *state,
- const char *name, uid_t uid, gid_t gid)
+struct session_info *create_and_publish_session_info(struct notification_thread_state *state,
+ uint64_t id,
+ const char *name,
+ uid_t uid,
+ gid_t gid)
{
struct session_info *session = NULL;
- struct cds_lfht_node *node;
- struct cds_lfht_iter iter;
struct lttng_session_trigger_list *trigger_list;
rcu_read_lock();
- cds_lfht_lookup(state->sessions_ht,
- hash_key_str(name, lttng_ht_seed),
- match_session,
- name,
- &iter);
- node = cds_lfht_iter_get_node(&iter);
- if (node) {
- DBG("Found session info of session \"%s\" (uid = %i, gid = %i)",
- name, uid, gid);
- session = caa_container_of(node, struct session_info,
- sessions_ht_node);
- LTTNG_ASSERT(session->uid == uid);
- LTTNG_ASSERT(session->gid == gid);
- session_info_get(session);
- goto end;
- }
-
trigger_list = lttng_session_trigger_list_build(state, name);
if (!trigger_list) {
goto error;
}
- session = session_info_create(name, uid, gid, trigger_list,
+ session = session_info_create(id, name, uid, gid, trigger_list,
state->sessions_ht);
if (!session) {
ERR("Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
lttng_session_trigger_list_destroy(trigger_list);
goto error;
}
+
+ /* Transferred ownership to the new session. */
trigger_list = NULL;
- cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
- &session->sessions_ht_node);
-end:
+ if (cds_lfht_add_unique(state->sessions_ht, hash_session_info(session), match_session_info,
+ &id, &session->sessions_ht_node) != &session->sessions_ht_node) {
+ ERR("Duplicate session found: name = `%s`, id = %" PRIu64, name, id);
+ goto error;
+ }
+
rcu_read_unlock();
return session;
error:
}
static
-int handle_notification_thread_command_add_channel(
- struct notification_thread_state *state,
- const char *session_name, uid_t session_uid, gid_t session_gid,
- const char *channel_name, enum lttng_domain_type channel_domain,
- uint64_t channel_key_int, uint64_t channel_capacity,
+int handle_notification_thread_command_add_channel(struct notification_thread_state *state,
+ uint64_t session_id,
+ const char *channel_name,
+ enum lttng_domain_type channel_domain,
+ uint64_t channel_key_int,
+ uint64_t channel_capacity,
enum lttng_error_code *cmd_result)
{
struct cds_list_head trigger_list;
struct cds_lfht_iter iter;
struct session_info *session_info = NULL;
- DBG("Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
- channel_name, session_name, channel_key_int,
+ DBG("Adding channel: channel name = `%s`, session id = %" PRIu64 ", channel key = %" PRIu64 ", domain = %s",
+ channel_name, session_id, channel_key_int,
lttng_domain_type_str(channel_domain));
CDS_INIT_LIST_HEAD(&trigger_list);
- session_info = find_or_create_session_info(state, session_name,
- session_uid, session_gid);
+ session_info = get_session_info_by_id(state, session_id);
if (!session_info) {
- /* Allocation error or an internal error occurred. */
+ /* Fatal logic error. */
+ ERR("Failed to find session while adding channel: session id = %" PRIu64,
+ session_id);
goto error;
}
continue;
}
- new_element = (lttng_trigger_list_element *) zmalloc(sizeof(*new_element));
+ new_element = zmalloc<lttng_trigger_list_element>();
if (!new_element) {
rcu_read_unlock();
goto error;
DBG("Found %i triggers that apply to newly added channel",
trigger_count);
- channel_trigger_list = (lttng_channel_trigger_list *) zmalloc(sizeof(*channel_trigger_list));
+ channel_trigger_list = zmalloc<lttng_channel_trigger_list>();
if (!channel_trigger_list) {
goto error;
}
return 1;
}
+static
+int handle_notification_thread_command_add_session(struct notification_thread_state *state,
+ uint64_t session_id,
+ const char *session_name,
+ uid_t session_uid,
+ gid_t session_gid,
+ enum lttng_error_code *cmd_result)
+{
+ int ret;
+
+ DBG("Adding session: session name = `%s`, session id = %" PRIu64 ", session uid = %d, session gid = %d",
+ session_name, session_id, session_uid, session_gid);
+
+ auto session = create_and_publish_session_info(state, session_id, session_name, session_uid, session_gid);
+ if (!session) {
+ PERROR("Failed to add session: session name = `%s`, session id = %" PRIu64 ", session uid = %d, session gid = %d",
+ session_name, session_id, session_uid, session_gid);
+ ret = -1;
+ *cmd_result = LTTNG_ERR_NOMEM;
+ goto end;
+ }
+
+ /*
+ * Note that the reference to `session` is not released; this reference is
+ * the "global" reference that is used to allow look-ups. This reference will
+ * only be released when the session is removed. See
+ * handle_notification_thread_command_remove_session.
+ */
+ ret = 0;
+ *cmd_result = LTTNG_OK;
+end:
+ return ret;
+}
+
+static
+int handle_notification_thread_command_remove_session(
+ struct notification_thread_state *state,
+ uint64_t session_id,
+ enum lttng_error_code *cmd_result)
+{
+ int ret;
+
+ DBG("Removing session: session id = %" PRIu64, session_id);
+
+ auto session = get_session_info_by_id(state, session_id);
+ if (!session) {
+ ERR("Failed to remove session: session id = %" PRIu64, session_id);
+ ret = -1;
+ *cmd_result = LTTNG_ERR_NO_SESSION;
+ goto end;
+ }
+
+ /* Release the reference returned by the look-up, and then release the global reference. */
+ session_info_put(session);
+ session_info_put(session);
+ ret = 0;
+ *cmd_result = LTTNG_OK;
+end:
+ return ret;
+}
+
static
void free_channel_trigger_list_rcu(struct rcu_head *node)
{
int handle_notification_thread_command_session_rotation(
struct notification_thread_state *state,
enum notification_thread_command_type cmd_type,
- const char *session_name, uid_t session_uid, gid_t session_gid,
+ uint64_t session_id,
uint64_t trace_archive_chunk_id,
struct lttng_trace_archive_location *location,
enum lttng_error_code *_cmd_result)
struct lttng_session_trigger_list *trigger_list;
struct lttng_trigger_list_element *trigger_list_element;
struct session_info *session_info;
- const struct lttng_credentials session_creds = {
- .uid = LTTNG_OPTIONAL_INIT_VALUE(session_uid),
- .gid = LTTNG_OPTIONAL_INIT_VALUE(session_gid),
- };
+ struct lttng_credentials session_creds;
+ struct session_state_sample new_session_state;
rcu_read_lock();
- session_info = find_or_create_session_info(state, session_name,
- session_uid, session_gid);
+ session_info = get_session_info_by_id(state, session_id);
if (!session_info) {
- /* Allocation error or an internal error occurred. */
+ /* Fatal logic error. */
+ ERR("Failed to find session while handling rotation state change: session id = %" PRIu64,
+ session_id);
ret = -1;
- cmd_result = LTTNG_ERR_NOMEM;
+ cmd_result = LTTNG_ERR_FATAL;
goto end;
}
- session_info->rotation.ongoing =
- cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
- session_info->rotation.id = trace_archive_chunk_id;
- trigger_list = get_session_trigger_list(state, session_name);
- if (!trigger_list) {
- DBG("No triggers applying to session \"%s\" found",
- session_name);
- goto end;
+ new_session_state = session_info->last_state_sample;
+ if (location) {
+ lttng_trace_archive_location_get(location);
+ new_session_state.rotation.location = location;
+ } else {
+ new_session_state.rotation.location = NULL;
}
+ session_creds = {
+ .uid = LTTNG_OPTIONAL_INIT_VALUE(session_info->uid),
+ .gid = LTTNG_OPTIONAL_INIT_VALUE(session_info->gid),
+ };
+
+ new_session_state.rotation.ongoing = cmd_type ==
+ NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
+ new_session_state.rotation.id = trace_archive_chunk_id;
+
+ trigger_list = get_session_trigger_list(state, session_info->name);
+ LTTNG_ASSERT(trigger_list);
+
cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
node) {
const struct lttng_condition *condition;
struct lttng_trigger *trigger;
struct notification_client_list *client_list;
struct lttng_evaluation *evaluation = NULL;
- enum lttng_condition_type condition_type;
enum action_executor_status executor_status;
trigger = trigger_list_element->trigger;
condition = lttng_trigger_get_const_condition(trigger);
LTTNG_ASSERT(condition);
- condition_type = lttng_condition_get_type(condition);
- if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
- cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
- continue;
- } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
- cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
- continue;
- }
-
- client_list = get_client_list_from_condition(state, condition);
- if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
- evaluation = lttng_evaluation_session_rotation_ongoing_create(
- trace_archive_chunk_id);
- } else {
- evaluation = lttng_evaluation_session_rotation_completed_create(
- trace_archive_chunk_id, location);
+ ret = evaluate_session_condition(
+ condition, session_info, &new_session_state, &evaluation);
+ if (ret) {
+ ret = -1;
+ cmd_result = LTTNG_ERR_NOMEM;
+ goto end;
}
if (!evaluation) {
- /* Internal error */
- ret = -1;
- cmd_result = LTTNG_ERR_UNK;
- goto put_list;
+ continue;
}
/*
* Ownership of `evaluation` transferred to the action executor
- * no matter the result.
+ * no matter the result. The callee acquires a reference to the
+ * client list: we can release our own.
*/
+ client_list = get_client_list_from_condition(state, condition);
executor_status = action_executor_enqueue_trigger(
state->executor, trigger, evaluation,
&session_creds, client_list);
+ notification_client_list_put(client_list);
evaluation = NULL;
switch (executor_status) {
case ACTION_EXECUTOR_STATUS_OK:
*/
ERR("Fatal error occurred while enqueuing action associated with session rotation trigger");
ret = -1;
- goto put_list;
+ goto end;
case ACTION_EXECUTOR_STATUS_OVERFLOW:
/*
* TODO Add trigger identification (name/id) when
*/
WARN("No space left when enqueuing action associated with session rotation trigger");
ret = 0;
- goto put_list;
+ goto end;
default:
abort();
}
-
-put_list:
- notification_client_list_put(client_list);
- if (caa_unlikely(ret)) {
- break;
- }
}
+
end:
+ if (session_info) {
+ /* Ownership of new_session_state::location is transferred. */
+ lttng_trace_archive_location_put(session_info->last_state_sample.rotation.location);
+ session_info->last_state_sample = new_session_state;
+ }
+
session_info_put(session_info);
*_cmd_result = cmd_result;
rcu_read_unlock();
enum lttng_error_code cmd_result = LTTNG_OK;
struct notification_event_tracer_event_source_element *element = NULL;
- element = (notification_event_tracer_event_source_element *) zmalloc(sizeof(*element));
+ element = zmalloc<notification_event_tracer_event_source_element>();
if (!element) {
cmd_result = LTTNG_ERR_NOMEM;
ret = -1;
lttng_domain_type_str(domain_type));
/* Adding the read side pipe to the event poll. */
- ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLIN | LPOLLERR);
+ ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLPRI | LPOLLIN);
if (ret < 0) {
ERR("Failed to add tracer event source to poll set: tracer_event_source_fd = %d, domain = '%s'",
tracer_event_source_fd,
struct notification_thread_state *state,
int pipe, enum lttng_domain_type domain)
{
- struct lttng_poll_event events = {0};
+ struct lttng_poll_event events = {};
int ret;
ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
}
static int handle_notification_thread_command_list_triggers(
- struct notification_thread_handle *handle,
+ struct notification_thread_handle *handle __attribute__((unused)),
struct notification_thread_state *state,
uid_t client_uid,
struct lttng_triggers **triggers,
const char *session_name;
struct lttng_session_trigger_list *trigger_list;
- condition = lttng_trigger_get_const_condition(trigger);
- switch (lttng_condition_get_type(condition)) {
- case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
- case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
- {
- enum lttng_condition_status status;
-
- status = lttng_condition_session_rotation_get_session_name(
- condition, &session_name);
- if (status != LTTNG_CONDITION_STATUS_OK) {
- ERR("Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
- ret = -1;
- goto end;
- }
- break;
- }
- default:
- ret = -1;
- goto end;
- }
-
+ ASSERT_RCU_READ_LOCKED();
+
+ condition = lttng_trigger_get_const_condition(trigger);
+ session_name = get_condition_session_name(condition);
+
trigger_list = get_session_trigger_list(state, session_name);
if (!trigger_list) {
DBG("Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
struct cds_lfht_iter iter;
struct channel_info *channel;
+ ASSERT_RCU_READ_LOCKED();
+
cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
channels_ht_node) {
struct lttng_trigger_list_element *trigger_list_element;
struct lttng_channel_trigger_list,
channel_triggers_ht_node);
- trigger_list_element = (lttng_trigger_list_element *) zmalloc(sizeof(*trigger_list_element));
+ trigger_list_element = zmalloc<lttng_trigger_list_element>();
if (!trigger_list_element) {
ret = -1;
goto end;
struct lttng_condition *condition = lttng_trigger_get_condition(trigger);
struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element = NULL;
- trigger_tokens_ht_element = (notification_trigger_tokens_ht_element *) zmalloc(sizeof(*trigger_tokens_ht_element));
+ trigger_tokens_ht_element = zmalloc<notification_trigger_tokens_ht_element>();
if (!trigger_tokens_ht_element) {
ret = LTTNG_ERR_NOMEM;
goto end;
goto error;
}
- trigger_ht_element = (lttng_trigger_ht_element *) zmalloc(sizeof(*trigger_ht_element));
+ trigger_ht_element = zmalloc<lttng_trigger_ht_element>();
if (!trigger_ht_element) {
ret = -1;
goto error;
client_list = notification_client_list_create(state, condition);
if (!client_list) {
ERR("Error creating notification client list for trigger %s", trigger->name);
+ ret = -1;
goto error_free_ht_element;
}
}
*/
switch (get_condition_binding_object(condition)) {
case LTTNG_OBJECT_TYPE_SESSION:
- ret = evaluate_session_condition_for_client(condition, state,
- &evaluation, &object_uid,
- &object_gid);
- LTTNG_OPTIONAL_SET(&object_creds.uid, object_uid);
- LTTNG_OPTIONAL_SET(&object_creds.gid, object_gid);
+ {
+ /* Find the session associated with the condition. */
+ const auto *session_name = get_condition_session_name(condition);
+ auto session_info = get_session_info_by_name(state, session_name);
+ if (!session_info) {
+ /* Not an error, the session doesn't exist yet. */
+ DBG("Session not found while evaluating session condition during registration of trigger: session name = `%s`",
+ session_name);
+ ret = 0;
+ goto success;
+ }
+
+ LTTNG_OPTIONAL_SET(&object_creds.uid, session_info->uid);
+ LTTNG_OPTIONAL_SET(&object_creds.gid, session_info->gid);
+
+ ret = evaluate_session_condition(condition, session_info, NULL, &evaluation);
+ session_info_put(session_info);
break;
+ }
case LTTNG_OBJECT_TYPE_CHANNEL:
ret = evaluate_channel_condition_for_client(condition, state,
&evaluation, &object_uid,
}
}
+static
+void remove_trigger_from_session_trigger_list(
+ struct lttng_session_trigger_list *trigger_list,
+ const struct lttng_trigger *trigger)
+{
+ bool found = false;
+ struct lttng_trigger_list_element *trigger_element, *tmp;
+
+ cds_list_for_each_entry_safe (trigger_element, tmp, &trigger_list->list, node) {
+ if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
+ continue;
+ }
+
+ DBG("Removed trigger from session_triggers_ht");
+ cds_list_del(&trigger_element->node);
+ free(trigger_element);
+ /* A trigger can only appear once per session. */
+ found = true;
+ break;
+ }
+
+ if (!found) {
+ ERR("Failed to find trigger associated with session: session name = `%s`",
+ trigger_list->session_name);
+ }
+
+ LTTNG_ASSERT(found);
+}
+
static
int handle_notification_thread_command_unregister_trigger(
struct notification_thread_state *state,
{
struct cds_lfht_iter iter;
struct cds_lfht_node *triggers_ht_node;
- struct lttng_channel_trigger_list *trigger_list;
struct notification_client_list *client_list;
struct lttng_trigger_ht_element *trigger_ht_element = NULL;
const struct lttng_condition *condition = lttng_trigger_get_const_condition(
trigger_ht_element = caa_container_of(triggers_ht_node,
struct lttng_trigger_ht_element, node);
- /* Remove trigger from channel_triggers_ht. */
- cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
- channel_triggers_ht_node) {
- struct lttng_trigger_list_element *trigger_element, *tmp;
+ switch (get_condition_binding_object(condition)) {
+ case LTTNG_OBJECT_TYPE_CHANNEL:
+ {
+ struct lttng_channel_trigger_list *trigger_list;
- cds_list_for_each_entry_safe(trigger_element, tmp,
- &trigger_list->list, node) {
- if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
- continue;
+ /*
+ * Remove trigger from channel_triggers_ht.
+ *
+ * Note that multiple channels may have matched the trigger's
+ * condition (e.g. all instances of a given channel in per-pid buffering
+ * mode).
+ *
+ * Iterate on all lists since we don't know the target channels' keys.
+ */
+ cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
+ channel_triggers_ht_node) {
+ struct lttng_trigger_list_element *trigger_element, *tmp;
+
+ cds_list_for_each_entry_safe(
+ trigger_element, tmp, &trigger_list->list, node) {
+ if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
+ continue;
+ }
+
+ DBG("Removed trigger from channel_triggers_ht");
+ cds_list_del(&trigger_element->node);
+ free(trigger_element);
+ /* A trigger can only appear once per channel */
+ break;
}
+ }
+ break;
+ }
+ case LTTNG_OBJECT_TYPE_SESSION:
+ {
+ auto session = get_session_info_by_name(
+ state, get_condition_session_name(condition));
- DBG("Removed trigger from channel_triggers_ht");
- cds_list_del(&trigger_element->node);
- /* A trigger can only appear once per channel */
+ /* Session doesn't exist, no trigger to remove. */
+ if (!session) {
break;
}
+
+ auto session_trigger_list = get_session_trigger_list(state, session->name);
+ remove_trigger_from_session_trigger_list(session_trigger_list, trigger);
+ session_info_put(session);
+ }
+ case LTTNG_OBJECT_TYPE_NONE:
+ break;
+ default:
+ abort();
}
if (lttng_trigger_needs_tracer_notifier(trigger)) {
return 0;
}
+static
+int pop_cmd_queue(struct notification_thread_handle *handle,
+ struct notification_thread_command **cmd)
+{
+ int ret;
+ uint64_t counter;
+
+ pthread_mutex_lock(&handle->cmd_queue.lock);
+ ret = lttng_read(handle->cmd_queue.event_fd, &counter, sizeof(counter));
+ if (ret != sizeof(counter)) {
+ ret = -1;
+ goto error_unlock;
+ }
+
+ *cmd = cds_list_first_entry(&handle->cmd_queue.list,
+ struct notification_thread_command, cmd_list_node);
+ cds_list_del(&((*cmd)->cmd_list_node));
+ ret = 0;
+
+error_unlock:
+ pthread_mutex_unlock(&handle->cmd_queue.lock);
+ return ret;
+}
+
/* Returns 0 on success, 1 on exit requested, negative value on error. */
int handle_notification_thread_command(
struct notification_thread_handle *handle,
struct notification_thread_state *state)
{
int ret;
- uint64_t counter;
struct notification_thread_command *cmd;
- /* Read the event pipe to put it back into a quiescent state. */
- ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
- sizeof(counter));
- if (ret != sizeof(counter)) {
+ ret = pop_cmd_queue(handle, &cmd);
+ if (ret) {
goto error;
}
- pthread_mutex_lock(&handle->cmd_queue.lock);
- cmd = cds_list_first_entry(&handle->cmd_queue.list,
- struct notification_thread_command, cmd_list_node);
- cds_list_del(&cmd->cmd_list_node);
- pthread_mutex_unlock(&handle->cmd_queue.lock);
-
DBG("Received `%s` command",
notification_command_type_str(cmd->type));
switch (cmd->type) {
case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
ret = handle_notification_thread_command_add_channel(
state,
- cmd->parameters.add_channel.session.name,
- cmd->parameters.add_channel.session.uid,
- cmd->parameters.add_channel.session.gid,
+ cmd->parameters.add_channel.session.id,
cmd->parameters.add_channel.channel.name,
cmd->parameters.add_channel.channel.domain,
cmd->parameters.add_channel.channel.key,
cmd->parameters.remove_channel.domain,
&cmd->reply_code);
break;
+ case NOTIFICATION_COMMAND_TYPE_ADD_SESSION:
+ ret = handle_notification_thread_command_add_session(state,
+ cmd->parameters.add_session.session_id,
+ cmd->parameters.add_session.session_name,
+ cmd->parameters.add_session.session_uid,
+ cmd->parameters.add_session.session_gid, &cmd->reply_code);
+ break;
+ case NOTIFICATION_COMMAND_TYPE_REMOVE_SESSION:
+ ret = handle_notification_thread_command_remove_session(
+ state, cmd->parameters.remove_session.session_id, &cmd->reply_code);
+ break;
case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
ret = handle_notification_thread_command_session_rotation(
state,
cmd->type,
- cmd->parameters.session_rotation.session_name,
- cmd->parameters.session_rotation.uid,
- cmd->parameters.session_rotation.gid,
+ cmd->parameters.session_rotation.session_id,
cmd->parameters.session_rotation.trace_archive_chunk_id,
cmd->parameters.session_rotation.location,
&cmd->reply_code);
DBG("Handling new notification channel client connection");
- client = (notification_client *) zmalloc(sizeof(*client));
+ client = zmalloc<notification_client>();
if (!client) {
/* Fatal error. */
ret = -1;
goto error;
}
+ client->communication.current_poll_events = CLIENT_POLL_EVENTS_IN;
ret = lttng_poll_add(&state->events, client->socket,
- LPOLLIN | LPOLLERR |
- LPOLLHUP | LPOLLRDHUP);
+ client->communication.current_poll_events);
if (ret < 0) {
ERR("Failed to add notification channel client socket to poll set");
ret = 0;
return ret;
error:
- notification_client_destroy(client, state);
+ notification_client_destroy(client);
return ret;
}
int ret;
struct lttng_condition_list_element *condition_list_element, *tmp;
+ ASSERT_RCU_READ_LOCKED();
+
/* Acquire the client lock to disable its communication atomically. */
pthread_mutex_lock(&client->lock);
client->communication.active = false;
* Client no longer accessible to other threads (through the
* client lists).
*/
- notification_client_destroy(client, state);
+ notification_client_destroy(client);
return ret;
}
return error_occurred ? -1 : 0;
}
+static
+bool client_has_outbound_data_left(
+ const struct notification_client *client)
+{
+ const struct lttng_payload_view pv = lttng_payload_view_from_payload(
+ &client->communication.outbound.payload, 0, -1);
+ const bool has_data = pv.buffer.size != 0;
+ const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
+
+ return has_data || has_fds;
+}
+
static
int client_handle_transmission_status(
struct notification_client *client,
switch (transmission_status) {
case CLIENT_TRANSMISSION_STATUS_COMPLETE:
- ret = lttng_poll_mod(&state->events, client->socket,
- CLIENT_POLL_MASK_IN);
- if (ret) {
- goto end;
- }
-
- break;
case CLIENT_TRANSMISSION_STATUS_QUEUED:
+ {
+ int current_poll_events;
+ int new_poll_events;
/*
* We want to be notified whenever there is buffer space
- * available to send the rest of the payload.
+ * available to send the rest of the payload if we are
+ * waiting to send data to the client.
+ *
+ * The state of the outbound queue being sampled here is
+ * fine since:
+ * - it is okay to wake-up "for nothing" in case we see
+ * that data is left, but another thread succeeds in
+ * flushing it before us when handling the client "out"
+ * event. We will simply stop monitoring that event the next
+ * time it wakes us up and we see no data left to be sent,
+ * - if another thread fails to flush the entire client
+ * outgoing queue, it will issue a "communication update"
+ * command and cause the client's (e)poll mask to be
+ * re-evaluated.
+ *
+ * The situation we seek to avoid would be to disable the
+ * monitoring of "out" client events indefinitely when there is
+ * data to be sent, which can't happen because of the
+ * aforementioned "communication update" mechanism.
*/
- ret = lttng_poll_mod(&state->events, client->socket,
- CLIENT_POLL_MASK_IN_OUT);
- if (ret) {
- goto end;
+ pthread_mutex_lock(&client->lock);
+ current_poll_events = client->communication.current_poll_events;
+ new_poll_events = client_has_outbound_data_left(client) ?
+ CLIENT_POLL_EVENTS_IN_OUT :
+ CLIENT_POLL_EVENTS_IN;
+ client->communication.current_poll_events = new_poll_events;
+ pthread_mutex_unlock(&client->lock);
+
+ /* Update the monitored event set only if it changed. */
+ if (current_poll_events != new_poll_events) {
+ ret = lttng_poll_mod(&state->events, client->socket,
+ new_poll_events);
+ if (ret) {
+ goto end;
+ }
}
+
break;
+ }
case CLIENT_TRANSMISSION_STATUS_FAIL:
ret = notification_thread_client_disconnect(client, state);
if (ret) {
return CLIENT_TRANSMISSION_STATUS_ERROR;
}
-static
-bool client_has_outbound_data_left(
- const struct notification_client *client)
-{
- const struct lttng_payload_view pv = lttng_payload_view_from_payload(
- &client->communication.outbound.payload, 0, -1);
- const bool has_data = pv.buffer.size != 0;
- const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
-
- return has_data || has_fds;
-}
-
/* Client lock must _not_ be held by the caller. */
static
int client_send_command_reply(struct notification_client *client,
struct lttng_notification_channel_command_reply reply = {
.status = (int8_t) status,
};
- struct lttng_notification_channel_message msg = {
- .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
- .size = sizeof(reply),
- };
+ struct lttng_notification_channel_message msg;
char buffer[sizeof(msg) + sizeof(reply)];
enum client_transmission_status transmission_status;
+ msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY;
+ msg.size = sizeof(reply);
+ msg.fds = 0;
+
memcpy(buffer, &msg, sizeof(msg));
memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
DBG("Send command reply (%i)", (int) status);
static
int client_handle_message_unknown(struct notification_client *client,
- struct notification_thread_state *state)
+ struct notification_thread_state *state __attribute__((unused)))
{
int ret;
/*
.major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
.minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
};
- const struct lttng_notification_channel_message msg_header = {
- .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
- .size = sizeof(handshake_reply),
- };
+ struct lttng_notification_channel_message msg_header;
enum lttng_notification_channel_status status =
LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
+ msg_header.type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE;
+ msg_header.size = sizeof(handshake_reply);
+ msg_header.fds = 0;
+
memcpy(send_buffer, &msg_header, sizeof(msg_header));
memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
sizeof(handshake_reply));
}
pthread_mutex_lock(&client->lock);
- transmission_status = client_flush_outgoing_queue(client);
+ if (!client_has_outbound_data_left(client)) {
+ /*
+ * A client "out" event can be received when no payload is left
+ * to send under some circumstances.
+ *
+ * Many threads can flush a client's outgoing queue and, if they
+ * had to queue their message (socket was full), will use the
+ * "communication update" command to signal the (e)poll thread
+ * to monitor for space being made available in the socket.
+ *
+ * Commands are sent over an internal pipe serviced by the same
+ * thread as the client sockets.
+ *
+ * When space is made available in the socket, there is a race
+ * between the (e)poll thread and the other threads that may
+ * wish to use the client's socket to flush its outgoing queue.
+ *
+ * A non-(e)poll thread may attempt (and succeed) in flushing
+ * the queue before the (e)poll thread gets a chance to service
+ * the client's "out" event.
+ *
+ * In this situation, the (e)poll thread processing the client
+ * out event will see an empty payload: there is nothing to do
+ * except unsubscribing (e)poll "out" events.
+ *
+ * Note that this thread is the (e)poll thread so it can modify
+ * the (e)poll mask directly without using a communication
+ * update command. Other threads that flush the outgoing queue
+ * will use the "communication update" command to wake up this
+ * thread and force it to monitor "out" events.
+ *
+ * When other threads succeed in emptying the outgoing queue,
+ * they don't need to update the (e)poll mask: if the "out"
+ * event is monitored, it will fire once and the (e)poll
+ * thread will reach this condition, causing the event to
+ * stop being monitored.
+ */
+ transmission_status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
+ } else {
+ transmission_status = client_flush_outgoing_queue(client);
+ }
pthread_mutex_unlock(&client->lock);
ret = client_handle_transmission_status(
bool result = false;
uint64_t threshold;
enum lttng_condition_type condition_type;
- const struct lttng_condition_buffer_usage *use_condition = container_of(
- condition, struct lttng_condition_buffer_usage,
- parent);
+ const struct lttng_condition_buffer_usage *use_condition = lttng::utils::container_of(
+ condition, <tng_condition_buffer_usage::parent);
if (use_condition->threshold_bytes.set) {
threshold = use_condition->threshold_bytes.value;
return result;
}
-static
-bool evaluate_session_consumed_size_condition(
- const struct lttng_condition *condition,
- uint64_t session_consumed_size)
-{
- uint64_t threshold;
- const struct lttng_condition_session_consumed_size *size_condition =
- container_of(condition,
- struct lttng_condition_session_consumed_size,
- parent);
-
- threshold = size_condition->consumed_threshold_bytes.value;
- DBG("Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
- threshold, session_consumed_size);
- return session_consumed_size >= threshold;
-}
-
static
int evaluate_buffer_condition(const struct lttng_condition *condition,
struct lttng_evaluation **evaluation,
- const struct notification_thread_state *state,
+ const struct notification_thread_state *state __attribute__((unused)),
const struct channel_state_sample *previous_sample,
const struct channel_state_sample *latest_sample,
- uint64_t previous_session_consumed_total,
- uint64_t latest_session_consumed_total,
struct channel_info *channel_info)
{
int ret = 0;
condition, latest_sample,
channel_info->capacity);
break;
- case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
- if (caa_likely(previous_sample_available)) {
- previous_sample_result =
- evaluate_session_consumed_size_condition(
- condition,
- previous_session_consumed_total);
- }
- latest_sample_result =
- evaluate_session_consumed_size_condition(
- condition,
- latest_session_consumed_total);
- break;
default:
/* Unknown condition type; internal error. */
abort();
latest_sample->highest_usage,
channel_info->capacity);
break;
- case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
- *evaluation = lttng_evaluation_session_consumed_size_create(
- latest_session_consumed_total);
- break;
default:
abort();
}
int client_notification_overflow(struct notification_client *client)
{
int ret = 0;
- const struct lttng_notification_channel_message msg = {
- .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
- };
+ struct lttng_notification_channel_message msg;
+
+ msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED;
+ msg.size = 0;
+ msg.fds = 0;
ASSERT_LOCKED(client->lock);
.trigger = (struct lttng_trigger *) trigger,
.evaluation = (struct lttng_evaluation *) evaluation,
};
- struct lttng_notification_channel_message msg_header = {
- .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
- };
+ struct lttng_notification_channel_message msg_header;
const struct lttng_credentials *trigger_creds =
lttng_trigger_get_credentials(trigger);
lttng_payload_init(&msg_payload);
+ msg_header.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION;
+ msg_header.size = 0;
+ msg_header.fds = 0;
+
ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
sizeof(msg_header));
if (ret) {
goto end;
}
- capture_buffer = (char *) zmalloc(capture_buffer_size);
+ capture_buffer = calloc<char>(capture_buffer_size);
if (!capture_buffer) {
ERR("Failed to allocate capture buffer");
goto end;
}
evaluation = lttng_evaluation_event_rule_matches_create(
- container_of(lttng_trigger_get_const_condition(
+ lttng::utils::container_of(lttng_trigger_get_const_condition(
element->trigger),
- struct lttng_condition_event_rule_matches,
- parent),
+ <tng_condition_event_rule_matches::parent),
notification->capture_buffer,
notification->capture_buf_size, false);
{
int ret = 0;
struct lttcomm_consumer_channel_monitor_msg sample_msg;
- struct channel_info *channel_info;
+ struct channel_info *channel_info = NULL;
struct cds_lfht_node *node;
struct cds_lfht_iter iter;
- struct lttng_channel_trigger_list *trigger_list;
+ struct lttng_channel_trigger_list *channel_trigger_list;
+ struct lttng_session_trigger_list *session_trigger_list;
struct lttng_trigger_list_element *trigger_list_element;
bool previous_sample_available = false;
- struct channel_state_sample previous_sample, latest_sample;
- uint64_t previous_session_consumed_total, latest_session_consumed_total;
- struct lttng_credentials channel_creds;
+ struct channel_state_sample channel_previous_sample, channel_new_sample;
+ struct session_state_sample session_new_sample;
+ struct lttng_credentials channel_creds = {};
+ struct lttng_credentials session_creds = {};
+ struct session_info *session;
/*
* The monitoring pipe only holds messages smaller than PIPE_BUF,
}
ret = 0;
- latest_sample.key.key = sample_msg.key;
- latest_sample.key.domain = domain;
- latest_sample.highest_usage = sample_msg.highest;
- latest_sample.lowest_usage = sample_msg.lowest;
- latest_sample.channel_total_consumed = sample_msg.total_consumed;
+ channel_new_sample.key.key = sample_msg.key;
+ channel_new_sample.key.domain = domain;
+ channel_new_sample.highest_usage = sample_msg.highest;
+ channel_new_sample.lowest_usage = sample_msg.lowest;
rcu_read_lock();
+ session = get_session_info_by_id(state, sample_msg.session_id);
+ if (!session) {
+ DBG("Received a sample for an unknown session from consumerd: session id = %" PRIu64,
+ sample_msg.session_id);
+ goto end_unlock;
+ }
+
+ session_new_sample = session->last_state_sample;
+ session_new_sample.consumed_data_size += sample_msg.consumed_since_last_sample;
+ session_creds = {
+ .uid = LTTNG_OPTIONAL_INIT_VALUE(session->uid),
+ .gid = LTTNG_OPTIONAL_INIT_VALUE(session->gid),
+ };
+
+ session_trigger_list = get_session_trigger_list(state, session->name);
+ LTTNG_ASSERT(session_trigger_list);
+ cds_list_for_each_entry(trigger_list_element, &session_trigger_list->list,
+ node) {
+ const struct lttng_condition *condition;
+ struct lttng_trigger *trigger;
+ struct notification_client_list *client_list = NULL;
+ struct lttng_evaluation *evaluation = NULL;
+ enum action_executor_status executor_status;
+
+ ret = 0;
+ trigger = trigger_list_element->trigger;
+ condition = lttng_trigger_get_const_condition(trigger);
+ LTTNG_ASSERT(condition);
+
+ ret = evaluate_session_condition(
+ condition, session, &session_new_sample, &evaluation);
+ if (caa_unlikely(ret)) {
+ break;
+ }
+
+ if (caa_likely(!evaluation)) {
+ continue;
+ }
+
+ /*
+ * Ownership of `evaluation` transferred to the action executor
+ * no matter the result. The callee acquires a reference to the
+ * client list: we can release our own.
+ */
+ client_list = get_client_list_from_condition(state, condition);
+ executor_status = action_executor_enqueue_trigger(
+ state->executor, trigger, evaluation,
+ &session_creds, client_list);
+ notification_client_list_put(client_list);
+ evaluation = NULL;
+ switch (executor_status) {
+ case ACTION_EXECUTOR_STATUS_OK:
+ break;
+ case ACTION_EXECUTOR_STATUS_ERROR:
+ case ACTION_EXECUTOR_STATUS_INVALID:
+ /*
+ * TODO Add trigger identification (name/id) when
+ * it is added to the API.
+ */
+ ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger");
+ ret = -1;
+ goto end_unlock;
+ case ACTION_EXECUTOR_STATUS_OVERFLOW:
+ /*
+ * TODO Add trigger identification (name/id) when
+ * it is added to the API.
+ *
+ * Not a fatal error.
+ */
+ WARN("No space left when enqueuing action associated with buffer-condition trigger");
+ ret = 0;
+ goto end_unlock;
+ default:
+ abort();
+ }
+ }
+
/* Retrieve the channel's informations */
cds_lfht_lookup(state->channels_ht,
- hash_channel_key(&latest_sample.key),
+ hash_channel_key(&channel_new_sample.key),
match_channel_info,
- &latest_sample.key,
+ &channel_new_sample.key,
&iter);
node = cds_lfht_iter_get_node(&iter);
if (caa_unlikely(!node)) {
* sample.
*/
DBG("Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
- latest_sample.key.key,
+ channel_new_sample.key.key,
lttng_domain_type_str(domain));
goto end_unlock;
}
+
channel_info = caa_container_of(node, struct channel_info,
channels_ht_node);
- DBG("Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
+ DBG("Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", consumed since last sample = %" PRIu64")",
channel_info->name,
- latest_sample.key.key,
+ channel_new_sample.key.key,
channel_info->session_info->name,
- latest_sample.highest_usage,
- latest_sample.lowest_usage,
- latest_sample.channel_total_consumed);
-
- previous_session_consumed_total =
- channel_info->session_info->consumed_data_size;
+ channel_new_sample.highest_usage,
+ channel_new_sample.lowest_usage,
+ sample_msg.consumed_since_last_sample);
/* Retrieve the channel's last sample, if it exists, and update it. */
cds_lfht_lookup(state->channel_state_ht,
- hash_channel_key(&latest_sample.key),
+ hash_channel_key(&channel_new_sample.key),
match_channel_state_sample,
- &latest_sample.key,
+ &channel_new_sample.key,
&iter);
node = cds_lfht_iter_get_node(&iter);
if (caa_likely(node)) {
struct channel_state_sample,
channel_state_ht_node);
- memcpy(&previous_sample, stored_sample,
- sizeof(previous_sample));
- stored_sample->highest_usage = latest_sample.highest_usage;
- stored_sample->lowest_usage = latest_sample.lowest_usage;
- stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
+ memcpy(&channel_previous_sample, stored_sample,
+ sizeof(channel_previous_sample));
+ stored_sample->highest_usage = channel_new_sample.highest_usage;
+ stored_sample->lowest_usage = channel_new_sample.lowest_usage;
previous_sample_available = true;
-
- latest_session_consumed_total =
- previous_session_consumed_total +
- (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
} else {
/*
* This is the channel's first sample, allocate space for and
*/
struct channel_state_sample *stored_sample;
- stored_sample = (channel_state_sample *) zmalloc(sizeof(*stored_sample));
+ stored_sample = zmalloc<channel_state_sample>();
if (!stored_sample) {
ret = -1;
goto end_unlock;
}
- memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
+ memcpy(stored_sample, &channel_new_sample, sizeof(*stored_sample));
cds_lfht_node_init(&stored_sample->channel_state_ht_node);
cds_lfht_add(state->channel_state_ht,
hash_channel_key(&stored_sample->key),
&stored_sample->channel_state_ht_node);
-
- latest_session_consumed_total =
- previous_session_consumed_total +
- latest_sample.channel_total_consumed;
}
- channel_info->session_info->consumed_data_size =
- latest_session_consumed_total;
-
/* Find triggers associated with this channel. */
cds_lfht_lookup(state->channel_triggers_ht,
- hash_channel_key(&latest_sample.key),
+ hash_channel_key(&channel_new_sample.key),
match_channel_trigger_list,
- &latest_sample.key,
+ &channel_new_sample.key,
&iter);
node = cds_lfht_iter_get_node(&iter);
- if (caa_likely(!node)) {
- goto end_unlock;
- }
+ LTTNG_ASSERT(node);
channel_creds = (typeof(channel_creds)) {
.uid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->uid),
.gid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->gid),
};
- trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
+ channel_trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
channel_triggers_ht_node);
- cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
+ cds_list_for_each_entry(trigger_list_element, &channel_trigger_list->list,
node) {
const struct lttng_condition *condition;
struct lttng_trigger *trigger;
condition = lttng_trigger_get_const_condition(trigger);
LTTNG_ASSERT(condition);
- /*
- * Check if any client is subscribed to the result of this
- * evaluation.
- */
- client_list = get_client_list_from_condition(state, condition);
-
ret = evaluate_buffer_condition(condition, &evaluation, state,
- previous_sample_available ? &previous_sample : NULL,
- &latest_sample,
- previous_session_consumed_total,
- latest_session_consumed_total,
+ previous_sample_available ? &channel_previous_sample : NULL,
+ &channel_new_sample,
channel_info);
if (caa_unlikely(ret)) {
- goto put_list;
+ break;
}
if (caa_likely(!evaluation)) {
- goto put_list;
+ continue;
}
/*
* Ownership of `evaluation` transferred to the action executor
- * no matter the result.
+ * no matter the result. The callee acquires a reference to the
+ * client list: we can release our own.
*/
+ client_list = get_client_list_from_condition(state, condition);
executor_status = action_executor_enqueue_trigger(
state->executor, trigger, evaluation,
&channel_creds, client_list);
+ notification_client_list_put(client_list);
evaluation = NULL;
switch (executor_status) {
case ACTION_EXECUTOR_STATUS_OK:
*/
ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger");
ret = -1;
- goto put_list;
+ goto end_unlock;
case ACTION_EXECUTOR_STATUS_OVERFLOW:
/*
* TODO Add trigger identification (name/id) when
*/
WARN("No space left when enqueuing action associated with buffer-condition trigger");
ret = 0;
- goto put_list;
+ goto end_unlock;
default:
abort();
}
-
-put_list:
- notification_client_list_put(client_list);
- if (caa_unlikely(ret)) {
- break;
- }
}
end_unlock:
+ if (session) {
+ session->last_state_sample = session_new_sample;
+ }
+ session_info_put(session);
rcu_read_unlock();
end:
return ret;