4 * Userspace RCU QSBR library
6 * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7 * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
9 * This library is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License as published by the Free Software Foundation; either
12 * version 2.1 of the License, or (at your option) any later version.
14 * This library is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Lesser General Public License for more details.
19 * You should have received a copy of the GNU Lesser General Public
20 * License along with this library; if not, write to the Free Software
21 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 * IBM's contributions to this file may be relicensed under LGPLv2 or later.
38 #include "urcu/wfcqueue.h"
39 #include "urcu/wfstack.h"
40 #include "urcu/map/urcu-qsbr.h"
41 #define BUILD_QSBR_LIB
42 #include "urcu/static/urcu-qsbr.h"
43 #include "urcu-pointer.h"
44 #include "urcu/tls-compat.h"
48 /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
50 #include "urcu-qsbr.h"
/* Library destructor: runs when the shared object is unloaded. */
void __attribute__((destructor)) rcu_exit(void);

/*
 * Serializes synchronize_rcu() writers and protects the reader
 * registry against concurrent register/unregister.
 */
static pthread_mutex_t rcu_gp_lock = PTHREAD_MUTEX_INITIALIZER;

/*
 * Writer wakeup word: set to -1 while a writer sleeps in wait_gp();
 * readers reaching a quiescent state reset it to 0 and wake the writer.
 * NOTE(review): this definition was dropped from the excerpt and is
 * reconstructed from its uses below — confirm against the declaration
 * in urcu/static/urcu-qsbr.h.
 */
int32_t rcu_gp_futex;

/*
 * Global grace period counter.
 *
 * Readers snapshot this into their per-thread ctr at each quiescent
 * state; the writer advances it in synchronize_rcu() and waits for all
 * readers to observe the new value (or go offline).
 */
unsigned long rcu_gp_ctr = RCU_GP_ONLINE;

/*
 * Active attempts to check for reader Q.S. before calling futex().
 */
#define RCU_QS_ACTIVE_ATTEMPTS 100

/*
 * Written to only by each individual reader. Read by both the reader and the
 * writers.
 */
DEFINE_URCU_TLS(struct rcu_reader, rcu_reader);

/*
 * Debug yield support.
 * NOTE(review): upstream guards these two definitions with
 * #ifdef DEBUG_YIELD — confirm against the build configuration.
 */
unsigned int rcu_yield_active;
DEFINE_URCU_TLS(unsigned int, rcu_rand_yield);

/* List of every currently registered reader thread (struct rcu_reader). */
static CDS_LIST_HEAD(registry);

/*
 * Number of busy-loop attempts before waiting on futex for grace period
 * batching.
 */
#define RCU_AWAKE_ATTEMPTS 1000
/*
 * States of the adaptative wakeup protocol between a grace-period
 * waiter and the thread that performs the grace period on its behalf.
 */
enum adapt_wakeup_state {
	/* AWAKE_WAITING is compared directly (futex compares it). */
	AWAKE_WAITING =		0,
	/* non-zero are used as masks. */
	AWAKE_WAKEUP =		(1 << 0),	/* waker: wakeup posted */
	AWAKE_AWAKENED =	(1 << 1),	/* waiter: woke up, futex skip ok */
	AWAKE_TEARDOWN =	(1 << 2),	/* waker: waiter may free "value" */
};

/* Per-thread node pushed on the gp_waiters stack while awaiting a G.P. */
struct gp_waiters_thread {
	struct cds_wfs_node node;	/* linkage into gp_waiters stack */
	int32_t wait_futex;		/* adapt_wakeup_state protocol word */
};

/*
 * Stack keeping threads awaiting to wait for a grace period. Contains
 * struct gp_waiters_thread objects.
 * NOTE(review): initializer partially reconstructed — upstream also
 * statically initializes the stack head (CDS_WFS_END); confirm before
 * relying on this static initialization.
 */
