Fix: sessiond: size-based rotation threshold exceeded in per-pid tracing (1/2)
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.cpp
index 6f449fc638cef0df78f88c8abcc5d575f6376ad6..586a9900b641419b39a0729dfa16e00459a7fed0 100644 (file)
@@ -6,49 +6,49 @@
  */
 
 #include "lttng/action/action.h"
-#include "lttng/trigger/trigger-internal.h"
+#include "lttng/trigger/trigger-internal.hpp"
 #define _LGPL_SOURCE
 #include <urcu.h>
 #include <urcu/rculfhash.h>
 
-#include <common/defaults.h>
-#include <common/error.h>
-#include <common/futex.h>
-#include <common/unix.h>
-#include <common/dynamic-buffer.h>
-#include <common/hashtable/utils.h>
-#include <common/sessiond-comm/sessiond-comm.h>
-#include <common/macros.h>
+#include <common/defaults.hpp>
+#include <common/error.hpp>
+#include <common/futex.hpp>
+#include <common/unix.hpp>
+#include <common/dynamic-buffer.hpp>
+#include <common/hashtable/utils.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/macros.hpp>
 #include <lttng/condition/condition.h>
-#include <lttng/action/action-internal.h>
-#include <lttng/action/list-internal.h>
-#include <lttng/domain-internal.h>
-#include <lttng/notification/notification-internal.h>
-#include <lttng/condition/condition-internal.h>
-#include <lttng/condition/buffer-usage-internal.h>
-#include <lttng/condition/session-consumed-size-internal.h>
-#include <lttng/condition/session-rotation-internal.h>
-#include <lttng/condition/event-rule-matches-internal.h>
-#include <lttng/domain-internal.h>
-#include <lttng/notification/channel-internal.h>
-#include <lttng/trigger/trigger-internal.h>
-#include <lttng/event-rule/event-rule-internal.h>
+#include <lttng/action/action-internal.hpp>
+#include <lttng/action/list-internal.hpp>
+#include <lttng/domain-internal.hpp>
+#include <lttng/notification/notification-internal.hpp>
+#include <lttng/condition/condition-internal.hpp>
+#include <lttng/condition/buffer-usage-internal.hpp>
+#include <lttng/condition/session-consumed-size-internal.hpp>
+#include <lttng/condition/session-rotation-internal.hpp>
+#include <lttng/condition/event-rule-matches-internal.hpp>
+#include <lttng/domain-internal.hpp>
+#include <lttng/notification/channel-internal.hpp>
+#include <lttng/trigger/trigger-internal.hpp>
+#include <lttng/event-rule/event-rule-internal.hpp>
 
 #include <time.h>
 #include <unistd.h>
 #include <inttypes.h>
 #include <fcntl.h>
 
-#include "condition-internal.h"
-#include "event-notifier-error-accounting.h"
-#include "notification-thread.h"
-#include "notification-thread-events.h"
-#include "notification-thread-commands.h"
-#include "lttng-sessiond.h"
-#include "kernel.h"
+#include "condition-internal.hpp"
+#include "event-notifier-error-accounting.hpp"
+#include "notification-thread.hpp"
+#include "notification-thread-events.hpp"
+#include "notification-thread-commands.hpp"
+#include "lttng-sessiond.hpp"
+#include "kernel.hpp"
 
-#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
-#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
+#define CLIENT_POLL_EVENTS_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
+#define CLIENT_POLL_EVENTS_IN_OUT (CLIENT_POLL_EVENTS_IN | LPOLLOUT)
 
 /* The tracers currently limit the capture size to PIPE_BUF (4kb on linux). */
 #define MAX_CAPTURE_SIZE (PIPE_BUF)
@@ -60,12 +60,6 @@ enum lttng_object_type {
        LTTNG_OBJECT_TYPE_SESSION,
 };
 
