X-Git-Url: https://git.lttng.org/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fwaiter.cpp;h=2a1dded89805a6188a5b77b9f75e8ce5a99a976c;hp=3ddb68feb7c76dd91f09bec9d7664c02224627a2;hb=HEAD;hpb=36962e16797b5ca590c54a385ca594e2859ef8c2 diff --git a/src/common/waiter.cpp b/src/common/waiter.cpp index 3ddb68feb..8d2ba4d69 100644 --- a/src/common/waiter.cpp +++ b/src/common/waiter.cpp @@ -6,58 +6,77 @@ * */ -#include "waiter.hpp" -#include -#include #include "error.hpp" +#include "macros.hpp" +#include "waiter.hpp" + #include +#include +#include -/* - * Number of busy-loop attempts before waiting on futex. - */ -#define WAIT_ATTEMPTS 1000 +namespace { +/* Number of busy-loop attempts before waiting on futex. */ +constexpr auto wait_attempt_count = 1000; enum waiter_state { /* WAITER_WAITING is compared directly (futex compares it). */ - WAITER_WAITING = 0, + WAITER_WAITING = 0, /* non-zero are used as masks. */ - WAITER_WOKEN_UP = (1 << 0), - WAITER_RUNNING = (1 << 1), - WAITER_TEARDOWN = (1 << 2), + WAITER_WOKEN_UP = (1 << 0), + WAITER_RUNNING = (1 << 1), + WAITER_TEARDOWN = (1 << 2), }; +} /* namespace */ + +lttng::synchro::waiter::waiter() +{ + arm(); +} -void lttng_waiter_init(struct lttng_waiter *waiter) +void lttng::synchro::waiter::arm() noexcept { - cds_wfs_node_init(&waiter->wait_queue_node); - uatomic_set(&waiter->state, WAITER_WAITING); + cds_wfs_node_init(&_wait_queue_node); + uatomic_set(&_state, WAITER_WAITING); cmm_smp_mb(); } /* - * User must init "waiter" before passing its memory to waker thread. + * User must arm "waiter" before passing its memory to waker thread. */ -void lttng_waiter_wait(struct lttng_waiter *waiter) +void lttng::synchro::waiter::wait() { - unsigned int i; + DBG("Beginning of waiter \"wait\" period"); - DBG("Beginning of waiter wait period"); - /* Load and test condition before read state */ + /* Load and test condition before read state. */ cmm_smp_rmb(); - for (i = 0; i < WAIT_ATTEMPTS; i++) { - if (uatomic_read(&waiter->state) != WAITER_WAITING) { + for (unsigned int i = 0; i < wait_attempt_count; i++) { + if (uatomic_read(&_state) != WAITER_WAITING) { goto skip_futex_wait; } + caa_cpu_relax(); } - while (futex_noasync(&waiter->state, FUTEX_WAIT, WAITER_WAITING, - NULL, NULL, 0)) { + + while (uatomic_read(&_state) == WAITER_WAITING) { + if (!futex_noasync(&_state, FUTEX_WAIT, WAITER_WAITING, nullptr, nullptr, 0)) { + /* + * Prior queued wakeups queued by unrelated code + * using the same address can cause futex wait to + * return 0 even through the futex value is still + * WAITER_WAITING (spurious wakeups). Check + * the value again in user-space to validate + * whether it really differs from WAITER_WAITING. + */ + continue; + } + switch (errno) { - case EWOULDBLOCK: + case EAGAIN: /* Value already changed. */ goto skip_futex_wait; case EINTR: /* Retry if interrupted by signal. */ - break; /* Get out of switch. */ + break; /* Get out of switch. Check again. */ default: /* Unexpected error. */ PERROR("futex_noasync"); @@ -67,23 +86,31 @@ void lttng_waiter_wait(struct lttng_waiter *waiter) skip_futex_wait: /* Tell waker thread than we are running. */ - uatomic_or(&waiter->state, WAITER_RUNNING); + uatomic_or(&_state, WAITER_RUNNING); /* * Wait until waker thread lets us know it's ok to tear down * memory allocated for struct lttng_waiter. */ - for (i = 0; i < WAIT_ATTEMPTS; i++) { - if (uatomic_read(&waiter->state) & WAITER_TEARDOWN) { + for (unsigned int i = 0; i < wait_attempt_count; i++) { + if (uatomic_read(&_state) & WAITER_TEARDOWN) { break; } + caa_cpu_relax(); } - while (!(uatomic_read(&waiter->state) & WAITER_TEARDOWN)) { - poll(NULL, 0, 10); + + while (!(uatomic_read(&_state) & WAITER_TEARDOWN)) { + poll(nullptr, 0, 10); } - LTTNG_ASSERT(uatomic_read(&waiter->state) & WAITER_TEARDOWN); - DBG("End of waiter wait period"); + + LTTNG_ASSERT(uatomic_read(&_state) & WAITER_TEARDOWN); + DBG("End of waiter \"wait\" period"); +} + +lttng::synchro::waker lttng::synchro::waiter::get_waker() +{ + return lttng::synchro::waker(_state); } /* @@ -91,18 +118,50 @@ skip_futex_wait: * execution. In this scheme, the waiter owns the node memory, and we only allow * it to free this memory when it sees the WAITER_TEARDOWN flag. */ -void lttng_waiter_wake_up(struct lttng_waiter *waiter) +void lttng::synchro::waker::wake() { cmm_smp_mb(); - LTTNG_ASSERT(uatomic_read(&waiter->state) == WAITER_WAITING); - uatomic_set(&waiter->state, WAITER_WOKEN_UP); - if (!(uatomic_read(&waiter->state) & WAITER_RUNNING)) { - if (futex_noasync(&waiter->state, FUTEX_WAKE, 1, - NULL, NULL, 0) < 0) { + + LTTNG_ASSERT(uatomic_read(&_state.get()) == WAITER_WAITING); + + uatomic_set(&_state.get(), WAITER_WOKEN_UP); + if (!(uatomic_read(&_state.get()) & WAITER_RUNNING)) { + if (futex_noasync(&_state.get(), FUTEX_WAKE, 1, nullptr, nullptr, 0) < 0) { PERROR("futex_noasync"); abort(); } } + /* Allow teardown of struct urcu_wait memory. */ - uatomic_or(&waiter->state, WAITER_TEARDOWN); + uatomic_or(&_state.get(), WAITER_TEARDOWN); +} + +lttng::synchro::wait_queue::wait_queue() +{ + cds_wfs_init(&_stack); +} + +void lttng::synchro::wait_queue::add(waiter& waiter) noexcept +{ + (void) cds_wfs_push(&_stack, &waiter._wait_queue_node); +} + +void lttng::synchro::wait_queue::wake_all() +{ + /* Move all waiters from the queue to our local stack. */ + auto *waiters = __cds_wfs_pop_all(&_stack); + + /* Wake all waiters in our stack head. */ + cds_wfs_node *iter, *iter_n; + cds_wfs_for_each_blocking_safe (waiters, iter, iter_n) { + auto& waiter = *lttng::utils::container_of( + iter, <tng::synchro::waiter::_wait_queue_node); + + /* Don't wake already running threads. */ + if (waiter._state & WAITER_RUNNING) { + continue; + } + + waiter.get_waker().wake(); + } }