static struct cds_wfs_stack gp_waiters = {
	.head = CDS_WFS_END,
	.lock = PTHREAD_MUTEX_INITIALIZER,
};
/*
 * Take @mutex, aborting the process on unexpected errors.
 *
 * With DISTRUST_SIGNALS_EXTREME, busy-loop on trylock so that a signal
 * handler interrupting lock acquisition cannot wedge the caller:
 * retry on EBUSY/EINTR, die on any other error.
 * NOTE(review): the error path (urcu_die) and 10ms poll were dropped
 * from this excerpt and are reconstructed — confirm against upstream
 * urcu-qsbr.c.
 */
static void mutex_lock(pthread_mutex_t *mutex)
{
	int ret;

#ifndef DISTRUST_SIGNALS_EXTREME
	ret = pthread_mutex_lock(mutex);
	if (ret)
		urcu_die(ret);
#else /* #ifndef DISTRUST_SIGNALS_EXTREME */
	while ((ret = pthread_mutex_trylock(mutex)) != 0) {
		if (ret != EBUSY && ret != EINTR)
			urcu_die(ret);
		poll(NULL, 0, 10);	/* back off 10ms between attempts */
	}
#endif /* #else #ifndef DISTRUST_SIGNALS_EXTREME */
}
/*
 * Release @mutex, aborting the process on error.
 * NOTE(review): error path (urcu_die) reconstructed — dropped lines.
 */
static void mutex_unlock(pthread_mutex_t *mutex)
{
	int ret;

	ret = pthread_mutex_unlock(mutex);
	if (ret)
		urcu_die(ret);
}
/*
 * synchronize_rcu() waiting. Single thread.
 *
 * Sleep on rcu_gp_futex until a reader passing through a quiescent
 * state wakes us. FUTEX_WAIT revalidates that the futex word still
 * reads -1 atomically, so a wakeup racing with this check is not lost.
 */
static void wait_gp(void)
{
	/* Read reader_gp before read futex */
	cmm_smp_rmb();
	if (uatomic_read(&rcu_gp_futex) == -1)
		futex_noasync(&rcu_gp_futex, FUTEX_WAIT, -1,
		      NULL, NULL, 0);
}
/*
 * Note: urcu_adaptative_wake_up needs "value" to stay allocated
 * throughout its execution. In this scheme, the waiter owns the futex
 * memory, and we only allow it to free this memory when it receives the
 * AWAKE_TEARDOWN flag.
 */
static void urcu_adaptative_wake_up(int32_t *value)
{
	cmm_smp_mb();	/* order prior grace-period writes before wakeup */
	assert(uatomic_read(value) == AWAKE_WAITING);
	uatomic_set(value, AWAKE_WAKEUP);
	/* Skip the futex syscall if the waiter already saw our wakeup. */
	if (!(uatomic_read(value) & AWAKE_AWAKENED))
		futex_noasync(value, FUTEX_WAKE, 1, NULL, NULL, 0);
	/* Allow teardown of "value" memory. */
	uatomic_or(value, AWAKE_TEARDOWN);
}
/*
 * Adaptative wait: spin for up to RCU_AWAKE_ATTEMPTS iterations hoping
 * for a fast wakeup, then fall back to sleeping on the futex.
 *
 * Caller must initialize "value" to AWAKE_WAITING before passing its
 * memory to waker thread.
 */
static void urcu_adaptative_busy_wait(int32_t *value)
{
	unsigned int i;

	/* Load and test condition before read futex */
	cmm_smp_rmb();
	for (i = 0; i < RCU_AWAKE_ATTEMPTS; i++) {
		if (uatomic_read(value) != AWAKE_WAITING)
			goto skip_futex_wait;
		caa_cpu_relax();
	}
	futex_noasync(value, FUTEX_WAIT, AWAKE_WAITING, NULL, NULL, 0);
skip_futex_wait:

	/* Tell waker thread than we are awakened. */
	uatomic_or(value, AWAKE_AWAKENED);

	/*
	 * Wait until waker thread lets us know it's ok to tear down
	 * memory allocated for value.
	 */
	for (i = 0; i < RCU_AWAKE_ATTEMPTS; i++) {
		if (uatomic_read(value) & AWAKE_TEARDOWN)
			break;
		caa_cpu_relax();
	}
	/* Spin budget exhausted: sleep-poll until teardown is allowed. */
	while (!(uatomic_read(value) & AWAKE_TEARDOWN))
		poll(NULL, 0, 10);
	assert(uatomic_read(value) & AWAKE_TEARDOWN);
}
/*
 * Move every reader in @input_readers to either @cur_snap_readers
 * (observed the current rcu_gp_ctr; only when a two-phase caller passes
 * a non-NULL list) or @qsreaders (quiescent/offline), busy-looping on
 * readers still holding an old counter snapshot. After
 * RCU_QS_ACTIVE_ATTEMPTS iterations, switch from busy-looping to
 * futex-based sleeping (arming rcu_gp_futex and flagging each reader's
 * ->waiting so readers know to wake us).
 * NOTE(review): loop skeleton and barrier calls reconstructed from
 * dropped lines — confirm against upstream urcu-qsbr.c.
 */
