urcu-qsbr: batch concurrent synchronize_rcu()
[urcu.git] / urcu-qsbr.c
/*
 * urcu-qsbr.c
 *
 * Userspace RCU QSBR library
 *
 * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * IBM's contributions to this file may be relicensed under LGPLv2 or later.
 */

#define _GNU_SOURCE
#define _LGPL_SOURCE
#include <stdio.h>
#include <pthread.h>
#include <signal.h>
#include <assert.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <errno.h>
#include <poll.h>

#include "urcu/wfcqueue.h"
#include "urcu/wfstack.h"
#include "urcu/map/urcu-qsbr.h"
#define BUILD_QSBR_LIB
#include "urcu/static/urcu-qsbr.h"
#include "urcu-pointer.h"
#include "urcu/tls-compat.h"

#include "urcu-die.h"

/* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
#undef _LGPL_SOURCE
#include "urcu-qsbr.h"
#define _LGPL_SOURCE

void __attribute__((destructor)) rcu_exit(void);

static pthread_mutex_t rcu_gp_lock = PTHREAD_MUTEX_INITIALIZER;

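/*
 * Futex used by the grace-period writer to wait for readers (see
 * wait_gp() and wait_for_readers() below): set to -1 before the writer
 * blocks, and reset to 0 once every reader has been observed quiescent.
 */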
int32_t rcu_gp_futex;

/*
 * Global grace period counter.
 */
unsigned long rcu_gp_ctr = RCU_GP_ONLINE;

/*
 * Active attempts to check for reader Q.S. before calling futex().
 */
#define RCU_QS_ACTIVE_ATTEMPTS 100

/*
 * Written to only by each individual reader. Read by both the reader and the
 * writers.
 */
DEFINE_URCU_TLS(struct rcu_reader, rcu_reader);

#ifdef DEBUG_YIELD
unsigned int rcu_yield_active;
DEFINE_URCU_TLS(unsigned int, rcu_rand_yield);
#endif

static CDS_LIST_HEAD(registry);

/*
 * Number of busy-loop attempts before waiting on futex for grace period
 * batching.
 */
#define RCU_AWAKE_ATTEMPTS 1000

enum adapt_wakeup_state {
	/* AWAKE_WAITING is compared directly (futex compares it). */
	AWAKE_WAITING =		0,
	/* non-zero are used as masks. */
	AWAKE_WAKEUP =		(1 << 0),
	AWAKE_AWAKENED =	(1 << 1),
	AWAKE_TEARDOWN =	(1 << 2),
};
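
/*
 * Lifecycle of a wait_futex word, as implemented by the two helpers
 * further below: the waiter initializes it to AWAKE_WAITING, the waker
 * sets AWAKE_WAKEUP and finally ors in AWAKE_TEARDOWN, while the waiter
 * ors in AWAKE_AWAKENED once it resumes. Only after observing
 * AWAKE_TEARDOWN may the waiter reclaim the memory holding the futex.
 */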

struct gp_waiters_thread {
	struct cds_wfs_node node;
	int32_t wait_futex;
};

/*
 * Stack of threads waiting for a grace period. Contains
 * struct gp_waiters_thread objects.
 */
static struct cds_wfs_stack gp_waiters = {
	.head = CDS_WFS_END,
	.lock = PTHREAD_MUTEX_INITIALIZER,
};
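
/*
 * Grace period batching (see synchronize_rcu() below): each caller
 * pushes its gp_waiters_thread node onto this stack. The first pusher
 * becomes the grace-period leader: it pops the whole stack, performs
 * the grace period under rcu_gp_lock, then wakes every other popped
 * waiter through its wait_futex, so concurrent synchronize_rcu() calls
 * share a single grace period.
 */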

static void mutex_lock(pthread_mutex_t *mutex)
{
	int ret;

#ifndef DISTRUST_SIGNALS_EXTREME
	ret = pthread_mutex_lock(mutex);
	if (ret)
		urcu_die(ret);
#else /* #ifndef DISTRUST_SIGNALS_EXTREME */
	while ((ret = pthread_mutex_trylock(mutex)) != 0) {
		if (ret != EBUSY && ret != EINTR)
			urcu_die(ret);
		poll(NULL, 0, 10);
	}
#endif /* #else #ifndef DISTRUST_SIGNALS_EXTREME */
}

static void mutex_unlock(pthread_mutex_t *mutex)
{
	int ret;

	ret = pthread_mutex_unlock(mutex);
	if (ret)
		urcu_die(ret);
}

/*
 * synchronize_rcu() waiting. Single thread.
 */
static void wait_gp(void)
{
	/* Read reader_gp before read futex */
	cmm_smp_rmb();
	if (uatomic_read(&rcu_gp_futex) == -1)
		futex_noasync(&rcu_gp_futex, FUTEX_WAIT, -1,
			NULL, NULL, 0);
}

/*
 * Note: urcu_adaptative_wake_up needs "value" to stay allocated
 * throughout its execution. In this scheme, the waiter owns the futex
 * memory, and we only allow it to free this memory when it receives the
 * AWAKE_TEARDOWN flag.
 */
static void urcu_adaptative_wake_up(int32_t *value)
{
	cmm_smp_mb();
	assert(uatomic_read(value) == AWAKE_WAITING);
	uatomic_set(value, AWAKE_WAKEUP);
	if (!(uatomic_read(value) & AWAKE_AWAKENED))
		futex_noasync(value, FUTEX_WAKE, 1, NULL, NULL, 0);
	/* Allow teardown of "value" memory. */
	uatomic_or(value, AWAKE_TEARDOWN);
}

/*
 * Caller must initialize "value" to AWAKE_WAITING before passing its
 * memory to the waker thread.
 */
static void urcu_adaptative_busy_wait(int32_t *value)
{
	unsigned int i;

	/* Load and test condition before read futex */
	cmm_smp_rmb();
	for (i = 0; i < RCU_AWAKE_ATTEMPTS; i++) {
		if (uatomic_read(value) != AWAKE_WAITING)
			goto skip_futex_wait;
		caa_cpu_relax();
	}
	futex_noasync(value, FUTEX_WAIT, AWAKE_WAITING, NULL, NULL, 0);
skip_futex_wait:

	/* Tell waker thread that we are awakened. */
	uatomic_or(value, AWAKE_AWAKENED);

	/*
	 * Wait until waker thread lets us know it's ok to tear down
	 * memory allocated for value.
	 */
	for (i = 0; i < RCU_AWAKE_ATTEMPTS; i++) {
		if (uatomic_read(value) & AWAKE_TEARDOWN)
			break;
		caa_cpu_relax();
	}
	while (!(uatomic_read(value) & AWAKE_TEARDOWN))
		poll(NULL, 0, 10);
	assert(uatomic_read(value) & AWAKE_TEARDOWN);
}
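
/*
 * Typical pairing of the two helpers above (illustrative sketch only;
 * the thread names are hypothetical):
 *
 *	waiter thread W				waker thread K
 *	---------------				--------------
 *	futex = AWAKE_WAITING;
 *	publish &futex (e.g. via wfstack);
 *	urcu_adaptative_busy_wait(&futex);
 *						urcu_adaptative_wake_up(&futex);
 *	returns once AWAKE_TEARDOWN is set;
 *	futex memory may now be reclaimed.
 */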

static void wait_for_readers(struct cds_list_head *input_readers,
			struct cds_list_head *cur_snap_readers,
			struct cds_list_head *qsreaders)
{
	int wait_loops = 0;
	struct rcu_reader *index, *tmp;

	/*
	 * Wait for each thread URCU_TLS(rcu_reader).ctr to either
	 * indicate quiescence (offline), or for them to observe the
	 * current rcu_gp_ctr value.
	 */
	for (;;) {
		wait_loops++;
		if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
			uatomic_set(&rcu_gp_futex, -1);
			/*
			 * Write futex before write waiting (the other side
			 * reads them in the opposite order).
			 */
			cmm_smp_wmb();
			cds_list_for_each_entry(index, input_readers, node) {
				_CMM_STORE_SHARED(index->waiting, 1);
			}
			/* Write futex before read reader_gp */
			cmm_smp_mb();
		}
		cds_list_for_each_entry_safe(index, tmp, input_readers, node) {
			switch (rcu_reader_state(&index->ctr)) {
			case RCU_READER_ACTIVE_CURRENT:
				if (cur_snap_readers) {
					cds_list_move(&index->node,
						cur_snap_readers);
					break;
				}
				/* Fall-through */
			case RCU_READER_INACTIVE:
				cds_list_move(&index->node, qsreaders);
				break;
			case RCU_READER_ACTIVE_OLD:
				/*
				 * Old snapshot. Leaving node in
				 * input_readers will make us busy-loop
				 * until the snapshot becomes current or
				 * the reader becomes inactive.
				 */
				break;
			}
		}

		if (cds_list_empty(input_readers)) {
			if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
				/* Read reader_gp before write futex */
				cmm_smp_mb();
				uatomic_set(&rcu_gp_futex, 0);
			}
			break;
		} else {
			if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) {
				wait_gp();
			} else {
#ifndef HAS_INCOHERENT_CACHES
				caa_cpu_relax();
#else /* #ifndef HAS_INCOHERENT_CACHES */
				cmm_smp_mb();
#endif /* #else #ifndef HAS_INCOHERENT_CACHES */
			}
		}
	}
}

/*
 * Use a two-subphase algorithm on architectures where the long size is
 * smaller than 64 bits, to make sure we do not run into a counter
 * overflow bug.
 */

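/*
 * Sketch of the rationale (expanded note): if a 32-bit rcu_gp_ctr were
 * simply incremented once per grace period, a reader's stale counter
 * snapshot could alias a much newer value after wrap-around. Flipping a
 * single parity bit (RCU_GP_CTR) and waiting in two sub-phases keeps
 * correctness independent of the counter's magnitude.
 */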
#if (CAA_BITS_PER_LONG < 64)
void synchronize_rcu(void)
{
	CDS_LIST_HEAD(cur_snap_readers);
	CDS_LIST_HEAD(qsreaders);
	unsigned long was_online;
	struct gp_waiters_thread gp_waiters_thread;
	struct cds_wfs_head *gp_waiters_head;
	struct cds_wfs_node *waiters_iter, *waiters_iter_n;

	was_online = URCU_TLS(rcu_reader).ctr;

	/* All threads should read qparity before accessing data structure
	 * where new ptr points to. In the "then" case, rcu_thread_offline
	 * includes a memory barrier.
	 *
	 * Mark the writer thread offline to make sure we don't wait for
	 * our own quiescent state. This allows using synchronize_rcu()
	 * in threads registered as readers.
	 */
	if (was_online)
		rcu_thread_offline();
	else
		cmm_smp_mb();

	/*
	 * Add ourselves to the gp_waiters stack of threads waiting for a
	 * grace period. Proceed to perform the grace period only if we
	 * are the first thread pushed onto the stack.
	 */
	cds_wfs_node_init(&gp_waiters_thread.node);
	gp_waiters_thread.wait_futex = AWAKE_WAITING;
	if (cds_wfs_push(&gp_waiters, &gp_waiters_thread.node) != 0) {
		/* Not first in stack: will be awakened by another thread. */
		urcu_adaptative_busy_wait(&gp_waiters_thread.wait_futex);
		goto gp_end;
	}

	mutex_lock(&rcu_gp_lock);

	/*
	 * Pop all waiters into our local stack head.
	 */
	gp_waiters_head = __cds_wfs_pop_all(&gp_waiters);

	if (cds_list_empty(&registry))
		goto out;

	/*
	 * Wait for readers to observe original parity or be quiescent.
	 */
	wait_for_readers(&registry, &cur_snap_readers, &qsreaders);

	/*
	 * Must finish waiting for quiescent state for original parity
	 * before committing next rcu_gp_ctr update to memory. Failure
	 * to do so could result in the writer waiting forever while new
	 * readers are always accessing data (no progress). Enforce
	 * compiler-order of load URCU_TLS(rcu_reader).ctr before store
	 * to rcu_gp_ctr.
	 */
	cmm_barrier();

	/*
	 * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
	 * model easier to understand. It does not have a big performance impact
	 * anyway, given this is the write-side.
	 */
	cmm_smp_mb();

	/* Switch parity: 0 -> 1, 1 -> 0 */
	CMM_STORE_SHARED(rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);

	/*
	 * Must commit rcu_gp_ctr update to memory before waiting for
	 * quiescent state. Failure to do so could result in the writer
	 * waiting forever while new readers are always accessing data
	 * (no progress). Enforce compiler-order of store to rcu_gp_ctr
	 * before load URCU_TLS(rcu_reader).ctr.
	 */
	cmm_barrier();

	/*
	 * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
	 * model easier to understand. It does not have a big performance impact
	 * anyway, given this is the write-side.
	 */
	cmm_smp_mb();

	/*
	 * Wait for readers to observe new parity or be quiescent.
	 */
	wait_for_readers(&cur_snap_readers, NULL, &qsreaders);

	/*
	 * Put quiescent reader list back into registry.
	 */
	cds_list_splice(&qsreaders, &registry);
out:
	mutex_unlock(&rcu_gp_lock);

	/* Wake all waiters in our stack head, excluding ourselves. */
	cds_wfs_for_each_blocking_safe(gp_waiters_head, waiters_iter,
			waiters_iter_n) {
		struct gp_waiters_thread *wt;

		wt = caa_container_of(waiters_iter,
				struct gp_waiters_thread, node);
		if (wt == &gp_waiters_thread)
			continue;
		urcu_adaptative_wake_up(&wt->wait_futex);
	}

gp_end:
	/*
	 * Finish waiting for reader threads before letting the old ptr
	 * be freed.
	 */
	if (was_online)
		rcu_thread_online();
	else
		cmm_smp_mb();
}
#else /* !(CAA_BITS_PER_LONG < 64) */
void synchronize_rcu(void)
{
	CDS_LIST_HEAD(qsreaders);
	unsigned long was_online;
	struct gp_waiters_thread gp_waiters_thread;
	struct cds_wfs_head *gp_waiters_head;
	struct cds_wfs_node *waiters_iter, *waiters_iter_n;

	was_online = URCU_TLS(rcu_reader).ctr;

	/*
	 * Mark the writer thread offline to make sure we don't wait for
	 * our own quiescent state. This allows using synchronize_rcu()
	 * in threads registered as readers.
	 */
	if (was_online)
		rcu_thread_offline();
	else
		cmm_smp_mb();

	/*
	 * Add ourselves to the gp_waiters stack of threads waiting for a
	 * grace period. Proceed to perform the grace period only if we
	 * are the first thread pushed onto the stack.
	 */
	cds_wfs_node_init(&gp_waiters_thread.node);
	gp_waiters_thread.wait_futex = AWAKE_WAITING;
	if (cds_wfs_push(&gp_waiters, &gp_waiters_thread.node) != 0) {
		/* Not first in stack: will be awakened by another thread. */
		urcu_adaptative_busy_wait(&gp_waiters_thread.wait_futex);
		goto gp_end;
	}

	mutex_lock(&rcu_gp_lock);

	/*
	 * Pop all waiters into our local stack head.
	 */
	gp_waiters_head = __cds_wfs_pop_all(&gp_waiters);

	if (cds_list_empty(&registry))
		goto out;

	/* Increment current G.P. */
	CMM_STORE_SHARED(rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);

	/*
	 * Must commit rcu_gp_ctr update to memory before waiting for
	 * quiescent state. Failure to do so could result in the writer
	 * waiting forever while new readers are always accessing data
	 * (no progress). Enforce compiler-order of store to rcu_gp_ctr
	 * before load URCU_TLS(rcu_reader).ctr.
	 */
	cmm_barrier();

	/*
	 * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
	 * model easier to understand. It does not have a big performance impact
	 * anyway, given this is the write-side.
	 */
	cmm_smp_mb();

	/*
	 * Wait for readers to observe new count or be quiescent.
	 */
	wait_for_readers(&registry, NULL, &qsreaders);

	/*
	 * Put quiescent reader list back into registry.
	 */
	cds_list_splice(&qsreaders, &registry);
out:
	mutex_unlock(&rcu_gp_lock);

	/* Wake all waiters in our stack head, excluding ourselves. */
	cds_wfs_for_each_blocking_safe(gp_waiters_head, waiters_iter,
			waiters_iter_n) {
		struct gp_waiters_thread *wt;

		wt = caa_container_of(waiters_iter,
				struct gp_waiters_thread, node);
		if (wt == &gp_waiters_thread)
			continue;
		urcu_adaptative_wake_up(&wt->wait_futex);
	}

gp_end:
	if (was_online)
		rcu_thread_online();
	else
		cmm_smp_mb();
}
#endif /* !(CAA_BITS_PER_LONG < 64) */

/*
 * Library wrappers to be used by non-LGPL compatible source code.
 */

void rcu_read_lock(void)
{
	_rcu_read_lock();
}

void rcu_read_unlock(void)
{
	_rcu_read_unlock();
}

void rcu_quiescent_state(void)
{
	_rcu_quiescent_state();
}

void rcu_thread_offline(void)
{
	_rcu_thread_offline();
}

void rcu_thread_online(void)
{
	_rcu_thread_online();
}

void rcu_register_thread(void)
{
	URCU_TLS(rcu_reader).tid = pthread_self();
	assert(URCU_TLS(rcu_reader).ctr == 0);

	mutex_lock(&rcu_gp_lock);
	cds_list_add(&URCU_TLS(rcu_reader).node, &registry);
	mutex_unlock(&rcu_gp_lock);
	_rcu_thread_online();
}

void rcu_unregister_thread(void)
{
	/*
	 * We have to make the thread offline, otherwise we end up
	 * deadlocking with a waiting writer.
	 */
	_rcu_thread_offline();
	mutex_lock(&rcu_gp_lock);
	cds_list_del(&URCU_TLS(rcu_reader).node);
	mutex_unlock(&rcu_gp_lock);
}
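
/*
 * Usage sketch (illustrative only, not part of the library): a minimal
 * QSBR reader thread built on the wrappers above. do_work() and
 * stop_requested() are hypothetical application functions.
 *
 *	static void *reader_thread(void *arg)
 *	{
 *		rcu_register_thread();
 *		while (!stop_requested()) {
 *			rcu_read_lock();
 *			do_work();
 *			rcu_read_unlock();
 *			rcu_quiescent_state();
 *		}
 *		rcu_unregister_thread();
 *		return NULL;
 *	}
 *
 * Updaters may call synchronize_rcu() from registered or unregistered
 * threads; a registered caller is put offline for the duration of the
 * wait (see synchronize_rcu() above).
 */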

void rcu_exit(void)
{
	/*
	 * Assertion disabled because call_rcu threads are now rcu
	 * readers, and left running at exit.
	 * assert(cds_list_empty(&registry));
	 */
}

DEFINE_RCU_FLAVOR(rcu_flavor);

#include "urcu-call-rcu-impl.h"
#include "urcu-defer-impl.h"