X-Git-Url: https://git.lttng.org/?a=blobdiff_plain;f=src%2Fcommon%2Fwaiter.cpp;h=d185e299720617e64e7514d114e33cea4a0eb386;hb=75f62e5383c6ea1f62fb488a94f4a8f98400db71;hp=3ddb68feb7c76dd91f09bec9d7664c02224627a2;hpb=c9e313bc594f40a86eed237dce222c0fc99c957f;p=lttng-tools.git diff --git a/src/common/waiter.cpp b/src/common/waiter.cpp index 3ddb68feb..d185e2997 100644 --- a/src/common/waiter.cpp +++ b/src/common/waiter.cpp @@ -6,11 +6,13 @@ * */ -#include "waiter.hpp" -#include -#include #include "error.hpp" +#include "macros.hpp" +#include "waiter.hpp" + #include +#include +#include /* * Number of busy-loop attempts before waiting on futex. @@ -19,11 +21,11 @@ enum waiter_state { /* WAITER_WAITING is compared directly (futex compares it). */ - WAITER_WAITING = 0, + WAITER_WAITING = 0, /* non-zero are used as masks. */ - WAITER_WOKEN_UP = (1 << 0), - WAITER_RUNNING = (1 << 1), - WAITER_TEARDOWN = (1 << 2), + WAITER_WOKEN_UP = (1 << 0), + WAITER_RUNNING = (1 << 1), + WAITER_TEARDOWN = (1 << 2), }; void lttng_waiter_init(struct lttng_waiter *waiter) @@ -40,24 +42,39 @@ void lttng_waiter_wait(struct lttng_waiter *waiter) { unsigned int i; - DBG("Beginning of waiter wait period"); - /* Load and test condition before read state */ + DBG("Beginning of waiter \"wait\" period"); + + /* Load and test condition before read state. */ cmm_smp_rmb(); for (i = 0; i < WAIT_ATTEMPTS; i++) { if (uatomic_read(&waiter->state) != WAITER_WAITING) { goto skip_futex_wait; } + caa_cpu_relax(); } - while (futex_noasync(&waiter->state, FUTEX_WAIT, WAITER_WAITING, - NULL, NULL, 0)) { + + while (uatomic_read(&waiter->state) == WAITER_WAITING) { + if (!futex_noasync( + &waiter->state, FUTEX_WAIT, WAITER_WAITING, nullptr, nullptr, 0)) { + /* + * Prior queued wakeups queued by unrelated code + * using the same address can cause futex wait to + * return 0 even through the futex value is still + * WAITER_WAITING (spurious wakeups). Check + * the value again in user-space to validate + * whether it really differs from WAITER_WAITING. + */ + continue; + } + switch (errno) { - case EWOULDBLOCK: + case EAGAIN: /* Value already changed. */ goto skip_futex_wait; case EINTR: /* Retry if interrupted by signal. */ - break; /* Get out of switch. */ + break; /* Get out of switch. Check again. */ default: /* Unexpected error. */ PERROR("futex_noasync"); @@ -77,13 +94,16 @@ skip_futex_wait: if (uatomic_read(&waiter->state) & WAITER_TEARDOWN) { break; } + caa_cpu_relax(); } + while (!(uatomic_read(&waiter->state) & WAITER_TEARDOWN)) { - poll(NULL, 0, 10); + poll(nullptr, 0, 10); } + LTTNG_ASSERT(uatomic_read(&waiter->state) & WAITER_TEARDOWN); - DBG("End of waiter wait period"); + DBG("End of waiter \"wait\" period"); } /* @@ -91,18 +111,49 @@ skip_futex_wait: * execution. In this scheme, the waiter owns the node memory, and we only allow * it to free this memory when it sees the WAITER_TEARDOWN flag. */ -void lttng_waiter_wake_up(struct lttng_waiter *waiter) +void lttng_waiter_wake(struct lttng_waiter *waiter) { cmm_smp_mb(); LTTNG_ASSERT(uatomic_read(&waiter->state) == WAITER_WAITING); uatomic_set(&waiter->state, WAITER_WOKEN_UP); if (!(uatomic_read(&waiter->state) & WAITER_RUNNING)) { - if (futex_noasync(&waiter->state, FUTEX_WAKE, 1, - NULL, NULL, 0) < 0) { + if (futex_noasync(&waiter->state, FUTEX_WAKE, 1, nullptr, nullptr, 0) < 0) { PERROR("futex_noasync"); abort(); } } + /* Allow teardown of struct urcu_wait memory. */ uatomic_or(&waiter->state, WAITER_TEARDOWN); } + +void lttng_wait_queue_init(struct lttng_wait_queue *queue) +{ + cds_wfs_init(&queue->stack); +} + +void lttng_wait_queue_add(struct lttng_wait_queue *queue, struct lttng_waiter *waiter) +{ + (void) cds_wfs_push(&queue->stack, &waiter->wait_queue_node); +} + +void lttng_wait_queue_wake_all(struct lttng_wait_queue *queue) +{ + cds_wfs_head *waiters; + cds_wfs_node *iter, *iter_n; + + /* Move all waiters from the queue to our local stack. */ + waiters = __cds_wfs_pop_all(&queue->stack); + + /* Wake all waiters in our stack head. */ + cds_wfs_for_each_blocking_safe (waiters, iter, iter_n) { + auto *waiter = lttng::utils::container_of(iter, <tng_waiter::wait_queue_node); + + /* Don't wake already running threads. */ + if (waiter->state & WAITER_RUNNING) { + continue; + } + + lttng_waiter_wake(waiter); + } +}