static void wait_for_readers(struct cds_list_head *input_readers,
			struct cds_list_head *cur_snap_readers,
			struct cds_list_head *qsreaders)
{
	int wait_loops = 0;
	struct rcu_reader *index, *tmp;

	/*
	 * Wait for each thread URCU_TLS(rcu_reader).ctr to either
	 * indicate quiescence (offline), or for them to observe the
	 * current rcu_gp_ctr value.
	 */
	for (;;) {
		wait_loops++;
		if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
			/* Arm the futex: readers will wake us. */
			uatomic_set(&rcu_gp_futex, -1);
			/*
			 * Write futex before write waiting (the other side
			 * reads them in the opposite order).
			 */
			cmm_smp_wmb();
			cds_list_for_each_entry(index, input_readers, node) {
				_CMM_STORE_SHARED(index->waiting, 1);
			}
			/* Write futex before read reader_gp */
			cmm_smp_mb();
		}
		cds_list_for_each_entry_safe(index, tmp, input_readers, node) {
			switch (rcu_reader_state(&index->ctr)) {
			case RCU_READER_ACTIVE_CURRENT:
				if (cur_snap_readers) {
					cds_list_move(&index->node,
						cur_snap_readers);
					break;
				}
				/* Fall-through */
			case RCU_READER_INACTIVE:
				cds_list_move(&index->node, qsreaders);
				break;
			case RCU_READER_ACTIVE_OLD:
				/*
				 * Old snapshot. Leaving node in
				 * input_readers will make us busy-loop
				 * until the snapshot becomes current or
				 * the reader becomes inactive.
				 */
				break;
			}
		}

