#include <syscall.h>
#include <unistd.h>
+#include "urcu/urcu-futex.h"
#include "urcu-defer-static.h"
/* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
#include "urcu-defer.h"
-#define futex(...) syscall(__NR_futex, __VA_ARGS__)
-#define FUTEX_WAIT 0
-#define FUTEX_WAKE 1
-
-void __attribute__((destructor)) urcu_defer_exit(void);
+void __attribute__((destructor)) rcu_defer_exit(void);
extern void synchronize_rcu(void);
/*
- * urcu_defer_mutex nests inside defer_thread_mutex.
+ * rcu_defer_mutex nests inside defer_thread_mutex.
*/
-static pthread_mutex_t urcu_defer_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t rcu_defer_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t defer_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
static int defer_thread_futex;
* the reclamation tread.
*/
static struct defer_queue __thread defer_queue;
-
-/* Thread IDs of registered deferers */
-#define INIT_NUM_THREADS 4
-
-struct deferer_registry {
- pthread_t tid;
- struct defer_queue *defer_queue;
- unsigned long last_head;
-};
-
-static struct deferer_registry *registry;
-static int num_deferers, alloc_deferers;
-
+static LIST_HEAD(registry);
static pthread_t tid_defer;
-static void internal_urcu_lock(pthread_mutex_t *mutex)
+static void mutex_lock(pthread_mutex_t *mutex)
{
int ret;
#endif /* #else #ifndef DISTRUST_SIGNALS_EXTREME */
}
-static void internal_urcu_unlock(pthread_mutex_t *mutex)
+static void mutex_unlock(pthread_mutex_t *mutex)
{
int ret;
*/
static void wake_up_defer(void)
{
- if (unlikely(atomic_read(&defer_thread_futex) == -1)) {
- atomic_set(&defer_thread_futex, 0);
- futex(&defer_thread_futex, FUTEX_WAKE, 0,
+ if (unlikely(uatomic_read(&defer_thread_futex) == -1)) {
+ uatomic_set(&defer_thread_futex, 0);
+ futex_noasync(&defer_thread_futex, FUTEX_WAKE, 1,
NULL, NULL, 0);
}
}
static unsigned long rcu_defer_num_callbacks(void)
{
unsigned long num_items = 0, head;
- struct deferer_registry *index;
+ struct defer_queue *index;
- internal_urcu_lock(&urcu_defer_mutex);
- for (index = registry; index < registry + num_deferers; index++) {
- head = LOAD_SHARED(index->defer_queue->head);
- num_items += head - index->defer_queue->tail;
+ mutex_lock(&rcu_defer_mutex);
+ list_for_each_entry(index, &registry, list) {
+ head = LOAD_SHARED(index->head);
+ num_items += head - index->tail;
}
- internal_urcu_unlock(&urcu_defer_mutex);
+ mutex_unlock(&rcu_defer_mutex);
return num_items;
}
*/
static void wait_defer(void)
{
- atomic_dec(&defer_thread_futex);
+ uatomic_dec(&defer_thread_futex);
smp_mb(); /* Write futex before read queue */
if (rcu_defer_num_callbacks()) {
smp_mb(); /* Read queue before write futex */
/* Callbacks are queued, don't wait. */
- atomic_set(&defer_thread_futex, 0);
+ uatomic_set(&defer_thread_futex, 0);
} else {
smp_rmb(); /* Read queue before read futex */
- if (atomic_read(&defer_thread_futex) == -1)
- futex(&defer_thread_futex, FUTEX_WAIT, -1,
+ if (uatomic_read(&defer_thread_futex) == -1)
+ futex_noasync(&defer_thread_futex, FUTEX_WAIT, -1,
NULL, NULL, 0);
}
}
rcu_defer_barrier_queue(&defer_queue, head);
}
-
void rcu_defer_barrier_thread(void)
{
- internal_urcu_lock(&urcu_defer_mutex);
+ mutex_lock(&rcu_defer_mutex);
_rcu_defer_barrier_thread();
- internal_urcu_unlock(&urcu_defer_mutex);
+ mutex_unlock(&rcu_defer_mutex);
}
/*
void rcu_defer_barrier(void)
{
- struct deferer_registry *index;
+ struct defer_queue *index;
unsigned long num_items = 0;
- if (!registry)
+ if (list_empty(&registry))
return;
- internal_urcu_lock(&urcu_defer_mutex);
- for (index = registry; index < registry + num_deferers; index++) {
- index->last_head = LOAD_SHARED(index->defer_queue->head);
- num_items += index->last_head - index->defer_queue->tail;
+ mutex_lock(&rcu_defer_mutex);
+ list_for_each_entry(index, &registry, list) {
+ index->last_head = LOAD_SHARED(index->head);
+ num_items += index->last_head - index->tail;
}
if (likely(!num_items)) {
/*
goto end;
}
synchronize_rcu();
- for (index = registry; index < registry + num_deferers; index++)
- rcu_defer_barrier_queue(index->defer_queue,
- index->last_head);
+ list_for_each_entry(index, &registry, list)
+ rcu_defer_barrier_queue(index, index->last_head);
end:
- internal_urcu_unlock(&urcu_defer_mutex);
+ mutex_unlock(&rcu_defer_mutex);
}
/*
- * _rcu_defer_queue - Queue a RCU callback.
+ * _defer_rcu - Queue a RCU callback.
*/
-void _rcu_defer_queue(void (*fct)(void *p), void *p)
+void _defer_rcu(void (*fct)(void *p), void *p)
{
unsigned long head, tail;
tail = LOAD_SHARED(defer_queue.tail);
/*
- * If queue is full, empty it ourself.
+ * If queue is full, or reached threshold. Empty queue ourself.
* Worse-case: must allow 2 supplementary entries for fct pointer.
*/
- if (unlikely(head - tail >= DEFER_QUEUE_SIZE - 2)) {
+ if (unlikely(sync || (head - tail >= DEFER_QUEUE_SIZE - 2))) {
assert(head - tail <= DEFER_QUEUE_SIZE);
rcu_defer_barrier_thread();
assert(head - LOAD_SHARED(defer_queue.tail) == 0);
* library wrappers to be used by non-LGPL compatible source code.
*/
-void rcu_defer_queue(void (*fct)(void *p), void *p)
-{
- _rcu_defer_queue(fct, p);
-}
-
-static void rcu_add_deferer(pthread_t id)
-{
- struct deferer_registry *oldarray;
-
- if (!registry) {
- alloc_deferers = INIT_NUM_THREADS;
- num_deferers = 0;
- registry =
- malloc(sizeof(struct deferer_registry) * alloc_deferers);
- }
- if (alloc_deferers < num_deferers + 1) {
- oldarray = registry;
- registry = malloc(sizeof(struct deferer_registry)
- * (alloc_deferers << 1));
- memcpy(registry, oldarray,
- sizeof(struct deferer_registry) * alloc_deferers);
- alloc_deferers <<= 1;
- free(oldarray);
- }
- registry[num_deferers].tid = id;
- /* reference to the TLS of _this_ deferer thread. */
- registry[num_deferers].defer_queue = &defer_queue;
- registry[num_deferers].last_head = 0;
- num_deferers++;
-}
-
-/*
- * Never shrink (implementation limitation).
- * This is O(nb threads). Eventually use a hash table.
- */
-static void rcu_remove_deferer(pthread_t id)
+void defer_rcu(void (*fct)(void *p), void *p)
{
- struct deferer_registry *index;
-
- assert(registry != NULL);
- for (index = registry; index < registry + num_deferers; index++) {
- if (pthread_equal(index->tid, id)) {
- memcpy(index, &registry[num_deferers - 1],
- sizeof(struct deferer_registry));
- registry[num_deferers - 1].tid = 0;
- registry[num_deferers - 1].defer_queue = NULL;
- registry[num_deferers - 1].last_head = 0;
- num_deferers--;
- return;
- }
- }
- /* Hrm not found, forgot to register ? */
- assert(0);
+ _defer_rcu(fct, p);
}
static void start_defer_thread(void)
{
int ret;
- ret = pthread_create(&tid_defer, NULL, thr_defer,
- NULL);
+ ret = pthread_create(&tid_defer, NULL, thr_defer, NULL);
assert(!ret);
}
void rcu_defer_register_thread(void)
{
- int deferers;
+ int was_empty;
- internal_urcu_lock(&defer_thread_mutex);
- internal_urcu_lock(&urcu_defer_mutex);
+ assert(defer_queue.last_head == 0);
+ assert(defer_queue.q == NULL);
defer_queue.q = malloc(sizeof(void *) * DEFER_QUEUE_SIZE);
- rcu_add_deferer(pthread_self());
- deferers = num_deferers;
- internal_urcu_unlock(&urcu_defer_mutex);
- if (deferers == 1)
+ mutex_lock(&defer_thread_mutex);
+ mutex_lock(&rcu_defer_mutex);
+ was_empty = list_empty(&registry);
+ list_add(&defer_queue.list, &registry);
+ mutex_unlock(&rcu_defer_mutex);
+
+ if (was_empty)
start_defer_thread();
- internal_urcu_unlock(&defer_thread_mutex);
+ mutex_unlock(&defer_thread_mutex);
}
void rcu_defer_unregister_thread(void)
{
- int deferers;
+ int is_empty;
- internal_urcu_lock(&defer_thread_mutex);
- internal_urcu_lock(&urcu_defer_mutex);
- rcu_remove_deferer(pthread_self());
+ mutex_lock(&defer_thread_mutex);
+ mutex_lock(&rcu_defer_mutex);
+ list_del(&defer_queue.list);
_rcu_defer_barrier_thread();
free(defer_queue.q);
defer_queue.q = NULL;
- deferers = num_deferers;
- internal_urcu_unlock(&urcu_defer_mutex);
+ is_empty = list_empty(&registry);
+ mutex_unlock(&rcu_defer_mutex);
- if (deferers == 0)
+ if (is_empty)
stop_defer_thread();
- internal_urcu_unlock(&defer_thread_mutex);
+ mutex_unlock(&defer_thread_mutex);
}
-void urcu_defer_exit(void)
+void rcu_defer_exit(void)
{
- free(registry);
+ assert(list_empty(&registry));
}