-struct lttng_trigger_list_element {
-       /* No ownership of the trigger object is assumed. */
-       struct lttng_trigger *trigger;
-       struct cds_list_head node;
-};
-
 struct lttng_channel_trigger_list {
        struct channel_key channel_key;
        /* List of struct lttng_trigger_list_element. */
@@ -117,6 +111,13 @@ struct lttng_session_trigger_list {
        struct rcu_head rcu_node;
 };
 
+namespace {
+struct lttng_trigger_list_element { /* One entry of a trigger list; see the `node` linkage below. */
+       /* No ownership of the trigger object is assumed. */
+       struct lttng_trigger *trigger;
+       struct cds_list_head node; /* Linkage into the list this element belongs to. */
+};
+
 struct lttng_trigger_ht_element {
        struct lttng_trigger *trigger;
        struct cds_lfht_node node;
@@ -140,6 +141,7 @@ struct channel_state_sample {
        /* call_rcu delayed reclaim. */
        struct rcu_head rcu_node;
 };
+} /* namespace */
 
 static unsigned long hash_channel_key(struct channel_key *key);
 static int evaluate_buffer_condition(const struct lttng_condition *condition,
@@ -166,13 +168,14 @@ void session_info_get(struct session_info *session_info);
 static
 void session_info_put(struct session_info *session_info);
 static
-struct session_info *session_info_create(const char *name,
-               uid_t uid, gid_t gid,
+struct session_info *session_info_create(uint64_t id,
+               const char *name,
+               uid_t uid,
+               gid_t gid,
                struct lttng_session_trigger_list *trigger_list,
                struct cds_lfht *sessions_ht);
-static
-void session_info_add_channel(struct session_info *session_info,
-               struct channel_info *channel_info);
+static void session_info_add_channel(
+               struct session_info *session_info, struct channel_info *channel_info);
 static
 void session_info_remove_channel(struct session_info *session_info,
                struct channel_info *channel_info);
@@ -223,8 +226,8 @@ int match_client_id(struct cds_lfht_node *node, const void *key)
 {
        /* This double-cast is intended to supress pointer-to-cast warning. */
        const notification_client_id id = *((notification_client_id *) key);
-       const struct notification_client *client = caa_container_of(
-                       node, struct notification_client, client_id_ht_node);
+       const struct notification_client *client = lttng::utils::container_of(
+                       node, &notification_client::client_id_ht_node);
 
        return client->id == id;
 }
@@ -320,13 +323,60 @@ int match_client_list_condition(struct cds_lfht_node *node, const void *key)
 }
 
 static
-int match_session(struct cds_lfht_node *node, const void *key)
+int match_session_info(struct cds_lfht_node *node, const void *key) /* Hash table match callback: non-zero when the node's session_info has the id pointed to by `key`. */
+{
+       const auto session_id = *((uint64_t *) key); /* `key` is read as a pointer to a uint64_t session id. */
+       const auto *session_info = lttng::utils::container_of(
+               node, &session_info::sessions_ht_node); /* Recover the session_info that embeds this hash table node. */
+
+       return session_id == session_info->id;
+}
+
+static
+unsigned long hash_session_info_id(uint64_t id) /* Hash a session id; used both to look up and to insert entries of sessions_ht. */
+{
+       return hash_key_u64(&id, lttng_ht_seed); /* Seeded with lttng_ht_seed. */
+}
+
+static
+unsigned long hash_session_info(const struct session_info *session_info) /* Hash a session_info by its id. */
+{
+       return hash_session_info_id(session_info->id); /* Same hash as a look-up by id, so both agree on the bucket. */
+}
+
+static
+struct session_info *get_session_info_by_id( /* Look up a session_info by id in state->sessions_ht; returns NULL when no node matches. */
+               const struct notification_thread_state *state, uint64_t id)
+{
+       struct cds_lfht_iter iter;
+       struct cds_lfht_node *node;
+       lttng::urcu::read_lock_guard read_lock_guard; /* Scoped guard; presumably holds the RCU read lock until the function returns -- TODO confirm against lttng::urcu. */
+
+       cds_lfht_lookup(state->sessions_ht,
+                       hash_session_info_id(id),
+                       match_session_info,
+                       &id,
+                       &iter);
+       node = cds_lfht_iter_get_node(&iter);
+
+       if (node) {
+               auto session_info = lttng::utils::container_of(node, &session_info::sessions_ht_node);
+
+               session_info_get(session_info); /* A reference is taken for the caller, who releases it with session_info_put(). */
+               return session_info;
+       }
+
+       return NULL;
+}
+
+static
+struct session_info *get_session_info_by_name( /* Resolve `name` to a session id, then look the session_info up by that id. */
+               const struct notification_thread_state *state, const char *name)
 {
-       const char *name = (const char *) key;
-       struct session_info *session_info = caa_container_of(
-               node, struct session_info, sessions_ht_node);
+       uint64_t session_id;
+       const auto found = sample_session_id_by_name(name, &session_id); /* Defined elsewhere; its result is used here as a "found" flag. */
 
-       return !strcmp(session_info->name, name);
+       return found ? get_session_info_by_id(state, session_id) : NULL; /* NULL when the name is not resolved or no session_info has that id. */
 }
 
 static
@@ -342,6 +392,10 @@ const char *notification_command_type_str(
                return "ADD_CHANNEL";
        case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
                return "REMOVE_CHANNEL";
+       case NOTIFICATION_COMMAND_TYPE_ADD_SESSION:
+               return "ADD_SESSION";
+       case NOTIFICATION_COMMAND_TYPE_REMOVE_SESSION:
+               return "REMOVE_SESSION";
        case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
                return "SESSION_ROTATION_ONGOING";
        case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
@@ -491,7 +545,7 @@ enum lttng_object_type get_condition_binding_object(
 static
 void free_channel_info_rcu(struct rcu_head *node)
 {
-       free(caa_container_of(node, struct channel_info, rcu_node));
+       free(lttng::utils::container_of(node, &channel_info::rcu_node)); /* Free the channel_info that embeds this rcu_head. */
 }
 
 static
@@ -515,7 +569,7 @@ void channel_info_destroy(struct channel_info *channel_info)
 static
 void free_session_info_rcu(struct rcu_head *node)
 {
-       free(caa_container_of(node, struct session_info, rcu_node));
+       free(lttng::utils::container_of(node, &session_info::rcu_node)); /* Free the session_info that embeds this rcu_head. */
 }
 
 /* Don't call directly, use the ref-counting mechanism. */
@@ -561,7 +615,10 @@ void session_info_put(struct session_info *session_info)
 }
 
 static
-struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
+struct session_info *session_info_create(uint64_t id,
+               const char *name,
+               uid_t uid,
+               gid_t gid,
                struct lttng_session_trigger_list *trigger_list,
                struct cds_lfht *sessions_ht)
 {
@@ -569,10 +626,11 @@ struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
 
        LTTNG_ASSERT(name);
 
-       session_info = (struct session_info *) zmalloc(sizeof(*session_info));
+       session_info = zmalloc<struct session_info>();
        if (!session_info) {
                goto end;
        }
+
        lttng_ref_init(&session_info->ref, session_info_destroy);
 
        session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
@@ -582,10 +640,12 @@ struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
        }
 
        cds_lfht_node_init(&session_info->sessions_ht_node);
+       session_info->id = id;
        session_info->name = strdup(name);
        if (!session_info->name) {
                goto error;
        }
+
        session_info->uid = uid;
        session_info->gid = gid;
        session_info->trigger_list = trigger_list;
@@ -623,7 +683,7 @@ struct channel_info *channel_info_create(const char *channel_name,
                struct channel_key *channel_key, uint64_t channel_capacity,
                struct session_info *session_info)
 {
-       struct channel_info *channel_info = (struct channel_info *) zmalloc(sizeof(*channel_info));
+       struct channel_info *channel_info = zmalloc<struct channel_info>();
 
        if (!channel_info) {
                goto end;
@@ -670,7 +730,7 @@ static
 void notification_client_list_release(struct urcu_ref *list_ref)
 {
        struct notification_client_list *list =
-                       container_of(list_ref, typeof(*list), ref);
+                       lttng::utils::container_of(list_ref, &notification_client_list::ref);
        struct notification_client_list_element *client_list_element, *tmp;
 
        lttng_condition_put(list->condition);
@@ -723,7 +783,7 @@ struct notification_client_list *notification_client_list_create(
        struct cds_lfht_iter iter;
        struct notification_client_list *client_list;
 
-       client_list = (notification_client_list *) zmalloc(sizeof(*client_list));
+       client_list = zmalloc<notification_client_list>();
        if (!client_list) {
                PERROR("Failed to allocate notification client list");
                goto end;
@@ -755,7 +815,7 @@ struct notification_client_list *notification_client_list_create(
                        continue;
                }
 
-               client_list_element = (notification_client_list_element *) zmalloc(sizeof(*client_list_element));
+               client_list_element = zmalloc<notification_client_list_element>();
                if (!client_list_element) {
                        goto error_put_client_list;
                }
@@ -814,8 +874,8 @@ struct notification_client_list *get_client_list_from_condition(
                        &iter);
        node = cds_lfht_iter_get_node(&iter);
        if (node) {
-               list = container_of(node, struct notification_client_list,
-                               notification_trigger_clients_ht_node);
+               list = lttng::utils::container_of(node,
+                               &notification_client_list::notification_trigger_clients_ht_node);
                list = notification_client_list_get(list) ? list : NULL;
        }
 
@@ -958,32 +1018,21 @@ int evaluate_session_condition_for_client(
                uid_t *session_uid, gid_t *session_gid)
 {
        int ret;
-       struct cds_lfht_iter iter;
-       struct cds_lfht_node *node;
        const char *session_name;
        struct session_info *session_info = NULL;
 
        rcu_read_lock();
        session_name = get_condition_session_name(condition);
 
-       /* Find the session associated with the trigger. */
-       cds_lfht_lookup(state->sessions_ht,
-                       hash_key_str(session_name, lttng_ht_seed),
-                       match_session,
-                       session_name,
-                       &iter);
-       node = cds_lfht_iter_get_node(&iter);
-       if (!node) {
-               DBG("No known session matching name \"%s\"",
+       /* Find the session associated with the condition. */
+       session_info = get_session_info_by_name(state, session_name);
+       if (!session_info) {
+               DBG("Unknown session while evaluating session condition for client: name = `%s`",
                                session_name);
                ret = 0;
                goto end;
        }
 
-       session_info = caa_container_of(node, struct session_info,
-                       sessions_ht_node);
-       session_info_get(session_info);
-
        /*
         * Evaluation is performed in-line here since only one type of
         * session-bound condition is handled for the moment.
@@ -1031,8 +1080,15 @@ int evaluate_condition_for_client(const struct lttng_trigger *trigger,
        struct lttng_evaluation *evaluation = NULL;
        struct notification_client_list client_list = {
                .lock = PTHREAD_MUTEX_INITIALIZER,
+               .ref = {},
+               .condition = NULL,
+               .triggers_list = {},
+               .clients_list = {},
+               .notification_trigger_clients_ht = NULL,
+               .notification_trigger_clients_ht_node = {},
+               .rcu_node = {},
        };
-       struct notification_client_list_element client_list_element = { 0 };
+       struct notification_client_list_element client_list_element = {};
        uid_t object_uid = 0;
        gid_t object_gid = 0;
 
@@ -1116,12 +1172,12 @@ int notification_thread_client_subscribe(struct notification_client *client,
                }
        }
 
-       condition_list_element = (lttng_condition_list_element *) zmalloc(sizeof(*condition_list_element));
+       condition_list_element = zmalloc<lttng_condition_list_element>();
        if (!condition_list_element) {
                ret = -1;
                goto error;
        }
-       client_list_element = (notification_client_list_element *) zmalloc(sizeof(*client_list_element));
+       client_list_element = zmalloc<notification_client_list_element>();
        if (!client_list_element) {
                ret = -1;
                goto error;
@@ -1275,12 +1331,11 @@ end:
 static
 void free_notification_client_rcu(struct rcu_head *node)
 {
-       free(caa_container_of(node, struct notification_client, rcu_node));
+       free(lttng::utils::container_of(node, &notification_client::rcu_node)); /* Free the notification_client that embeds this rcu_head. */
 }
 
 static
-void notification_client_destroy(struct notification_client *client,
-               struct notification_thread_state *state)
+void notification_client_destroy(struct notification_client *client)
 {
        if (!client) {
                return;
@@ -1313,6 +1368,8 @@ struct notification_client *get_client_from_socket(int socket,
        struct cds_lfht_node *node;
        struct notification_client *client = NULL;
 
+       ASSERT_RCU_READ_LOCKED();
+
        cds_lfht_lookup(state->client_socket_ht,
                        hash_client_socket(socket),
                        match_client_socket,
@@ -1341,6 +1398,8 @@ struct notification_client *get_client_from_id(notification_client_id id,
        struct cds_lfht_node *node;
        struct notification_client *client = NULL;
 
+       ASSERT_RCU_READ_LOCKED();
+
        cds_lfht_lookup(state->client_id_ht,
                        hash_client_id(id),
                        match_client_id,
@@ -1456,6 +1515,8 @@ struct lttng_session_trigger_list *get_session_trigger_list(
        struct cds_lfht_node *node;
        struct cds_lfht_iter iter;
 
+       ASSERT_RCU_READ_LOCKED();
+
        cds_lfht_lookup(state->session_triggers_ht,
                        hash_key_str(session_name, lttng_ht_seed),
                        match_session_trigger_list,
@@ -1494,7 +1555,7 @@ struct lttng_session_trigger_list *lttng_session_trigger_list_create(
 {
        struct lttng_session_trigger_list *list;
 
-       list = (lttng_session_trigger_list *) zmalloc(sizeof(*list));
+       list = zmalloc<lttng_session_trigger_list>();
        if (!list) {
                goto end;
        }
@@ -1545,7 +1606,7 @@ int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
 {
        int ret = 0;
        struct lttng_trigger_list_element *new_element =
-                       (lttng_trigger_list_element *) zmalloc(sizeof(*new_element));
+                       zmalloc<lttng_trigger_list_element>();
 
        if (!new_element) {
                ret = -1;
@@ -1640,39 +1701,22 @@ error:
 }
 
 static
-struct session_info *find_or_create_session_info(
-               struct notification_thread_state *state,
-               const char *name, uid_t uid, gid_t gid)
+struct session_info *create_and_publish_session_info(struct notification_thread_state *state,
+               uint64_t id,
+               const char *name,
+               uid_t uid,
+               gid_t gid)
 {
        struct session_info *session = NULL;
-       struct cds_lfht_node *node;
-       struct cds_lfht_iter iter;
        struct lttng_session_trigger_list *trigger_list;
 
        rcu_read_lock();
-       cds_lfht_lookup(state->sessions_ht,
-                       hash_key_str(name, lttng_ht_seed),
-                       match_session,
-                       name,
-                       &iter);
-       node = cds_lfht_iter_get_node(&iter);
-       if (node) {
-               DBG("Found session info of session \"%s\" (uid = %i, gid = %i)",
-                               name, uid, gid);
-               session = caa_container_of(node, struct session_info,
-                               sessions_ht_node);
-               LTTNG_ASSERT(session->uid == uid);
-               LTTNG_ASSERT(session->gid == gid);
-               session_info_get(session);
-               goto end;
-       }
-
        trigger_list = lttng_session_trigger_list_build(state, name);
        if (!trigger_list) {
                goto error;
        }
 
-       session = session_info_create(name, uid, gid, trigger_list,
+       session = session_info_create(id, name, uid, gid, trigger_list,
                        state->sessions_ht);
        if (!session) {
                ERR("Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
@@ -1680,11 +1724,16 @@ struct session_info *find_or_create_session_info(
                lttng_session_trigger_list_destroy(trigger_list);
                goto error;
        }
+
+       /* Transferred ownership to the new session. */
        trigger_list = NULL;
 
-       cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
-                       &session->sessions_ht_node);
-end:
+       if (cds_lfht_add_unique(state->sessions_ht, hash_session_info(session), match_session_info,
+                           &id, &session->sessions_ht_node) != &session->sessions_ht_node) {
+               ERR("Duplicate session found: name = `%s`, id = %" PRIu64, name, id);
+               goto error;
+       }
+
        rcu_read_unlock();
        return session;
 error:
@@ -1694,11 +1743,12 @@ error:
 }
 
 static
-int handle_notification_thread_command_add_channel(
-               struct notification_thread_state *state,
-               const char *session_name, uid_t session_uid, gid_t session_gid,
-               const char *channel_name, enum lttng_domain_type channel_domain,
-               uint64_t channel_key_int, uint64_t channel_capacity,
+int handle_notification_thread_command_add_channel(struct notification_thread_state *state,
+               uint64_t session_id,
+               const char *channel_name,
+               enum lttng_domain_type channel_domain,
+               uint64_t channel_key_int,
+               uint64_t channel_capacity,
                enum lttng_error_code *cmd_result)
 {
        struct cds_list_head trigger_list;
@@ -1713,16 +1763,17 @@ int handle_notification_thread_command_add_channel(
        struct cds_lfht_iter iter;
        struct session_info *session_info = NULL;
 
-       DBG("Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
-                       channel_name, session_name, channel_key_int,
+       DBG("Adding channel: channel name = `%s`, session id = %" PRIu64 ", channel key = %" PRIu64 ", domain = %s",
+                       channel_name, session_id, channel_key_int,
                        lttng_domain_type_str(channel_domain));
 
        CDS_INIT_LIST_HEAD(&trigger_list);
 
-       session_info = find_or_create_session_info(state, session_name,
-                       session_uid, session_gid);
+       session_info = get_session_info_by_id(state, session_id);
        if (!session_info) {
-               /* Allocation error or an internal error occurred. */
+               /* Fatal logic error. */
+               ERR("Failed to find session while adding channel: session id = %" PRIu64,
+                               session_id);
                goto error;
        }
 
@@ -1743,7 +1794,7 @@ int handle_notification_thread_command_add_channel(
                        continue;
                }
 
-               new_element = (lttng_trigger_list_element *) zmalloc(sizeof(*new_element));
+               new_element = zmalloc<lttng_trigger_list_element>();
                if (!new_element) {
                        rcu_read_unlock();
                        goto error;
@@ -1757,7 +1808,7 @@ int handle_notification_thread_command_add_channel(
 
        DBG("Found %i triggers that apply to newly added channel",
                        trigger_count);
-       channel_trigger_list = (lttng_channel_trigger_list *) zmalloc(sizeof(*channel_trigger_list));
+       channel_trigger_list = zmalloc<lttng_channel_trigger_list>();
        if (!channel_trigger_list) {
                goto error;
        }
@@ -1788,6 +1839,67 @@ error:
        return 1;
 }
 
+static
+int handle_notification_thread_command_add_session(struct notification_thread_state *state, /* ADD_SESSION command handler: creates a session_info and publishes it in the sessions hash table. */
+               uint64_t session_id,
+               const char *session_name,
+               uid_t session_uid,
+               gid_t session_gid,
+               enum lttng_error_code *cmd_result) /* Out: LTTNG_OK on success, LTTNG_ERR_NOMEM on failure. */
+{
+       int ret; /* 0 on success, -1 on failure. */
+
+       DBG("Adding session: session name = `%s`, session id = %" PRIu64 ", session uid = %d, session gid = %d",
+                       session_name, session_id, session_uid, session_gid);
+
+       auto session = create_and_publish_session_info(state, session_id, session_name, session_uid, session_gid);
+       if (!session) {
+               PERROR("Failed to add session: session name = `%s`, session id = %" PRIu64 ", session uid = %d, session gid = %d",
+                               session_name, session_id, session_uid, session_gid);
+               ret = -1;
+               *cmd_result = LTTNG_ERR_NOMEM; /* NOTE(review): a duplicate session id appears to end up here too, not only allocation failures -- confirm intent. */
+               goto end;
+       }
+
+       /*
+        * Note that the reference to `session` is not released; this reference is
+        * the "global" reference that is used to allow look-ups. This reference will
+        * only be released when the session is removed. See
+        * handle_notification_thread_command_remove_session.
+        */
+       ret = 0;
+       *cmd_result = LTTNG_OK;
+end:
+       return ret;
+}
+
+static
+int handle_notification_thread_command_remove_session( /* REMOVE_SESSION command handler: drops the look-up reference and the global reference on the session_info. */
+               struct notification_thread_state *state,
+               uint64_t session_id,
+               enum lttng_error_code *cmd_result) /* Out: LTTNG_OK on success, LTTNG_ERR_NO_SESSION when the id is unknown. */
+{
+       int ret; /* 0 on success, -1 when no session_info matches session_id. */
+
+       DBG("Removing session: session id = %" PRIu64, session_id);
+
+       auto session = get_session_info_by_id(state, session_id); /* Takes a reference on the session_info when one is found. */
+       if (!session) {
+               ERR("Failed to remove session: session id = %" PRIu64, session_id);
+               ret = -1;
+               *cmd_result = LTTNG_ERR_NO_SESSION;
+               goto end;
+       }
+
+       /* Release the reference returned by the look-up, and then release the global reference. */
+       session_info_put(session);
+       session_info_put(session);
+       ret = 0;
+       *cmd_result = LTTNG_OK;
+end:
+       return ret;
+}
+
 static
 void free_channel_trigger_list_rcu(struct rcu_head *node)
 {
@@ -1888,7 +2000,7 @@ static
 int handle_notification_thread_command_session_rotation(
        struct notification_thread_state *state,
        enum notification_thread_command_type cmd_type,
-       const char *session_name, uid_t session_uid, gid_t session_gid,
+       uint64_t session_id,
        uint64_t trace_archive_chunk_id,
        struct lttng_trace_archive_location *location,
        enum lttng_error_code *_cmd_result)
@@ -1898,29 +2010,32 @@ int handle_notification_thread_command_session_rotation(
        struct lttng_session_trigger_list *trigger_list;
        struct lttng_trigger_list_element *trigger_list_element;
        struct session_info *session_info;
-       const struct lttng_credentials session_creds = {
-               .uid = LTTNG_OPTIONAL_INIT_VALUE(session_uid),
-               .gid = LTTNG_OPTIONAL_INIT_VALUE(session_gid),
-       };
+       struct lttng_credentials session_creds;
 
        rcu_read_lock();
 
-       session_info = find_or_create_session_info(state, session_name,
-                       session_uid, session_gid);
+       session_info = get_session_info_by_id(state, session_id);
        if (!session_info) {
-               /* Allocation error or an internal error occurred. */
+               /* Fatal logic error. */
+               ERR("Failed to find session while handling rotation state change: session id = %" PRIu64,
+                               session_id);
                ret = -1;
-               cmd_result = LTTNG_ERR_NOMEM;
+               cmd_result = LTTNG_ERR_FATAL;
                goto end;
        }
 
+       session_creds = {
+               .uid = LTTNG_OPTIONAL_INIT_VALUE(session_info->uid),
+               .gid = LTTNG_OPTIONAL_INIT_VALUE(session_info->gid),
+       };
+
        session_info->rotation.ongoing =
                        cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
        session_info->rotation.id = trace_archive_chunk_id;
-       trigger_list = get_session_trigger_list(state, session_name);
+       trigger_list = get_session_trigger_list(state, session_info->name);
        if (!trigger_list) {
-               DBG("No triggers applying to session \"%s\" found",
-                               session_name);
+               DBG("No triggers apply to session: session name = `%s` ",
+                               session_info->name);
                goto end;
        }
 
@@ -2020,7 +2135,7 @@ int handle_notification_thread_command_add_tracer_event_source(
        enum lttng_error_code cmd_result = LTTNG_OK;
        struct notification_event_tracer_event_source_element *element = NULL;
 
-       element = (notification_event_tracer_event_source_element *) zmalloc(sizeof(*element));
+       element = zmalloc<notification_event_tracer_event_source_element>();
        if (!element) {
                cmd_result = LTTNG_ERR_NOMEM;
                ret = -1;
@@ -2037,7 +2152,7 @@ int handle_notification_thread_command_add_tracer_event_source(
                        lttng_domain_type_str(domain_type));
 
        /* Adding the read side pipe to the event poll. */
-       ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLIN | LPOLLERR);
+       ret = lttng_poll_add(&state->events, tracer_event_source_fd, LPOLLPRI | LPOLLIN | LPOLLERR);
        if (ret < 0) {
                ERR("Failed to add tracer event source to poll set: tracer_event_source_fd = %d, domain = '%s'",
                                tracer_event_source_fd,
@@ -2059,7 +2174,7 @@ int drain_event_notifier_notification_pipe(
                struct notification_thread_state *state,
                int pipe, enum lttng_domain_type domain)
 {
-       struct lttng_poll_event events = {0};
+       struct lttng_poll_event events = {};
        int ret;
 
        ret = lttng_poll_create(&events, 1, LTTNG_CLOEXEC);
@@ -2227,7 +2342,7 @@ end:
 }
 
 static int handle_notification_thread_command_list_triggers(
-               struct notification_thread_handle *handle,
+               struct notification_thread_handle *handle __attribute__((unused)),
                struct notification_thread_state *state,
                uid_t client_uid,
                struct lttng_triggers **triggers,
@@ -2418,6 +2533,8 @@ int bind_trigger_to_matching_session(struct lttng_trigger *trigger,
        const char *session_name;
        struct lttng_session_trigger_list *trigger_list;
 
+       ASSERT_RCU_READ_LOCKED();
+
        condition = lttng_trigger_get_const_condition(trigger);
        switch (lttng_condition_get_type(condition)) {
        case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
@@ -2464,6 +2581,8 @@ int bind_trigger_to_matching_channels(struct lttng_trigger *trigger,
        struct cds_lfht_iter iter;
        struct channel_info *channel;
 
+       ASSERT_RCU_READ_LOCKED();
+
        cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
                        channels_ht_node) {
                struct lttng_trigger_list_element *trigger_list_element;
@@ -2485,7 +2604,7 @@ int bind_trigger_to_matching_channels(struct lttng_trigger *trigger,
                                struct lttng_channel_trigger_list,
                                channel_triggers_ht_node);
 
-               trigger_list_element = (lttng_trigger_list_element *) zmalloc(sizeof(*trigger_list_element));
+               trigger_list_element = zmalloc<lttng_trigger_list_element>();
                if (!trigger_list_element) {
                        ret = -1;
                        goto end;
@@ -2606,7 +2725,7 @@ enum lttng_error_code setup_tracer_notifier(
        struct lttng_condition *condition = lttng_trigger_get_condition(trigger);
        struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element = NULL;
 
-       trigger_tokens_ht_element = (notification_trigger_tokens_ht_element *) zmalloc(sizeof(*trigger_tokens_ht_element));
+       trigger_tokens_ht_element = zmalloc<notification_trigger_tokens_ht_element>();
        if (!trigger_tokens_ht_element) {
                ret = LTTNG_ERR_NOMEM;
                goto end;
@@ -2729,7 +2848,7 @@ int handle_notification_thread_command_register_trigger(
                goto error;
        }
 
-       trigger_ht_element = (lttng_trigger_ht_element *) zmalloc(sizeof(*trigger_ht_element));
+       trigger_ht_element = zmalloc<lttng_trigger_ht_element>();
        if (!trigger_ht_element) {
                ret = -1;
                goto error;
@@ -3107,28 +3226,43 @@ end:
        return 0;
 }
 
+static /* Pops the first queued command into *cmd; returns 0 on success, -1 if the event fd read fails. */
+int pop_cmd_queue(struct notification_thread_handle *handle,
+               struct notification_thread_command **cmd)
+{
+       int ret;
+       uint64_t counter; /* Receives the value read from the event fd; not used afterwards. */
+
+       pthread_mutex_lock(&handle->cmd_queue.lock); /* Held across both the fd read and the list removal. */
+       ret = lttng_read(handle->cmd_queue.event_fd, &counter, sizeof(counter));
+       if (ret != sizeof(counter)) {
+               ret = -1; /* Anything but a full-size read is an error; *cmd is left untouched. */
+               goto error_unlock;
+       }
+
+       *cmd = cds_list_first_entry(&handle->cmd_queue.list,
+                       struct notification_thread_command, cmd_list_node);
+       cds_list_del(&((*cmd)->cmd_list_node)); /* Unlink the command from the queue before handing it out. */
+       ret = 0;
+
+error_unlock:
+       pthread_mutex_unlock(&handle->cmd_queue.lock); /* Reached on both the success and the failure path. */
+       return ret;
+}
+
 /* Returns 0 on success, 1 on exit requested, negative value on error. */
 int handle_notification_thread_command(
                struct notification_thread_handle *handle,
                struct notification_thread_state *state)
 {
        int ret;
-       uint64_t counter;
        struct notification_thread_command *cmd;
 
-       /* Read the event pipe to put it back into a quiescent state. */
-       ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
-                       sizeof(counter));
-       if (ret != sizeof(counter)) {
+       ret = pop_cmd_queue(handle, &cmd);
+       if (ret) {
                goto error;
        }
 
-       pthread_mutex_lock(&handle->cmd_queue.lock);
-       cmd = cds_list_first_entry(&handle->cmd_queue.list,
-                       struct notification_thread_command, cmd_list_node);
-       cds_list_del(&cmd->cmd_list_node);
-       pthread_mutex_unlock(&handle->cmd_queue.lock);
-
        DBG("Received `%s` command",
                        notification_command_type_str(cmd->type));
        switch (cmd->type) {
@@ -3147,9 +3281,7 @@ int handle_notification_thread_command(
        case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
                ret = handle_notification_thread_command_add_channel(
                                state,
-                               cmd->parameters.add_channel.session.name,
-                               cmd->parameters.add_channel.session.uid,
-                               cmd->parameters.add_channel.session.gid,
+                               cmd->parameters.add_channel.session.id,
                                cmd->parameters.add_channel.channel.name,
                                cmd->parameters.add_channel.channel.domain,
                                cmd->parameters.add_channel.channel.key,
@@ -3162,14 +3294,23 @@ int handle_notification_thread_command(
                                cmd->parameters.remove_channel.domain,
                                &cmd->reply_code);
                break;
+       case NOTIFICATION_COMMAND_TYPE_ADD_SESSION:
+               ret = handle_notification_thread_command_add_session(state,
+                               cmd->parameters.add_session.session_id,
+                               cmd->parameters.add_session.session_name,
+                               cmd->parameters.add_session.session_uid,
+                               cmd->parameters.add_session.session_gid, &cmd->reply_code);
+               break;
+       case NOTIFICATION_COMMAND_TYPE_REMOVE_SESSION:
+               ret = handle_notification_thread_command_remove_session(
+                               state, cmd->parameters.remove_session.session_id, &cmd->reply_code);
+               break;
        case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
        case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
                ret = handle_notification_thread_command_session_rotation(
                                state,
                                cmd->type,
-                               cmd->parameters.session_rotation.session_name,
-                               cmd->parameters.session_rotation.uid,
-                               cmd->parameters.session_rotation.gid,
+                               cmd->parameters.session_rotation.session_id,
                                cmd->parameters.session_rotation.trace_archive_chunk_id,
                                cmd->parameters.session_rotation.location,
                                &cmd->reply_code);
@@ -3320,7 +3461,7 @@ int handle_notification_thread_client_connect(
 
        DBG("Handling new notification channel client connection");
 
-       client = (notification_client *) zmalloc(sizeof(*client));
+       client = zmalloc<notification_client>();
        if (!client) {
                /* Fatal error. */
                ret = -1;
@@ -3363,9 +3504,9 @@ int handle_notification_thread_client_connect(
                goto error;
        }
 
+       client->communication.current_poll_events = CLIENT_POLL_EVENTS_IN;
        ret = lttng_poll_add(&state->events, client->socket,
-                       LPOLLIN | LPOLLERR |
-                       LPOLLHUP | LPOLLRDHUP);
+                       client->communication.current_poll_events);
        if (ret < 0) {
                ERR("Failed to add notification channel client socket to poll set");
                ret = 0;
@@ -3386,7 +3527,7 @@ int handle_notification_thread_client_connect(
        return ret;
 
 error:
-       notification_client_destroy(client, state);
+       notification_client_destroy(client);
        return ret;
 }
 
@@ -3402,6 +3543,8 @@ int notification_thread_client_disconnect(
        int ret;
        struct lttng_condition_list_element *condition_list_element, *tmp;
 
+       ASSERT_RCU_READ_LOCKED();
+
        /* Acquire the client lock to disable its communication atomically. */
        pthread_mutex_lock(&client->lock);
        client->communication.active = false;
@@ -3426,7 +3569,7 @@ int notification_thread_client_disconnect(
         * Client no longer accessible to other threads (through the
         * client lists).
         */
-       notification_client_destroy(client, state);
+       notification_client_destroy(client);
        return ret;
 }
 
@@ -3497,6 +3640,18 @@ int handle_notification_thread_trigger_unregister_all(
        return error_occurred ? -1 : 0;
 }
 
+static /* Reports whether the client's outbound payload still holds bytes or fd handles. */
+bool client_has_outbound_data_left(
+               const struct notification_client *client)
+{
+       const struct lttng_payload_view pv = lttng_payload_view_from_payload(
+                       &client->communication.outbound.payload, 0, -1); /* View from offset 0; -1 presumably means "to the end" -- TODO confirm. */
+       const bool has_data = pv.buffer.size != 0; /* True when the view's byte buffer is non-empty. */
+       const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv); /* A non-zero count converts to true. */
+
+       return has_data || has_fds;
+}
+
 static
 int client_handle_transmission_status(
                struct notification_client *client,
@@ -3507,24 +3662,51 @@ int client_handle_transmission_status(
 
        switch (transmission_status) {
        case CLIENT_TRANSMISSION_STATUS_COMPLETE:
-               ret = lttng_poll_mod(&state->events, client->socket,
-                               CLIENT_POLL_MASK_IN);
-               if (ret) {
-                       goto end;
-               }
-
-               break;
        case CLIENT_TRANSMISSION_STATUS_QUEUED:
+       {
+               int current_poll_events;
+               int new_poll_events;
                /*
                 * We want to be notified whenever there is buffer space
-                * available to send the rest of the payload.
+                * available to send the rest of the payload if we are
+                * waiting to send data to the client.
+                *
+                * The state of the outbound queue being sampled here is
+                * fine since:
+                *   - it is okay to wake-up "for nothing" in case we see
+                *     that data is left, but another thread succeeds in
+                *     flushing it before us when handling the client "out"
+                *     event. We will simply stop monitoring that event the next
+                *     time it wakes us up and we see no data left to be sent,
+                *   - if another thread fails to flush the entire client
+                *     outgoing queue, it will issue a "communication update"
+                *     command and cause the client's (e)poll mask to be
+                *     re-evaluated.
+                *
+                * The situation we seek to avoid would be to disable the
+                * monitoring of "out" client events indefinitely when there is
+                * data to be sent, which can't happen because of the
+                * aforementioned "communication update" mechanism.
                 */
-               ret = lttng_poll_mod(&state->events, client->socket,
-                               CLIENT_POLL_MASK_IN_OUT);
-               if (ret) {
-                       goto end;
+               pthread_mutex_lock(&client->lock);
+               current_poll_events = client->communication.current_poll_events;
+               new_poll_events = client_has_outbound_data_left(client) ?
+                               CLIENT_POLL_EVENTS_IN_OUT :
+                                     CLIENT_POLL_EVENTS_IN;
+               client->communication.current_poll_events = new_poll_events;
+               pthread_mutex_unlock(&client->lock);
+
+               /* Update the monitored event set only if it changed. */
+               if (current_poll_events != new_poll_events) {
+                       ret = lttng_poll_mod(&state->events, client->socket,
+                                       new_poll_events);
+                       if (ret) {
+                               goto end;
+                       }
                }
+
                break;
+       }
        case CLIENT_TRANSMISSION_STATUS_FAIL:
                ret = notification_thread_client_disconnect(client, state);
                if (ret) {
@@ -3664,18 +3846,6 @@ error:
        return CLIENT_TRANSMISSION_STATUS_ERROR;
 }
 
-static
-bool client_has_outbound_data_left(
-               const struct notification_client *client)
-{
-       const struct lttng_payload_view pv = lttng_payload_view_from_payload(
-                       &client->communication.outbound.payload, 0, -1);
-       const bool has_data = pv.buffer.size != 0;
-       const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
-
-       return has_data || has_fds;
-}
-
 /* Client lock must _not_ be held by the caller. */
 static
 int client_send_command_reply(struct notification_client *client,
@@ -3686,13 +3856,14 @@ int client_send_command_reply(struct notification_client *client,
        struct lttng_notification_channel_command_reply reply = {
                .status = (int8_t) status,
        };
-       struct lttng_notification_channel_message msg = {
-               .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
-               .size = sizeof(reply),
-       };
+       struct lttng_notification_channel_message msg;
        char buffer[sizeof(msg) + sizeof(reply)];
        enum client_transmission_status transmission_status;
 
+       msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY;
+       msg.size = sizeof(reply);
+       msg.fds = 0;
+
        memcpy(buffer, &msg, sizeof(msg));
        memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
        DBG("Send command reply (%i)", (int) status);
@@ -3734,7 +3905,7 @@ error:
 
 static
 int client_handle_message_unknown(struct notification_client *client,
-               struct notification_thread_state *state)
+               struct notification_thread_state *state __attribute__((unused)))
 {
        int ret;
        /*
@@ -3790,14 +3961,15 @@ int client_handle_message_handshake(struct notification_client *client,
                        .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
                        .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
        };
-       const struct lttng_notification_channel_message msg_header = {
-                       .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
-                       .size = sizeof(handshake_reply),
-       };
+       struct lttng_notification_channel_message msg_header;
        enum lttng_notification_channel_status status =
                        LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
        char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
 
+       msg_header.type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE;
+       msg_header.size = sizeof(handshake_reply);
+       msg_header.fds = 0;
+
        memcpy(send_buffer, &msg_header, sizeof(msg_header));
        memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
                        sizeof(handshake_reply));
@@ -4082,7 +4254,47 @@ int handle_notification_thread_client_out(
        }
 
        pthread_mutex_lock(&client->lock);
-       transmission_status = client_flush_outgoing_queue(client);
+       if (!client_has_outbound_data_left(client)) {
+               /*
+                * A client "out" event can be received when no payload is left
+                * to send under some circumstances.
+                *
+                * Many threads can flush a client's outgoing queue and, if they
+                * had to queue their message (socket was full), will use the
+                * "communication update" command to signal the (e)poll thread
+                * to monitor for space being made available in the socket.
+                *
+                * Commands are sent over an internal pipe serviced by the same
+                * thread as the client sockets.
+                *
+                * When space is made available in the socket, there is a race
+                * between the (e)poll thread and the other threads that may
+                * wish to use the client's socket to flush its outgoing queue.
+                *
+                * A non-(e)poll thread may attempt to flush the queue (and
+                * succeed) before the (e)poll thread gets a chance to service
+                * the client's "out" event.
+                *
+                * In this situation, the (e)poll thread processing the client
+                * out event will see an empty payload: there is nothing to do
+                * except unsubscribing (e)poll "out" events.
+                *
+                * Note that this thread is the (e)poll thread so it can modify
+                * the (e)poll mask directly without using a communication
+                * update command. Other threads that flush the outgoing queue
+                * will use the "communication update" command to wake up this
+                * thread and force it to monitor "out" events.
+                *
+                * When other threads succeed in emptying the outgoing queue,
+                * they don't need to update the (e)poll mask: if the "out"
+                * event is monitored, it will fire once and the (e)poll
+                * thread will reach this condition, causing the event to
+                * stop being monitored.
+                */
+               transmission_status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
+       } else {
+               transmission_status = client_flush_outgoing_queue(client);
+       }
        pthread_mutex_unlock(&client->lock);
 
        ret = client_handle_transmission_status(
@@ -4103,9 +4315,8 @@ bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
        bool result = false;
        uint64_t threshold;
        enum lttng_condition_type condition_type;
-       const struct lttng_condition_buffer_usage *use_condition = container_of(
-                       condition, struct lttng_condition_buffer_usage,
-                       parent);
+       const struct lttng_condition_buffer_usage *use_condition = lttng::utils::container_of(
+                       condition, &lttng_condition_buffer_usage::parent);
 
        if (use_condition->threshold_bytes.set) {
                threshold = use_condition->threshold_bytes.value;
@@ -4161,9 +4372,8 @@ bool evaluate_session_consumed_size_condition(
 {
        uint64_t threshold;
        const struct lttng_condition_session_consumed_size *size_condition =
-                       container_of(condition,
-                               struct lttng_condition_session_consumed_size,
-                               parent);
+                       lttng::utils::container_of(condition,
+                               &lttng_condition_session_consumed_size::parent);
 
        threshold = size_condition->consumed_threshold_bytes.value;
        DBG("Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
@@ -4174,7 +4384,7 @@ bool evaluate_session_consumed_size_condition(
 static
 int evaluate_buffer_condition(const struct lttng_condition *condition,
                struct lttng_evaluation **evaluation,
-               const struct notification_thread_state *state,
+               const struct notification_thread_state *state __attribute__((unused)),
                const struct channel_state_sample *previous_sample,
                const struct channel_state_sample *latest_sample,
                uint64_t previous_session_consumed_total,
@@ -4261,9 +4471,11 @@ static
 int client_notification_overflow(struct notification_client *client)
 {
        int ret = 0;
-       const struct lttng_notification_channel_message msg = {
-               .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
-       };
+       struct lttng_notification_channel_message msg;
+
+       msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED;
+       msg.size = 0;
+       msg.fds = 0;
 
        ASSERT_LOCKED(client->lock);
 
@@ -4365,14 +4577,16 @@ int notification_client_list_send_evaluation(
                .trigger = (struct lttng_trigger *) trigger,
                .evaluation = (struct lttng_evaluation *) evaluation,
        };
-       struct lttng_notification_channel_message msg_header = {
-               .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
-       };
+       struct lttng_notification_channel_message msg_header;
        const struct lttng_credentials *trigger_creds =
                        lttng_trigger_get_credentials(trigger);
 
        lttng_payload_init(&msg_payload);
 
+       msg_header.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION;
+       msg_header.size = 0;
+       msg_header.fds = 0;
+
        ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
                        sizeof(msg_header));
        if (ret) {
@@ -4562,7 +4776,7 @@ struct lttng_event_notifier_notification *recv_one_event_notifier_notification(
                goto end;
        }
 
-       capture_buffer = (char *) zmalloc(capture_buffer_size);
+       capture_buffer = calloc<char>(capture_buffer_size);
        if (!capture_buffer) {
                ERR("Failed to allocate capture buffer");
                goto end;
@@ -4642,10 +4856,9 @@ int dispatch_one_event_notifier_notification(struct notification_thread_state *s
        }
 
        evaluation = lttng_evaluation_event_rule_matches_create(
-                       container_of(lttng_trigger_get_const_condition(
+                       lttng::utils::container_of(lttng_trigger_get_const_condition(
                                                     element->trigger),
-                                       struct lttng_condition_event_rule_matches,
-                                       parent),
+                                       &lttng_condition_event_rule_matches::parent),
                        notification->capture_buffer,
                        notification->capture_buf_size, false);
 
@@ -4865,7 +5078,7 @@ int handle_notification_thread_channel_sample(
                 */
                struct channel_state_sample *stored_sample;
 
-               stored_sample = (channel_state_sample *) zmalloc(sizeof(*stored_sample));
+               stored_sample = zmalloc<channel_state_sample>();
                if (!stored_sample) {
                        ret = -1;
                        goto end_unlock;
This page took 0.041918 seconds and 4 git commands to generate.