		if (cds_list_empty(input_readers)) {
			if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
				/* Read reader_gp before write futex */
				cmm_smp_mb();
				/* Disarm the futex before leaving. */
				uatomic_set(&rcu_gp_futex, 0);
			}
			break;
		} else {
			if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
				wait_gp();
			} else {
#ifndef HAS_INCOHERENT_CACHES
				caa_cpu_relax();
#else /* #ifndef HAS_INCOHERENT_CACHES */
				cmm_smp_mb();
#endif /* #else #ifndef HAS_INCOHERENT_CACHES */
			}
		}
	}
}
273 * Using a two-subphases algorithm for architectures with smaller than 64-bit
274 * long-size to ensure we do not encounter an overflow bug.
277 #if (CAA_BITS_PER_LONG < 64)
278 void synchronize_rcu(void)
280 CDS_LIST_HEAD(cur_snap_readers
);
281 CDS_LIST_HEAD(qsreaders
);
282 unsigned long was_online
;
283 struct gp_waiters_thread gp_waiters_thread
;
284 struct cds_wfs_head
*gp_waiters_head
;
285 struct cds_wfs_node
*waiters_iter
, *waiters_iter_n
;
287 was_online
= URCU_TLS(rcu_reader
).ctr
;
289 /* All threads should read qparity before accessing data structure
290 * where new ptr points to. In the "then" case, rcu_thread_offline
291 * includes a memory barrier.
293 * Mark the writer thread offline to make sure we don't wait for
294 * our own quiescent state. This allows using synchronize_rcu()
295 * in threads registered as readers.
298 rcu_thread_offline();
303 * Add ourself to gp_waiters stack of threads awaiting to wait
304 * for a grace period. Proceed to perform the grace period only
305 * if we are the first thread added into the stack.
307 cds_wfs_node_init(&gp_waiters_thread
.node
);
308 gp_waiters_thread
.wait_futex
= AWAKE_WAITING
;
309 if (cds_wfs_push(&gp_waiters
, &gp_waiters_node
) != 0) {
310 /* Not first in stack: will be awakened by another thread. */
311 urcu_adaptative_busy_wait(&gp_waiters_thread
.wait_futex
);
315 mutex_lock(&rcu_gp_lock
);
318 * Pop all waiters into our local stack head.
320 gp_waiters_head
= __cds_wfs_pop_all(&gp_waiters
);
322 if (cds_list_empty(®istry
))
326 * Wait for readers to observe original parity or be quiescent.
328 wait_for_readers(®istry
, &cur_snap_readers
, &qsreaders
);
331 * Must finish waiting for quiescent state for original parity
332 * before committing next rcu_gp_ctr update to memory. Failure
333 * to do so could result in the writer waiting forever while new
334 * readers are always accessing data (no progress). Enforce
335 * compiler-order of load URCU_TLS(rcu_reader).ctr before store
341 * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
342 * model easier to understand. It does not have a big performance impact
343 * anyway, given this is the write-side.
347 /* Switch parity: 0 -> 1, 1 -> 0 */
348 CMM_STORE_SHARED(rcu_gp_ctr
, rcu_gp_ctr
^ RCU_GP_CTR
);
351 * Must commit rcu_gp_ctr update to memory before waiting for
352 * quiescent state. Failure to do so could result in the writer
353 * waiting forever while new readers are always accessing data
354 * (no progress). Enforce compiler-order of store to rcu_gp_ctr
355 * before load URCU_TLS(rcu_reader).ctr.
360 * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
361 * model easier to understand. It does not have a big performance impact
362 * anyway, given this is the write-side.
367 * Wait for readers to observe new parity or be quiescent.
369 wait_for_readers(&cur_snap_readers
, NULL
, &qsreaders
);
372 * Put quiescent reader list back into registry.
374 cds_list_splice(&qsreaders
, ®istry
);
376 mutex_unlock(&rcu_gp_lock
);
378 /* Wake all waiters in our stack head, excluding ourself. */
379 cds_wfs_for_each_blocking_safe(gp_waiters_head
, waiters_iter
,
381 struct gp_waiters_thread
*wt
;
383 wt
= caa_container_of(waiters_iter
,
384 struct gp_waiters_thread
, node
);
385 if (wt
== &gp_waiters_thread
)
387 urcu_adaptative_wake_up(&wt
->wait_futex
);
392 * Finish waiting for reader threads before letting the old ptr being
400 #else /* !(CAA_BITS_PER_LONG < 64) */
401 void synchronize_rcu(void)
403 CDS_LIST_HEAD(qsreaders
);
404 unsigned long was_online
;
405 struct gp_waiters_thread gp_waiters_thread
;
406 struct cds_wfs_head
*gp_waiters_head
;
407 struct cds_wfs_node
*waiters_iter
, *waiters_iter_n
;
409 was_online
= URCU_TLS(rcu_reader
).ctr
;
412 * Mark the writer thread offline to make sure we don't wait for
413 * our own quiescent state. This allows using synchronize_rcu()
414 * in threads registered as readers.
417 rcu_thread_offline();
422 * Add ourself to gp_waiters stack of threads awaiting to wait
423 * for a grace period. Proceed to perform the grace period only
424 * if we are the first thread added into the stack.
426 cds_wfs_node_init(&gp_waiters_thread
.node
);
427 gp_waiters_thread
.wait_futex
= AWAKE_WAITING
;
428 if (cds_wfs_push(&gp_waiters
, &gp_waiters_thread
.node
) != 0) {
429 /* Not first in stack: will be awakened by another thread. */
430 urcu_adaptative_busy_wait(&gp_waiters_thread
.wait_futex
);
434 mutex_lock(&rcu_gp_lock
);
437 * Pop all waiters into our local stack head.
439 gp_waiters_head
= __cds_wfs_pop_all(&gp_waiters
);
441 if (cds_list_empty(®istry
))
444 /* Increment current G.P. */
445 CMM_STORE_SHARED(rcu_gp_ctr
, rcu_gp_ctr
+ RCU_GP_CTR
);
448 * Must commit rcu_gp_ctr update to memory before waiting for
449 * quiescent state. Failure to do so could result in the writer
450 * waiting forever while new readers are always accessing data
451 * (no progress). Enforce compiler-order of store to rcu_gp_ctr
452 * before load URCU_TLS(rcu_reader).ctr.
457 * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
458 * model easier to understand. It does not have a big performance impact
459 * anyway, given this is the write-side.
464 * Wait for readers to observe new count of be quiescent.
466 wait_for_readers(®istry
, NULL
, &qsreaders
);
469 * Put quiescent reader list back into registry.
471 cds_list_splice(&qsreaders
, ®istry
);
473 mutex_unlock(&rcu_gp_lock
);
475 /* Wake all waiters in our stack head, excluding ourself. */
476 cds_wfs_for_each_blocking_safe(gp_waiters_head
, waiters_iter
,
478 struct gp_waiters_thread
*wt
;
480 wt
= caa_container_of(waiters_iter
,
481 struct gp_waiters_thread
, node
);
482 if (wt
== &gp_waiters_thread
)
484 urcu_adaptative_wake_up(&wt
->wait_futex
);
493 #endif /* !(CAA_BITS_PER_LONG < 64) */
/*
 * library wrappers to be used by non-LGPL compatible source code.
 * Each forwards to the _rcu_* inline from urcu/static/urcu-qsbr.h so
 * non-LGPL callers link against real symbols instead of inlining
 * LGPL-licensed code.
 */

void rcu_read_lock(void)
{
	_rcu_read_lock();
}

void rcu_read_unlock(void)
{
	_rcu_read_unlock();
}

/* Announce a quiescent state for the calling reader thread. */
void rcu_quiescent_state(void)
{
	_rcu_quiescent_state();
}

/* Put the calling reader in extended quiescent state (offline). */
void rcu_thread_offline(void)
{
	_rcu_thread_offline();
}

/* Bring the calling reader back online after rcu_thread_offline(). */
void rcu_thread_online(void)
{
	_rcu_thread_online();
}
/*
 * Register the calling thread as an RCU reader. Must be called before
 * the thread's first read-side critical section.
 */
void rcu_register_thread(void)
{
	URCU_TLS(rcu_reader).tid = pthread_self();
	/* Thread must not already be accounted online. */
	assert(URCU_TLS(rcu_reader).ctr == 0);

	mutex_lock(&rcu_gp_lock);
	cds_list_add(&URCU_TLS(rcu_reader).node, &registry);
	mutex_unlock(&rcu_gp_lock);
	/* Start participating in grace periods. */
	_rcu_thread_online();
}
/*
 * Unregister the calling thread; it must not enter RCU read-side
 * critical sections after this returns.
 */
void rcu_unregister_thread(void)
{
	/*
	 * We have to make the thread offline otherwise we end up
	 * deadlocking with a waiting writer.
	 */
	_rcu_thread_offline();
	mutex_lock(&rcu_gp_lock);
	cds_list_del(&URCU_TLS(rcu_reader).node);
	mutex_unlock(&rcu_gp_lock);
}
/*
 * Library destructor.
 * NOTE(review): signature reconstructed from the forward declaration
 * at the top of this file — the definition line was dropped from this
 * excerpt.
 */
void rcu_exit(void)
{
	/*
	 * Assertion disabled because call_rcu threads are now rcu
	 * readers, and left running at exit.
	 * assert(cds_list_empty(&registry));
	 */
}

/* Export this flavor's operations table (see urcu/map/urcu-qsbr.h). */
DEFINE_RCU_FLAVOR(rcu_flavor);
558 #include "urcu-call-rcu-impl.h"
559 #include "urcu-defer-impl.h"