From 32670d719327feb585374283a50eeb76ce36b962 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Sat, 23 Sep 2023 11:24:26 -0400 Subject: [PATCH] waiter: modernize the waiter interface MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérémie Galarneau Change-Id: I3e8ffac4324e36dc3bf7a7f79d874f228a48def6 --- .../notification-thread-commands.cpp | 47 ++++---- .../notification-thread-commands.hpp | 19 ++-- .../notification-thread-events.cpp | 13 ++- .../consumer/consumer-metadata-cache.cpp | 11 +- src/common/consumer/consumer.cpp | 18 ++-- src/common/consumer/consumer.hpp | 96 ++++++++--------- src/common/ust-consumer/ust-consumer.cpp | 7 +- src/common/waiter.cpp | 87 ++++++++------- src/common/waiter.hpp | 101 +++++++++++++----- 9 files changed, 236 insertions(+), 163 deletions(-) diff --git a/src/bin/lttng-sessiond/notification-thread-commands.cpp b/src/bin/lttng-sessiond/notification-thread-commands.cpp index a8d774a4f..ee0eb76b4 100644 --- a/src/bin/lttng-sessiond/notification-thread-commands.cpp +++ b/src/bin/lttng-sessiond/notification-thread-commands.cpp @@ -20,7 +20,6 @@ static void init_notification_thread_command(struct notification_thread_command *cmd) { CDS_INIT_LIST_HEAD(&cmd->cmd_list_node); - lttng_waiter_init(&cmd->reply_waiter); } static int run_command_wait(struct notification_thread_handle *handle, @@ -29,6 +28,9 @@ static int run_command_wait(struct notification_thread_handle *handle, int ret; uint64_t notification_counter = 1; + lttng::synchro::waiter command_completion_waiter; + cmd->command_completed_waker.emplace(command_completion_waiter.get_waker()); + pthread_mutex_lock(&handle->cmd_queue.lock); /* Add to queue. */ cds_list_add_tail(&cmd->cmd_list_node, &handle->cmd_queue.list); @@ -46,7 +48,7 @@ static int run_command_wait(struct notification_thread_handle *handle, } pthread_mutex_unlock(&handle->cmd_queue.lock); - lttng_waiter_wait(&cmd->reply_waiter); + command_completion_waiter.wait();; return 0; error_unlock_queue: pthread_mutex_unlock(&handle->cmd_queue.lock); @@ -58,14 +60,15 @@ notification_thread_command_copy(const struct notification_thread_command *origi { struct notification_thread_command *new_cmd; - new_cmd = zmalloc(); - if (!new_cmd) { - goto end; + try { + new_cmd = new notification_thread_command; + } catch (const std::bad_alloc &e) { + ERR("Failed to allocate notification_thread_command: %s", e.what()); + return nullptr; } *new_cmd = *original_cmd; init_notification_thread_command(new_cmd); -end: return new_cmd; } @@ -96,10 +99,12 @@ static int run_command_no_wait(struct notification_thread_handle *handle, cds_list_del(&new_cmd->cmd_list_node); goto error_unlock_queue; } + pthread_mutex_unlock(&handle->cmd_queue.lock); return 0; error_unlock_queue: - free(new_cmd); + + delete new_cmd; pthread_mutex_unlock(&handle->cmd_queue.lock); error: return -1; @@ -112,7 +117,7 @@ notification_thread_command_register_trigger(struct notification_thread_handle * { int ret; enum lttng_error_code ret_code; - struct notification_thread_command cmd = {}; + notification_thread_command cmd; LTTNG_ASSERT(trigger); init_notification_thread_command(&cmd); @@ -138,7 +143,7 @@ notification_thread_command_unregister_trigger(struct notification_thread_handle { int ret; enum lttng_error_code ret_code; - struct notification_thread_command cmd = {}; + notification_thread_command cmd; init_notification_thread_command(&cmd); @@ -164,7 +169,7 @@ notification_thread_command_add_session(struct notification_thread_handle *handl { int ret; enum lttng_error_code ret_code; - struct notification_thread_command cmd = {}; + notification_thread_command cmd; init_notification_thread_command(&cmd); @@ -190,7 +195,7 @@ notification_thread_command_remove_session(struct notification_thread_handle *ha { int ret; enum lttng_error_code ret_code; - struct notification_thread_command cmd = {}; + notification_thread_command cmd; init_notification_thread_command(&cmd); @@ -217,7 +222,7 @@ notification_thread_command_add_channel(struct notification_thread_handle *handl { int ret; enum lttng_error_code ret_code; - struct notification_thread_command cmd = {}; + notification_thread_command cmd; init_notification_thread_command(&cmd); @@ -243,7 +248,7 @@ enum lttng_error_code notification_thread_command_remove_channel( { int ret; enum lttng_error_code ret_code; - struct notification_thread_command cmd = {}; + notification_thread_command cmd; init_notification_thread_command(&cmd); @@ -268,7 +273,7 @@ notification_thread_command_session_rotation_ongoing(struct notification_thread_ { int ret; enum lttng_error_code ret_code; - struct notification_thread_command cmd = {}; + notification_thread_command cmd; init_notification_thread_command(&cmd); @@ -294,7 +299,7 @@ enum lttng_error_code notification_thread_command_session_rotation_completed( { int ret; enum lttng_error_code ret_code; - struct notification_thread_command cmd = {}; + notification_thread_command cmd; init_notification_thread_command(&cmd); @@ -320,7 +325,7 @@ notification_thread_command_add_tracer_event_source(struct notification_thread_h { int ret; enum lttng_error_code ret_code; - struct notification_thread_command cmd = {}; + notification_thread_command cmd; LTTNG_ASSERT(tracer_event_source_fd >= 0); @@ -347,7 +352,7 @@ notification_thread_command_remove_tracer_event_source(struct notification_threa { int ret; enum lttng_error_code ret_code; - struct notification_thread_command cmd = {}; + notification_thread_command cmd; init_notification_thread_command(&cmd); @@ -370,7 +375,7 @@ enum lttng_error_code notification_thread_command_list_triggers( { int ret; enum lttng_error_code ret_code; - struct notification_thread_command cmd = {}; + notification_thread_command cmd; LTTNG_ASSERT(handle); LTTNG_ASSERT(triggers); @@ -396,7 +401,7 @@ end: void notification_thread_command_quit(struct notification_thread_handle *handle) { int ret; - struct notification_thread_command cmd = {}; + notification_thread_command cmd; init_notification_thread_command(&cmd); @@ -410,7 +415,7 @@ int notification_thread_client_communication_update( notification_client_id id, enum client_transmission_status transmission_status) { - struct notification_thread_command cmd = {}; + notification_thread_command cmd; init_notification_thread_command(&cmd); @@ -427,7 +432,7 @@ notification_thread_command_get_trigger(struct notification_thread_handle *handl { int ret; enum lttng_error_code ret_code; - struct notification_thread_command cmd = {}; + notification_thread_command cmd; init_notification_thread_command(&cmd); diff --git a/src/bin/lttng-sessiond/notification-thread-commands.hpp b/src/bin/lttng-sessiond/notification-thread-commands.hpp index 4058de5a4..8a3150ae3 100644 --- a/src/bin/lttng-sessiond/notification-thread-commands.hpp +++ b/src/bin/lttng-sessiond/notification-thread-commands.hpp @@ -17,6 +17,8 @@ #include #include +#include + #include #include @@ -41,9 +43,9 @@ enum notification_thread_command_type { }; struct notification_thread_command { - struct cds_list_head cmd_list_node; + struct cds_list_head cmd_list_node = {}; - enum notification_thread_command_type type; + notification_thread_command_type type = NOTIFICATION_COMMAND_TYPE_QUIT; union { /* Register trigger. */ struct { @@ -108,7 +110,7 @@ struct notification_thread_command { const struct lttng_trigger *trigger; } get_trigger; - } parameters; + } parameters = {}; union { struct { @@ -117,11 +119,12 @@ struct notification_thread_command { struct { struct lttng_trigger *trigger; } get_trigger; - } reply; - /* lttng_waiter on which to wait for command reply (optional). */ - struct lttng_waiter reply_waiter; - enum lttng_error_code reply_code; - bool is_async; + } reply = {}; + + /* Used to wake origin thread for synchroneous commands. */ + nonstd::optional command_completed_waker = nonstd::nullopt; + lttng_error_code reply_code = LTTNG_ERR_UNK; + bool is_async = false; }; enum lttng_error_code diff --git a/src/bin/lttng-sessiond/notification-thread-events.cpp b/src/bin/lttng-sessiond/notification-thread-events.cpp index f18bc68d4..70774a975 100644 --- a/src/bin/lttng-sessiond/notification-thread-events.cpp +++ b/src/bin/lttng-sessiond/notification-thread-events.cpp @@ -3311,21 +3311,24 @@ int handle_notification_thread_command(struct notification_thread_handle *handle if (ret) { goto error_unlock; } + end: if (cmd->is_async) { - free(cmd); + delete cmd; cmd = nullptr; } else { - lttng_waiter_wake(&cmd->reply_waiter); + cmd->command_completed_waker->wake(); } + return ret; + error_unlock: /* Wake-up and return a fatal error to the calling thread. */ - lttng_waiter_wake(&cmd->reply_waiter); cmd->reply_code = LTTNG_ERR_FATAL; + error: - /* Indicate a fatal error to the caller. */ - return -1; + ret = -1; + goto end; } static int socket_set_non_blocking(int socket) diff --git a/src/common/consumer/consumer-metadata-cache.cpp b/src/common/consumer/consumer-metadata-cache.cpp index 462e079d8..5e84f2b96 100644 --- a/src/common/consumer/consumer-metadata-cache.cpp +++ b/src/common/consumer/consumer-metadata-cache.cpp @@ -258,18 +258,17 @@ void consumer_wait_metadata_cache_flushed(struct lttng_consumer_channel *channel /* Metadata cache is not currently flushed, wait on wait queue. */ for (;;) { - struct lttng_waiter waiter; + lttng::synchro::waiter waiter; - lttng_waiter_init(&waiter); - lttng_wait_queue_add(&channel->metadata_pushed_wait_queue, &waiter); + channel->metadata_pushed_wait_queue.add(waiter); if (consumer_metadata_cache_is_flushed(channel, offset, invoked_by_timer)) { /* Wake up all waiters, ourself included. */ - lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue); + channel->metadata_pushed_wait_queue.wake_all(); /* Ensure proper teardown of waiter. */ - lttng_waiter_wait(&waiter); + waiter.wait(); break; } - lttng_waiter_wait(&waiter); + waiter.wait(); } } diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp index 4823d9b9c..7e7e7d384 100644 --- a/src/common/consumer/consumer.cpp +++ b/src/common/consumer/consumer.cpp @@ -303,7 +303,8 @@ static void free_channel_rcu(struct rcu_head *head) ERR("Unknown consumer_data type"); abort(); } - free(channel); + + delete channel; } /* @@ -462,7 +463,7 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { if (stream->net_seq_idx == net_seq_idx) { uatomic_set(&stream->endpoint_status, status); - lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue); + stream->chan->metadata_pushed_wait_queue.wake_all(); DBG("Delete flag set to metadata stream %d", stream->wait_fd); } @@ -1019,9 +1020,11 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, } } - channel = zmalloc(); - if (channel == nullptr) { - PERROR("malloc struct lttng_consumer_channel"); + try { + channel = new lttng_consumer_channel; + } catch (const std::bad_alloc& e) { + ERR("Failed to allocate lttng_consumer_channel: %s", e.what()); + channel = nullptr; goto end; } @@ -1037,7 +1040,6 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->is_live = is_in_live_session; pthread_mutex_init(&channel->lock, NULL); pthread_mutex_init(&channel->timer_lock, NULL); - lttng_wait_queue_init(&channel->metadata_pushed_wait_queue); switch (output) { case LTTNG_EVENT_SPLICE: @@ -1048,7 +1050,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, break; default: abort(); - free(channel); + delete channel; channel = nullptr; goto end; } @@ -2133,7 +2135,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct l * pointer value. */ channel->metadata_stream = nullptr; - lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue); + channel->metadata_pushed_wait_queue.wake_all(); if (channel->metadata_cache) { pthread_mutex_unlock(&channel->metadata_cache->lock); diff --git a/src/common/consumer/consumer.hpp b/src/common/consumer/consumer.hpp index 9de8d05a3..c62e01453 100644 --- a/src/common/consumer/consumer.hpp +++ b/src/common/consumer/consumer.hpp @@ -109,30 +109,30 @@ struct consumer_metadata_cache; struct lttng_consumer_channel { /* Is the channel published in the channel hash tables? */ - bool is_published; + bool is_published = false; /* * Was the channel deleted (logically) and waiting to be reclaimed? * If this flag is set, no modification that is not cleaned-up by the * RCU reclamation callback should be made */ - bool is_deleted; + bool is_deleted = false; /* HT node used for consumer_data.channel_ht */ - struct lttng_ht_node_u64 node; + struct lttng_ht_node_u64 node = {}; /* HT node used for consumer_data.channels_by_session_id_ht */ - struct lttng_ht_node_u64 channels_by_session_id_ht_node; + struct lttng_ht_node_u64 channels_by_session_id_ht_node = {}; /* Indexed key. Incremented value in the consumer. */ - uint64_t key; + uint64_t key = 0; /* Number of streams referencing this channel */ - int refcount; + int refcount = 0; /* Tracing session id on the session daemon side. */ - uint64_t session_id; + uint64_t session_id = 0; /* Current trace chunk of the session in which this channel exists. */ - struct lttng_trace_chunk *trace_chunk; + struct lttng_trace_chunk *trace_chunk = nullptr; /* * Session id when requesting metadata to the session daemon for * a session with per-PID buffers. */ - uint64_t session_id_per_pid; + uint64_t session_id_per_pid = 0; /* * In the case of local streams, this field contains the channel's * output path; a path relative to the session's output path. @@ -144,77 +144,77 @@ struct lttng_consumer_channel { * peers, this contains a path of the form: * /hostname/session_path/ust/uid/1000/64-bit */ - char pathname[PATH_MAX]; + char pathname[PATH_MAX] = {}; /* Channel name. */ - char name[LTTNG_SYMBOL_NAME_LEN]; + char name[LTTNG_SYMBOL_NAME_LEN] = {}; /* Relayd id of the channel. -1ULL if it does not apply. */ - uint64_t relayd_id; + uint64_t relayd_id = 0; /* * Number of streams NOT initialized yet. This is used in order to not * delete this channel if streams are getting initialized. */ - unsigned int nb_init_stream_left; + unsigned int nb_init_stream_left = 0; /* Output type (mmap or splice). */ - enum consumer_channel_output output; + enum consumer_channel_output output = CONSUMER_CHANNEL_MMAP; /* Channel type for stream */ - enum consumer_channel_type type; + enum consumer_channel_type type = CONSUMER_CHANNEL_TYPE_METADATA; /* For UST */ - uid_t ust_app_uid; /* Application UID. */ - struct lttng_ust_ctl_consumer_channel *uchan; - unsigned char uuid[LTTNG_UUID_STR_LEN]; + uid_t ust_app_uid = 65534; /* Application UID. */ + struct lttng_ust_ctl_consumer_channel *uchan = nullptr; + unsigned char uuid[LTTNG_UUID_STR_LEN] = {}; /* * Temporary stream list used to store the streams once created and waiting * to be sent to the session daemon by receiving the * LTTNG_CONSUMER_GET_CHANNEL. */ - struct stream_list streams; + struct stream_list streams = {}; /* * Set if the channel is metadata. We keep a reference to the stream * because we have to flush data once pushed by the session daemon. For a * regular channel, this is always set to NULL. */ - struct lttng_consumer_stream *metadata_stream; + struct lttng_consumer_stream *metadata_stream = nullptr; /* for UST */ - int wait_fd; + int wait_fd = -1; /* Node within channel thread ht */ - struct lttng_ht_node_u64 wait_fd_node; + struct lttng_ht_node_u64 wait_fd_node = {}; /* Metadata cache is metadata channel */ - struct consumer_metadata_cache *metadata_cache; + struct consumer_metadata_cache *metadata_cache = nullptr; /* * Wait queue awaiting updates to metadata stream's flushed position. */ - struct lttng_wait_queue metadata_pushed_wait_queue; + lttng::synchro::wait_queue metadata_pushed_wait_queue; /* For UST metadata periodical flush */ - int switch_timer_enabled; - timer_t switch_timer; - int switch_timer_error; + int switch_timer_enabled = 0; + timer_t switch_timer = {}; + int switch_timer_error = 0; /* For the live mode */ - int live_timer_enabled; - timer_t live_timer; - int live_timer_error; + int live_timer_enabled = 0; + timer_t live_timer = {}; + int live_timer_error = 0; /* Channel is part of a live session ? */ - bool is_live; + bool is_live = false; /* For channel monitoring timer. */ - int monitor_timer_enabled; - timer_t monitor_timer; + int monitor_timer_enabled = 0; + timer_t monitor_timer = {}; /* On-disk circular buffer */ - uint64_t tracefile_size; - uint64_t tracefile_count; + uint64_t tracefile_size = 0; + uint64_t tracefile_count = 0; /* * Monitor or not the streams of this channel meaning this indicates if the * streams should be sent to the data/metadata thread or added to the no * monitor list of the channel. */ - unsigned int monitor; + unsigned int monitor = 0; /* * Channel lock. @@ -227,7 +227,7 @@ struct lttng_consumer_channel { * This is nested OUTSIDE stream lock. * This is nested OUTSIDE consumer_relayd_sock_pair lock. */ - pthread_mutex_t lock; + pthread_mutex_t lock = {}; /* * Channel teardown lock. @@ -241,24 +241,24 @@ struct lttng_consumer_channel { * This is nested OUTSIDE stream lock. * This is nested OUTSIDE consumer_relayd_sock_pair lock. */ - pthread_mutex_t timer_lock; + pthread_mutex_t timer_lock = {}; /* Timer value in usec for live streaming. */ - unsigned int live_timer_interval; + unsigned int live_timer_interval = 0; - int *stream_fds; - int nr_stream_fds; - char root_shm_path[PATH_MAX]; - char shm_path[PATH_MAX]; + int *stream_fds = nullptr; + int nr_stream_fds = 0; + char root_shm_path[PATH_MAX] = {}; + char shm_path[PATH_MAX] = {}; /* Only set for UST channels. */ - LTTNG_OPTIONAL(struct lttng_credentials) buffer_credentials; + LTTNG_OPTIONAL(struct lttng_credentials) buffer_credentials = {}; /* Total number of discarded events for that channel. */ - uint64_t discarded_events; + uint64_t discarded_events = 0; /* Total number of missed packets due to overwriting (overwrite). */ - uint64_t lost_packets; + uint64_t lost_packets = 0; - bool streams_sent_to_relayd; - uint64_t last_consumed_size_sample_sent; + bool streams_sent_to_relayd = false; + uint64_t last_consumed_size_sample_sent = false; }; struct stream_subbuffer { diff --git a/src/common/ust-consumer/ust-consumer.cpp b/src/common/ust-consumer/ust-consumer.cpp index 00671a876..3f40d03ed 100644 --- a/src/common/ust-consumer/ust-consumer.cpp +++ b/src/common/ust-consumer/ust-consumer.cpp @@ -930,7 +930,7 @@ error: */ consumer_stream_destroy(metadata->metadata_stream, nullptr); metadata->metadata_stream = nullptr; - lttng_wait_queue_wake_all(&metadata->metadata_pushed_wait_queue); + metadata->metadata_pushed_wait_queue.wake_all(); send_streams_error: error_no_stream: @@ -1015,7 +1015,7 @@ error_stream: */ consumer_stream_destroy(metadata_stream, nullptr); metadata_channel->metadata_stream = nullptr; - lttng_wait_queue_wake_all(&metadata_channel->metadata_pushed_wait_queue); + metadata_channel->metadata_pushed_wait_queue.wake_all(); error: return ret; @@ -2548,8 +2548,9 @@ static int commit_one_metadata_packet(struct lttng_consumer_stream *stream) ret = write_len; goto end; } + stream->ust_metadata_pushed += write_len; - lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue); + stream->chan->metadata_pushed_wait_queue.wake_all(); LTTNG_ASSERT(stream->chan->metadata_cache->contents.size >= stream->ust_metadata_pushed); ret = write_len; diff --git a/src/common/waiter.cpp b/src/common/waiter.cpp index d185e2997..037feca22 100644 --- a/src/common/waiter.cpp +++ b/src/common/waiter.cpp @@ -14,10 +14,9 @@ #include #include -/* - * Number of busy-loop attempts before waiting on futex. - */ -#define WAIT_ATTEMPTS 1000 +namespace { +/* Number of busy-loop attempts before waiting on futex. */ +constexpr auto wait_attempt_count = 1000; enum waiter_state { /* WAITER_WAITING is compared directly (futex compares it). */ @@ -27,36 +26,40 @@ enum waiter_state { WAITER_RUNNING = (1 << 1), WAITER_TEARDOWN = (1 << 2), }; +} /* namespace */ -void lttng_waiter_init(struct lttng_waiter *waiter) +lttng::synchro::waiter::waiter() { - cds_wfs_node_init(&waiter->wait_queue_node); - uatomic_set(&waiter->state, WAITER_WAITING); + arm(); +} + +void lttng::synchro::waiter::arm() noexcept +{ + cds_wfs_node_init(&_wait_queue_node); + uatomic_set(&_state, WAITER_WAITING); cmm_smp_mb(); } /* - * User must init "waiter" before passing its memory to waker thread. + * User must arm "waiter" before passing its memory to waker thread. */ -void lttng_waiter_wait(struct lttng_waiter *waiter) +void lttng::synchro::waiter::wait() { - unsigned int i; - DBG("Beginning of waiter \"wait\" period"); /* Load and test condition before read state. */ cmm_smp_rmb(); - for (i = 0; i < WAIT_ATTEMPTS; i++) { - if (uatomic_read(&waiter->state) != WAITER_WAITING) { + for (unsigned int i = 0; i < wait_attempt_count; i++) { + if (uatomic_read(&_state) != WAITER_WAITING) { goto skip_futex_wait; } caa_cpu_relax(); } - while (uatomic_read(&waiter->state) == WAITER_WAITING) { + while (uatomic_read(&_state) == WAITER_WAITING) { if (!futex_noasync( - &waiter->state, FUTEX_WAIT, WAITER_WAITING, nullptr, nullptr, 0)) { + &_state, FUTEX_WAIT, WAITER_WAITING, nullptr, nullptr, 0)) { /* * Prior queued wakeups queued by unrelated code * using the same address can cause futex wait to @@ -84,76 +87,82 @@ void lttng_waiter_wait(struct lttng_waiter *waiter) skip_futex_wait: /* Tell waker thread than we are running. */ - uatomic_or(&waiter->state, WAITER_RUNNING); + uatomic_or(&_state, WAITER_RUNNING); /* * Wait until waker thread lets us know it's ok to tear down * memory allocated for struct lttng_waiter. */ - for (i = 0; i < WAIT_ATTEMPTS; i++) { - if (uatomic_read(&waiter->state) & WAITER_TEARDOWN) { + for (unsigned int i = 0; i < wait_attempt_count; i++) { + if (uatomic_read(&_state) & WAITER_TEARDOWN) { break; } caa_cpu_relax(); } - while (!(uatomic_read(&waiter->state) & WAITER_TEARDOWN)) { + while (!(uatomic_read(&_state) & WAITER_TEARDOWN)) { poll(nullptr, 0, 10); } - LTTNG_ASSERT(uatomic_read(&waiter->state) & WAITER_TEARDOWN); + LTTNG_ASSERT(uatomic_read(&_state) & WAITER_TEARDOWN); DBG("End of waiter \"wait\" period"); } +lttng::synchro::waker lttng::synchro::waiter::get_waker() +{ + return lttng::synchro::waker(_state); +} + /* * Note: lttng_waiter_wake needs waiter to stay allocated throughout its * execution. In this scheme, the waiter owns the node memory, and we only allow * it to free this memory when it sees the WAITER_TEARDOWN flag. */ -void lttng_waiter_wake(struct lttng_waiter *waiter) +void lttng::synchro::waker::wake() { cmm_smp_mb(); - LTTNG_ASSERT(uatomic_read(&waiter->state) == WAITER_WAITING); - uatomic_set(&waiter->state, WAITER_WOKEN_UP); - if (!(uatomic_read(&waiter->state) & WAITER_RUNNING)) { - if (futex_noasync(&waiter->state, FUTEX_WAKE, 1, nullptr, nullptr, 0) < 0) { + + LTTNG_ASSERT(uatomic_read(&_state) == WAITER_WAITING); + + uatomic_set(&_state, WAITER_WOKEN_UP); + if (!(uatomic_read(&_state) & WAITER_RUNNING)) { + if (futex_noasync(&_state, FUTEX_WAKE, 1, nullptr, nullptr, 0) < 0) { PERROR("futex_noasync"); abort(); } } /* Allow teardown of struct urcu_wait memory. */ - uatomic_or(&waiter->state, WAITER_TEARDOWN); + uatomic_or(&_state, WAITER_TEARDOWN); } -void lttng_wait_queue_init(struct lttng_wait_queue *queue) +lttng::synchro::wait_queue::wait_queue() { - cds_wfs_init(&queue->stack); + cds_wfs_init(&_stack); } -void lttng_wait_queue_add(struct lttng_wait_queue *queue, struct lttng_waiter *waiter) +void lttng::synchro::wait_queue::add(waiter &waiter) noexcept { - (void) cds_wfs_push(&queue->stack, &waiter->wait_queue_node); + (void) cds_wfs_push(&_stack, &waiter._wait_queue_node); } -void lttng_wait_queue_wake_all(struct lttng_wait_queue *queue) +void lttng::synchro::wait_queue::wake_all() { - cds_wfs_head *waiters; - cds_wfs_node *iter, *iter_n; - /* Move all waiters from the queue to our local stack. */ - waiters = __cds_wfs_pop_all(&queue->stack); + auto *waiters = __cds_wfs_pop_all(&_stack); /* Wake all waiters in our stack head. */ - cds_wfs_for_each_blocking_safe (waiters, iter, iter_n) { - auto *waiter = lttng::utils::container_of(iter, <tng_waiter::wait_queue_node); + cds_wfs_node *iter, *iter_n; + cds_wfs_for_each_blocking_safe(waiters, iter, iter_n) { + auto& waiter = *lttng::utils::container_of( + iter, <tng::synchro::waiter::_wait_queue_node); /* Don't wake already running threads. */ - if (waiter->state & WAITER_RUNNING) { + if (waiter._state & WAITER_RUNNING) { continue; } - lttng_waiter_wake(waiter); + waiter.get_waker().wake(); } } diff --git a/src/common/waiter.hpp b/src/common/waiter.hpp index 8ea0c2d83..7f5c2f171 100644 --- a/src/common/waiter.hpp +++ b/src/common/waiter.hpp @@ -18,38 +18,89 @@ #include #include -struct lttng_waiter { - struct cds_wfs_node wait_queue_node; - int32_t state; -}; +namespace lttng { +namespace synchro { +class waiter; +class wait_queue; + +class waker { + friend waiter; + +public: + waker(const waker&) = default; + waker(waker&&) = default; + waker& operator=(const waker& other) + { + _state = other._state; + return *this; + } + waker& operator=(waker&& other) + { + _state = other._state; + return *this; + } + + void wake(); + + ~waker() = default; -struct lttng_wait_queue { - struct cds_wfs_stack stack; +private: + waker(int32_t& state) : _state{ state } + { + } + + int32_t& _state; }; -void lttng_waiter_init(struct lttng_waiter *waiter); +class waiter final { + friend wait_queue; -void lttng_waiter_wait(struct lttng_waiter *waiter); +public: + waiter(); -/* - * lttng_waiter_wake must only be called by a single waker. - * It is invalid for multiple "wake" operations to be invoked - * on a single waiter without re-initializing it before. - */ -void lttng_waiter_wake(struct lttng_waiter *waiter); + /* Deactivate copy and assignment. */ + waiter(const waiter&) = delete; + waiter(waiter&&) = delete; + waiter& operator=(const waiter&) = delete; + waiter& operator=(waiter&&) = delete; + ~waiter() = default; -void lttng_wait_queue_init(struct lttng_wait_queue *queue); + void arm() noexcept; + void wait(); -/* - * Atomically add a waiter to a wait queue. - * A full memory barrier is issued before being added to the wait queue. - */ -void lttng_wait_queue_add(struct lttng_wait_queue *queue, struct lttng_waiter *waiter); + waker get_waker(); -/* - * Wake every waiter present in the wait queue and remove them from - * the queue. - */ -void lttng_wait_queue_wake_all(struct lttng_wait_queue *queue); +private: + cds_wfs_node _wait_queue_node; + int32_t _state; +}; + +class wait_queue final { +public: + wait_queue(); + + /* Deactivate copy and assignment. */ + wait_queue(const wait_queue&) = delete; + wait_queue(wait_queue&&) = delete; + wait_queue& operator=(const wait_queue&) = delete; + wait_queue& operator=(wait_queue&&) = delete; + ~wait_queue() = default; + + /* + * Atomically add a waiter to a wait queue. + * A full memory barrier is issued before being added to the wait queue. + */ + void add(waiter& waiter) noexcept; + /* + * Wake every waiter present in the wait queue and remove them from + * the queue. + */ + void wake_all(); + +private: + cds_wfs_stack _stack; +}; +} /* namespace synchro */ +} /* namespace lttng */ #endif /* LTTNG_WAITER_H */ -- 2.34.1