waiter: modernize the waiter interface
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Sat, 23 Sep 2023 15:24:26 +0000 (11:24 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Mon, 2 Oct 2023 15:28:41 +0000 (11:28 -0400)
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I3e8ffac4324e36dc3bf7a7f79d874f228a48def6

src/bin/lttng-sessiond/notification-thread-commands.cpp
src/bin/lttng-sessiond/notification-thread-commands.hpp
src/bin/lttng-sessiond/notification-thread-events.cpp
src/common/consumer/consumer-metadata-cache.cpp
src/common/consumer/consumer.cpp
src/common/consumer/consumer.hpp
src/common/ust-consumer/ust-consumer.cpp
src/common/waiter.cpp
src/common/waiter.hpp

index a8d774a4f0764081e9d3203fd20fd49cea5fbaf8..ee0eb76b47b66712f9c95af5718a79f3a9d1b32c 100644 (file)
@@ -20,7 +20,6 @@
 static void init_notification_thread_command(struct notification_thread_command *cmd)
 {
        CDS_INIT_LIST_HEAD(&cmd->cmd_list_node);
-       lttng_waiter_init(&cmd->reply_waiter);
 }
 
 static int run_command_wait(struct notification_thread_handle *handle,
@@ -29,6 +28,9 @@ static int run_command_wait(struct notification_thread_handle *handle,
        int ret;
        uint64_t notification_counter = 1;
 
+       lttng::synchro::waiter command_completion_waiter;
+       cmd->command_completed_waker.emplace(command_completion_waiter.get_waker());
+
        pthread_mutex_lock(&handle->cmd_queue.lock);
        /* Add to queue. */
        cds_list_add_tail(&cmd->cmd_list_node, &handle->cmd_queue.list);
@@ -46,7 +48,7 @@ static int run_command_wait(struct notification_thread_handle *handle,
        }
        pthread_mutex_unlock(&handle->cmd_queue.lock);
 
-       lttng_waiter_wait(&cmd->reply_waiter);
+       command_completion_waiter.wait();;
        return 0;
 error_unlock_queue:
        pthread_mutex_unlock(&handle->cmd_queue.lock);
@@ -58,14 +60,15 @@ notification_thread_command_copy(const struct notification_thread_command *origi
 {
        struct notification_thread_command *new_cmd;
 
-       new_cmd = zmalloc<notification_thread_command>();
-       if (!new_cmd) {
-               goto end;
+       try {
+               new_cmd = new notification_thread_command;
+       } catch (const std::bad_alloc &e) {
+               ERR("Failed to allocate notification_thread_command: %s", e.what());
+               return nullptr;
        }
 
        *new_cmd = *original_cmd;
        init_notification_thread_command(new_cmd);
-end:
        return new_cmd;
 }
 
@@ -96,10 +99,12 @@ static int run_command_no_wait(struct notification_thread_handle *handle,
                cds_list_del(&new_cmd->cmd_list_node);
                goto error_unlock_queue;
        }
+
        pthread_mutex_unlock(&handle->cmd_queue.lock);
        return 0;
 error_unlock_queue:
-       free(new_cmd);
+
+       delete new_cmd;
        pthread_mutex_unlock(&handle->cmd_queue.lock);
 error:
        return -1;
@@ -112,7 +117,7 @@ notification_thread_command_register_trigger(struct notification_thread_handle *
 {
        int ret;
        enum lttng_error_code ret_code;
-       struct notification_thread_command cmd = {};
+       notification_thread_command cmd;
 
        LTTNG_ASSERT(trigger);
        init_notification_thread_command(&cmd);
@@ -138,7 +143,7 @@ notification_thread_command_unregister_trigger(struct notification_thread_handle
 {
        int ret;
        enum lttng_error_code ret_code;
-       struct notification_thread_command cmd = {};
+       notification_thread_command cmd;
 
        init_notification_thread_command(&cmd);
 
@@ -164,7 +169,7 @@ notification_thread_command_add_session(struct notification_thread_handle *handl
 {
        int ret;
        enum lttng_error_code ret_code;
-       struct notification_thread_command cmd = {};
+       notification_thread_command cmd;
 
        init_notification_thread_command(&cmd);
 
@@ -190,7 +195,7 @@ notification_thread_command_remove_session(struct notification_thread_handle *ha
 {
        int ret;
        enum lttng_error_code ret_code;
-       struct notification_thread_command cmd = {};
+       notification_thread_command cmd;
 
        init_notification_thread_command(&cmd);
 
@@ -217,7 +222,7 @@ notification_thread_command_add_channel(struct notification_thread_handle *handl
 {
        int ret;
        enum lttng_error_code ret_code;
-       struct notification_thread_command cmd = {};
+       notification_thread_command cmd;
 
        init_notification_thread_command(&cmd);
 
@@ -243,7 +248,7 @@ enum lttng_error_code notification_thread_command_remove_channel(
 {
        int ret;
        enum lttng_error_code ret_code;
-       struct notification_thread_command cmd = {};
+       notification_thread_command cmd;
 
        init_notification_thread_command(&cmd);
 
@@ -268,7 +273,7 @@ notification_thread_command_session_rotation_ongoing(struct notification_thread_
 {
        int ret;
        enum lttng_error_code ret_code;
-       struct notification_thread_command cmd = {};
+       notification_thread_command cmd;
 
        init_notification_thread_command(&cmd);
 
@@ -294,7 +299,7 @@ enum lttng_error_code notification_thread_command_session_rotation_completed(
 {
        int ret;
        enum lttng_error_code ret_code;
-       struct notification_thread_command cmd = {};
+       notification_thread_command cmd;
 
        init_notification_thread_command(&cmd);
 
@@ -320,7 +325,7 @@ notification_thread_command_add_tracer_event_source(struct notification_thread_h
 {
        int ret;
        enum lttng_error_code ret_code;
-       struct notification_thread_command cmd = {};
+       notification_thread_command cmd;
 
        LTTNG_ASSERT(tracer_event_source_fd >= 0);
 
@@ -347,7 +352,7 @@ notification_thread_command_remove_tracer_event_source(struct notification_threa
 {
        int ret;
        enum lttng_error_code ret_code;
-       struct notification_thread_command cmd = {};
+       notification_thread_command cmd;
 
        init_notification_thread_command(&cmd);
 
@@ -370,7 +375,7 @@ enum lttng_error_code notification_thread_command_list_triggers(
 {
        int ret;
        enum lttng_error_code ret_code;
-       struct notification_thread_command cmd = {};
+       notification_thread_command cmd;
 
        LTTNG_ASSERT(handle);
        LTTNG_ASSERT(triggers);
@@ -396,7 +401,7 @@ end:
 void notification_thread_command_quit(struct notification_thread_handle *handle)
 {
        int ret;
-       struct notification_thread_command cmd = {};
+       notification_thread_command cmd;
 
        init_notification_thread_command(&cmd);
 
@@ -410,7 +415,7 @@ int notification_thread_client_communication_update(
        notification_client_id id,
        enum client_transmission_status transmission_status)
 {
-       struct notification_thread_command cmd = {};
+       notification_thread_command cmd;
 
        init_notification_thread_command(&cmd);
 
@@ -427,7 +432,7 @@ notification_thread_command_get_trigger(struct notification_thread_handle *handl
 {
        int ret;
        enum lttng_error_code ret_code;
-       struct notification_thread_command cmd = {};
+       notification_thread_command cmd;
 
        init_notification_thread_command(&cmd);
 
index 4058de5a4d89953b41b4d947dcc4f9e230b9ce70..8a3150ae32d72987b5c46a20b9096188649e97a1 100644 (file)
@@ -17,6 +17,8 @@
 #include <lttng/domain.h>
 #include <lttng/lttng-error.h>
 
+#include <vendor/optional.hpp>
+
 #include <stdbool.h>
 #include <urcu/rculfhash.h>
 
@@ -41,9 +43,9 @@ enum notification_thread_command_type {
 };
 
 struct notification_thread_command {
-       struct cds_list_head cmd_list_node;
+       struct cds_list_head cmd_list_node = {};
 
-       enum notification_thread_command_type type;
+       notification_thread_command_type type = NOTIFICATION_COMMAND_TYPE_QUIT;
        union {
                /* Register trigger. */
                struct {
@@ -108,7 +110,7 @@ struct notification_thread_command {
                        const struct lttng_trigger *trigger;
                } get_trigger;
 
-       } parameters;
+       } parameters = {};
 
        union {
                struct {
@@ -117,11 +119,12 @@ struct notification_thread_command {
                struct {
                        struct lttng_trigger *trigger;
                } get_trigger;
-       } reply;
-       /* lttng_waiter on which to wait for command reply (optional). */
-       struct lttng_waiter reply_waiter;
-       enum lttng_error_code reply_code;
-       bool is_async;
+       } reply = {};
+
+       /* Used to wake origin thread for synchroneous commands. */
+       nonstd::optional<lttng::synchro::waker> command_completed_waker = nonstd::nullopt;
+       lttng_error_code reply_code = LTTNG_ERR_UNK;
+       bool is_async = false;
 };
 
 enum lttng_error_code
index f18bc68d44760c969b01c347500c90214c201982..70774a97530b1b625033387c2215ab052e47bf54 100644 (file)
@@ -3311,21 +3311,24 @@ int handle_notification_thread_command(struct notification_thread_handle *handle
        if (ret) {
                goto error_unlock;
        }
+
 end:
        if (cmd->is_async) {
-               free(cmd);
+               delete cmd;
                cmd = nullptr;
        } else {
-               lttng_waiter_wake(&cmd->reply_waiter);
+               cmd->command_completed_waker->wake();
        }
+
        return ret;
+
 error_unlock:
        /* Wake-up and return a fatal error to the calling thread. */
-       lttng_waiter_wake(&cmd->reply_waiter);
        cmd->reply_code = LTTNG_ERR_FATAL;
+
 error:
-       /* Indicate a fatal error to the caller. */
-       return -1;
+       ret = -1;
+       goto end;
 }
 
 static int socket_set_non_blocking(int socket)
index 462e079d8c705dfb5785244fd67f3cd4b63d5f59..5e84f2b96f277ca7b80c4144feda327b717c2f99 100644 (file)
@@ -258,18 +258,17 @@ void consumer_wait_metadata_cache_flushed(struct lttng_consumer_channel *channel
 
        /* Metadata cache is not currently flushed, wait on wait queue. */
        for (;;) {
-               struct lttng_waiter waiter;
+               lttng::synchro::waiter waiter;
 
-               lttng_waiter_init(&waiter);
-               lttng_wait_queue_add(&channel->metadata_pushed_wait_queue, &waiter);
+               channel->metadata_pushed_wait_queue.add(waiter);
                if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) {
                        /* Wake up all waiters, ourself included. */
-                       lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue);
+                       channel->metadata_pushed_wait_queue.wake_all();
                        /* Ensure proper teardown of waiter. */
-                       lttng_waiter_wait(&waiter);
+                       waiter.wait();
                        break;
                }
 
-               lttng_waiter_wait(&waiter);
+               waiter.wait();
        }
 }
index 4823d9b9c59aa6f9933d9e7552fe27fb48865cb6..7e7e7d384cd20565e48bf3bad2793a41f31d1ca6 100644 (file)
@@ -303,7 +303,8 @@ static void free_channel_rcu(struct rcu_head *head)
                ERR("Unknown consumer_data type");
                abort();
        }
-       free(channel);
+
+       delete channel;
 }
 
 /*
@@ -462,7 +463,7 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
        cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
                if (stream->net_seq_idx == net_seq_idx) {
                        uatomic_set(&stream->endpoint_status, status);
-                       lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
+                       stream->chan->metadata_pushed_wait_queue.wake_all();
 
                        DBG("Delete flag set to metadata stream %d", stream->wait_fd);
                }
@@ -1019,9 +1020,11 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                }
        }
 
-       channel = zmalloc<lttng_consumer_channel>();
-       if (channel == nullptr) {
-               PERROR("malloc struct lttng_consumer_channel");
+       try {
+               channel = new lttng_consumer_channel;
+       } catch (const std::bad_alloc& e) {
+               ERR("Failed to allocate lttng_consumer_channel: %s", e.what());
+               channel = nullptr;
                goto end;
        }
 
@@ -1037,7 +1040,6 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->is_live = is_in_live_session;
        pthread_mutex_init(&channel->lock, NULL);
        pthread_mutex_init(&channel->timer_lock, NULL);
-       lttng_wait_queue_init(&channel->metadata_pushed_wait_queue);
 
        switch (output) {
        case LTTNG_EVENT_SPLICE:
@@ -1048,7 +1050,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                break;
        default:
                abort();
-               free(channel);
+               delete channel;
                channel = nullptr;
                goto end;
        }
@@ -2133,7 +2135,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct l
         * pointer value.
         */
        channel->metadata_stream = nullptr;
-       lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue);
+       channel->metadata_pushed_wait_queue.wake_all();
 
        if (channel->metadata_cache) {
                pthread_mutex_unlock(&channel->metadata_cache->lock);
index 9de8d05a3cb188b98c12e8c0c5ac1a476500591a..c62e014530bfabdc39a28fb333f2d586056af384 100644 (file)
@@ -109,30 +109,30 @@ struct consumer_metadata_cache;
 
 struct lttng_consumer_channel {
        /* Is the channel published in the channel hash tables? */
-       bool is_published;
+       bool is_published = false;
        /*
         * Was the channel deleted (logically) and waiting to be reclaimed?
         * If this flag is set, no modification that is not cleaned-up by the
         * RCU reclamation callback should be made
         */
-       bool is_deleted;
+       bool is_deleted = false;
        /* HT node used for consumer_data.channel_ht */
-       struct lttng_ht_node_u64 node;
+       struct lttng_ht_node_u64 node = {};
        /* HT node used for consumer_data.channels_by_session_id_ht */
-       struct lttng_ht_node_u64 channels_by_session_id_ht_node;
+       struct lttng_ht_node_u64 channels_by_session_id_ht_node = {};
        /* Indexed key. Incremented value in the consumer. */
-       uint64_t key;
+       uint64_t key = 0;
        /* Number of streams referencing this channel */
-       int refcount;
+       int refcount = 0;
        /* Tracing session id on the session daemon side. */
-       uint64_t session_id;
+       uint64_t session_id = 0;
        /* Current trace chunk of the session in which this channel exists. */
-       struct lttng_trace_chunk *trace_chunk;
+       struct lttng_trace_chunk *trace_chunk = nullptr;
        /*
         * Session id when requesting metadata to the session daemon for
         * a session with per-PID buffers.
         */
-       uint64_t session_id_per_pid;
+       uint64_t session_id_per_pid = 0;
        /*
         * In the case of local streams, this field contains the channel's
         * output path; a path relative to the session's output path.
@@ -144,77 +144,77 @@ struct lttng_consumer_channel {
         * peers, this contains a path of the form:
         *   /hostname/session_path/ust/uid/1000/64-bit
         */
-       char pathname[PATH_MAX];
+       char pathname[PATH_MAX] = {};
        /* Channel name. */
-       char name[LTTNG_SYMBOL_NAME_LEN];
+       char name[LTTNG_SYMBOL_NAME_LEN] = {};
        /* Relayd id of the channel. -1ULL if it does not apply. */
-       uint64_t relayd_id;
+       uint64_t relayd_id = 0;
        /*
         * Number of streams NOT initialized yet. This is used in order to not
         * delete this channel if streams are getting initialized.
         */
-       unsigned int nb_init_stream_left;
+       unsigned int nb_init_stream_left = 0;
        /* Output type (mmap or splice). */
-       enum consumer_channel_output output;
+       enum consumer_channel_output output = CONSUMER_CHANNEL_MMAP;
        /* Channel type for stream */
-       enum consumer_channel_type type;
+       enum consumer_channel_type type = CONSUMER_CHANNEL_TYPE_METADATA;
 
        /* For UST */
-       uid_t ust_app_uid; /* Application UID. */
-       struct lttng_ust_ctl_consumer_channel *uchan;
-       unsigned char uuid[LTTNG_UUID_STR_LEN];
+       uid_t ust_app_uid = 65534; /* Application UID. */
+       struct lttng_ust_ctl_consumer_channel *uchan = nullptr;
+       unsigned char uuid[LTTNG_UUID_STR_LEN] = {};
        /*
         * Temporary stream list used to store the streams once created and waiting
         * to be sent to the session daemon by receiving the
         * LTTNG_CONSUMER_GET_CHANNEL.
         */
-       struct stream_list streams;
+       struct stream_list streams = {};
 
        /*
         * Set if the channel is metadata. We keep a reference to the stream
         * because we have to flush data once pushed by the session daemon. For a
         * regular channel, this is always set to NULL.
         */
-       struct lttng_consumer_stream *metadata_stream;
+       struct lttng_consumer_stream *metadata_stream = nullptr;
 
        /* for UST */
-       int wait_fd;
+       int wait_fd = -1;
        /* Node within channel thread ht */
-       struct lttng_ht_node_u64 wait_fd_node;
+       struct lttng_ht_node_u64 wait_fd_node = {};
 
        /* Metadata cache is metadata channel */
-       struct consumer_metadata_cache *metadata_cache;
+       struct consumer_metadata_cache *metadata_cache = nullptr;
 
        /*
         * Wait queue awaiting updates to metadata stream's flushed position.
         */
-       struct lttng_wait_queue metadata_pushed_wait_queue;
+       lttng::synchro::wait_queue metadata_pushed_wait_queue;
 
        /* For UST metadata periodical flush */
-       int switch_timer_enabled;
-       timer_t switch_timer;
-       int switch_timer_error;
+       int switch_timer_enabled = 0;
+       timer_t switch_timer = {};
+       int switch_timer_error = 0;
 
        /* For the live mode */
-       int live_timer_enabled;
-       timer_t live_timer;
-       int live_timer_error;
+       int live_timer_enabled = 0;
+       timer_t live_timer = {};
+       int live_timer_error = 0;
        /* Channel is part of a live session ? */
-       bool is_live;
+       bool is_live = false;
 
        /* For channel monitoring timer. */
-       int monitor_timer_enabled;
-       timer_t monitor_timer;
+       int monitor_timer_enabled = 0;
+       timer_t monitor_timer = {};
 
        /* On-disk circular buffer */
-       uint64_t tracefile_size;
-       uint64_t tracefile_count;
+       uint64_t tracefile_size = 0;
+       uint64_t tracefile_count = 0;
        /*
         * Monitor or not the streams of this channel meaning this indicates if the
         * streams should be sent to the data/metadata thread or added to the no
         * monitor list of the channel.
         */
-       unsigned int monitor;
+       unsigned int monitor = 0;
 
        /*
         * Channel lock.
@@ -227,7 +227,7 @@ struct lttng_consumer_channel {
         * This is nested OUTSIDE stream lock.
         * This is nested OUTSIDE consumer_relayd_sock_pair lock.
         */
-       pthread_mutex_t lock;
+       pthread_mutex_t lock = {};
 
        /*
         * Channel teardown lock.
@@ -241,24 +241,24 @@ struct lttng_consumer_channel {
         * This is nested OUTSIDE stream lock.
         * This is nested OUTSIDE consumer_relayd_sock_pair lock.
         */
-       pthread_mutex_t timer_lock;
+       pthread_mutex_t timer_lock = {};
 
        /* Timer value in usec for live streaming. */
-       unsigned int live_timer_interval;
+       unsigned int live_timer_interval = 0;
 
-       int *stream_fds;
-       int nr_stream_fds;
-       char root_shm_path[PATH_MAX];
-       char shm_path[PATH_MAX];
+       int *stream_fds = nullptr;
+       int nr_stream_fds = 0;
+       char root_shm_path[PATH_MAX] = {};
+       char shm_path[PATH_MAX] = {};
        /* Only set for UST channels. */
-       LTTNG_OPTIONAL(struct lttng_credentials) buffer_credentials;
+       LTTNG_OPTIONAL(struct lttng_credentials) buffer_credentials = {};
        /* Total number of discarded events for that channel. */
-       uint64_t discarded_events;
+       uint64_t discarded_events = 0;
        /* Total number of missed packets due to overwriting (overwrite). */
-       uint64_t lost_packets;
+       uint64_t lost_packets = 0;
 
-       bool streams_sent_to_relayd;
-       uint64_t last_consumed_size_sample_sent;
+       bool streams_sent_to_relayd = false;
+       uint64_t last_consumed_size_sample_sent = false;
 };
 
 struct stream_subbuffer {
index 00671a876a8bb637d631b58e31d1bb55cd793322..3f40d03ed3d846cb8fa95788c94bf555be5fb28f 100644 (file)
@@ -930,7 +930,7 @@ error:
         */
        consumer_stream_destroy(metadata->metadata_stream, nullptr);
        metadata->metadata_stream = nullptr;
-       lttng_wait_queue_wake_all(&metadata->metadata_pushed_wait_queue);
+       metadata->metadata_pushed_wait_queue.wake_all();
 
 send_streams_error:
 error_no_stream:
@@ -1015,7 +1015,7 @@ error_stream:
         */
        consumer_stream_destroy(metadata_stream, nullptr);
        metadata_channel->metadata_stream = nullptr;
-       lttng_wait_queue_wake_all(&metadata_channel->metadata_pushed_wait_queue);
+       metadata_channel->metadata_pushed_wait_queue.wake_all();
 
 error:
        return ret;
@@ -2548,8 +2548,9 @@ static int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
                ret = write_len;
                goto end;
        }
+
        stream->ust_metadata_pushed += write_len;
-       lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
+       stream->chan->metadata_pushed_wait_queue.wake_all();
 
        LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed);
        ret = write_len;
index d185e299720617e64e7514d114e33cea4a0eb386..037feca2260b34d2e99cd553369e3763abb28d33 100644 (file)
 #include <urcu/futex.h>
 #include <urcu/uatomic.h>
 
-/*
- * Number of busy-loop attempts before waiting on futex.
- */
-#define WAIT_ATTEMPTS 1000
+namespace {
+/* Number of busy-loop attempts before waiting on futex. */
+constexpr auto wait_attempt_count = 1000;
 
 enum waiter_state {
        /* WAITER_WAITING is compared directly (futex compares it). */
@@ -27,36 +26,40 @@ enum waiter_state {
        WAITER_RUNNING = (1 << 1),
        WAITER_TEARDOWN = (1 << 2),
 };
+} /* namespace */
 
-void lttng_waiter_init(struct lttng_waiter *waiter)
+lttng::synchro::waiter::waiter()
 {
-       cds_wfs_node_init(&waiter->wait_queue_node);
-       uatomic_set(&waiter->state, WAITER_WAITING);
+       arm();
+}
+
+void lttng::synchro::waiter::arm() noexcept
+{
+       cds_wfs_node_init(&_wait_queue_node);
+       uatomic_set(&_state, WAITER_WAITING);
        cmm_smp_mb();
 }
 
 /*
- * User must init "waiter" before passing its memory to waker thread.
+ * User must arm "waiter" before passing its memory to waker thread.
  */
-void lttng_waiter_wait(struct lttng_waiter *waiter)
+void lttng::synchro::waiter::wait()
 {
-       unsigned int i;
-
        DBG("Beginning of waiter \"wait\" period");
 
        /* Load and test condition before read state. */
        cmm_smp_rmb();
-       for (i = 0; i < WAIT_ATTEMPTS; i++) {
-               if (uatomic_read(&waiter->state) != WAITER_WAITING) {
+       for (unsigned int i = 0; i < wait_attempt_count; i++) {
+               if (uatomic_read(&_state) != WAITER_WAITING) {
                        goto skip_futex_wait;
                }
 
                caa_cpu_relax();
        }
 
-       while (uatomic_read(&waiter->state) == WAITER_WAITING) {
+       while (uatomic_read(&_state) == WAITER_WAITING) {
                if (!futex_noasync(
-                           &waiter->state, FUTEX_WAIT, WAITER_WAITING, nullptr, nullptr, 0)) {
+                           &_state, FUTEX_WAIT, WAITER_WAITING, nullptr, nullptr, 0)) {
                        /*
                         * Prior queued wakeups queued by unrelated code
                         * using the same address can cause futex wait to
@@ -84,76 +87,82 @@ void lttng_waiter_wait(struct lttng_waiter *waiter)
 skip_futex_wait:
 
        /* Tell waker thread than we are running. */
-       uatomic_or(&waiter->state, WAITER_RUNNING);
+       uatomic_or(&_state, WAITER_RUNNING);
 
        /*
         * Wait until waker thread lets us know it's ok to tear down
         * memory allocated for struct lttng_waiter.
         */
-       for (i = 0; i < WAIT_ATTEMPTS; i++) {
-               if (uatomic_read(&waiter->state) & WAITER_TEARDOWN) {
+       for (unsigned int i = 0; i < wait_attempt_count; i++) {
+               if (uatomic_read(&_state) & WAITER_TEARDOWN) {
                        break;
                }
 
                caa_cpu_relax();
        }
 
-       while (!(uatomic_read(&waiter->state) & WAITER_TEARDOWN)) {
+       while (!(uatomic_read(&_state) & WAITER_TEARDOWN)) {
                poll(nullptr, 0, 10);
        }
 
-       LTTNG_ASSERT(uatomic_read(&waiter->state) & WAITER_TEARDOWN);
+       LTTNG_ASSERT(uatomic_read(&_state) & WAITER_TEARDOWN);
        DBG("End of waiter \"wait\" period");
 }
 
+lttng::synchro::waker lttng::synchro::waiter::get_waker()
+{
+       return lttng::synchro::waker(_state);
+}
+
 /*
  * Note: lttng_waiter_wake needs waiter to stay allocated throughout its
  * execution. In this scheme, the waiter owns the node memory, and we only allow
  * it to free this memory when it sees the WAITER_TEARDOWN flag.
  */
-void lttng_waiter_wake(struct lttng_waiter *waiter)
+void lttng::synchro::waker::wake()
 {
        cmm_smp_mb();
-       LTTNG_ASSERT(uatomic_read(&waiter->state) == WAITER_WAITING);
-       uatomic_set(&waiter->state, WAITER_WOKEN_UP);
-       if (!(uatomic_read(&waiter->state) & WAITER_RUNNING)) {
-               if (futex_noasync(&waiter->state, FUTEX_WAKE, 1, nullptr, nullptr, 0) < 0) {
+
+       LTTNG_ASSERT(uatomic_read(&_state) == WAITER_WAITING);
+
+       uatomic_set(&_state, WAITER_WOKEN_UP);
+       if (!(uatomic_read(&_state) & WAITER_RUNNING)) {
+               if (futex_noasync(&_state, FUTEX_WAKE, 1, nullptr, nullptr, 0) < 0) {
                        PERROR("futex_noasync");
                        abort();
                }
        }
 
        /* Allow teardown of struct urcu_wait memory. */
-       uatomic_or(&waiter->state, WAITER_TEARDOWN);
+       uatomic_or(&_state, WAITER_TEARDOWN);
 }
 
-void lttng_wait_queue_init(struct lttng_wait_queue *queue)
+lttng::synchro::wait_queue::wait_queue()
 {
-       cds_wfs_init(&queue->stack);
+       cds_wfs_init(&_stack);
 }
 
-void lttng_wait_queue_add(struct lttng_wait_queue *queue, struct lttng_waiter *waiter)
+void lttng::synchro::wait_queue::add(waiter &waiter) noexcept
 {
-       (void) cds_wfs_push(&queue->stack, &waiter->wait_queue_node);
+       (void) cds_wfs_push(&_stack, &waiter._wait_queue_node);
 }
 
-void lttng_wait_queue_wake_all(struct lttng_wait_queue *queue)
+void lttng::synchro::wait_queue::wake_all()
 {
-       cds_wfs_head *waiters;
-       cds_wfs_node *iter, *iter_n;
-
        /* Move all waiters from the queue to our local stack. */
-       waiters = __cds_wfs_pop_all(&queue->stack);
+       auto *waiters = __cds_wfs_pop_all(&_stack);
 
        /* Wake all waiters in our stack head. */
-       cds_wfs_for_each_blocking_safe (waiters, iter, iter_n) {
-               auto *waiter = lttng::utils::container_of(iter, &lttng_waiter::wait_queue_node);
+       cds_wfs_node *iter, *iter_n;
+       cds_wfs_for_each_blocking_safe(waiters, iter, iter_n) {
+               auto& waiter = *lttng::utils::container_of(
+                       iter, &lttng::synchro::waiter::_wait_queue_node);
 
                /* Don't wake already running threads. */
-               if (waiter->state & WAITER_RUNNING) {
+               if (waiter._state & WAITER_RUNNING) {
                        continue;
                }
 
-               lttng_waiter_wake(waiter);
+               waiter.get_waker().wake();
        }
 }
index 8ea0c2d83f450ad7ff6da749f57e6e0424c68e13..7f5c2f171f4e10448e011cff42a0cc7022d05a85 100644 (file)
 #include <stdint.h>
 #include <urcu/wfstack.h>
 
-struct lttng_waiter {
-       struct cds_wfs_node wait_queue_node;
-       int32_t state;
-};
+namespace lttng {
+namespace synchro {
+class waiter;
+class wait_queue;
+
+class waker {
+       friend waiter;
+
+public:
+       waker(const waker&) = default;
+       waker(waker&&) = default;
+       waker& operator=(const waker& other)
+       {
+               _state = other._state;
+               return *this;
+       }
+       waker& operator=(waker&& other)
+       {
+               _state = other._state;
+               return *this;
+       }
+
+       void wake();
+
+       ~waker() = default;
 
-struct lttng_wait_queue {
-       struct cds_wfs_stack stack;
+private:
+       waker(int32_t& state) : _state{ state }
+       {
+       }
+
+       int32_t& _state;
 };
 
-void lttng_waiter_init(struct lttng_waiter *waiter);
+class waiter final {
+       friend wait_queue;
 
-void lttng_waiter_wait(struct lttng_waiter *waiter);
+public:
+       waiter();
 
-/*
- * lttng_waiter_wake must only be called by a single waker.
- * It is invalid for multiple "wake" operations to be invoked
- * on a single waiter without re-initializing it before.
- */
-void lttng_waiter_wake(struct lttng_waiter *waiter);
+       /* Deactivate copy and assignment. */
+       waiter(const waiter&) = delete;
+       waiter(waiter&&) = delete;
+       waiter& operator=(const waiter&) = delete;
+       waiter& operator=(waiter&&) = delete;
+       ~waiter() = default;
 
-void lttng_wait_queue_init(struct lttng_wait_queue *queue);
+       void arm() noexcept;
+       void wait();
 
-/*
- * Atomically add a waiter to a wait queue.
- * A full memory barrier is issued before being added to the wait queue.
- */
-void lttng_wait_queue_add(struct lttng_wait_queue *queue, struct lttng_waiter *waiter);
+       waker get_waker();
 
-/*
- * Wake every waiter present in the wait queue and remove them from
- * the queue.
- */
-void lttng_wait_queue_wake_all(struct lttng_wait_queue *queue);
+private:
+       cds_wfs_node _wait_queue_node;
+       int32_t _state;
+};
+
+class wait_queue final {
+public:
+       wait_queue();
+
+       /* Deactivate copy and assignment. */
+       wait_queue(const wait_queue&) = delete;
+       wait_queue(wait_queue&&) = delete;
+       wait_queue& operator=(const wait_queue&) = delete;
+       wait_queue& operator=(wait_queue&&) = delete;
+       ~wait_queue() = default;
+
+       /*
+        * Atomically add a waiter to a wait queue.
+        * A full memory barrier is issued before being added to the wait queue.
+        */
+       void add(waiter& waiter) noexcept;
+       /*
+        * Wake every waiter present in the wait queue and remove them from
+        * the queue.
+        */
+       void wake_all();
+
+private:
+       cds_wfs_stack _stack;
+};
+} /* namespace synchro */
+} /* namespace lttng */
 
 #endif /* LTTNG_WAITER_H */
This page took 0.04185 seconds and 4 git commands to generate.