From d0ec0ed2fcb5d67a28587dcb778606e64f5b7b83 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Tue, 30 May 2017 15:51:45 -0400 Subject: [PATCH] Use workqueue in rculfhash The RCU lock-free hash table currently requires that the destroy function should not be called from within RCU read-side critical sections. This is caused by the lazy resize, which uses the call_rcu worker thread, even though all it really needs is a workqueue/worker thread scheme. Use the new internal workqueue API instead of call_rcu in rculfhash to overcome this limitation. Signed-off-by: Mathieu Desnoyers --- include/urcu/map/urcu-bp.h | 5 + include/urcu/map/urcu-qsbr.h | 5 + include/urcu/map/urcu.h | 15 +++ include/urcu/rculfhash.h | 15 ++- src/rculfhash-internal.h | 2 +- src/rculfhash.c | 228 +++++++++++++++++++++++++---------- src/urcu-call-rcu-impl.h | 37 ++++++ src/urcu-flavor.h | 15 +++ 8 files changed, 254 insertions(+), 68 deletions(-) diff --git a/include/urcu/map/urcu-bp.h b/include/urcu/map/urcu-bp.h index 67ba5c3..1476924 100644 --- a/include/urcu/map/urcu-bp.h +++ b/include/urcu/map/urcu-bp.h @@ -77,4 +77,9 @@ #define rcu_yield_active rcu_yield_active_bp #define rcu_rand_yield rcu_rand_yield_bp +#define urcu_register_rculfhash_atfork \ + urcu_register_rculfhash_atfork_bp +#define urcu_unregister_rculfhash_atfork \ + urcu_unregister_rculfhash_atfork_bp + #endif /* _URCU_BP_MAP_H */ diff --git a/include/urcu/map/urcu-qsbr.h b/include/urcu/map/urcu-qsbr.h index 9e90e3c..bf38c82 100644 --- a/include/urcu/map/urcu-qsbr.h +++ b/include/urcu/map/urcu-qsbr.h @@ -76,4 +76,9 @@ #define rcu_flavor rcu_flavor_qsbr +#define urcu_register_rculfhash_atfork \ + urcu_register_rculfhash_atfork_qsbr +#define urcu_unregister_rculfhash_atfork \ + urcu_unregister_rculfhash_atfork_qsbr + #endif /* _URCU_QSBR_MAP_H */ diff --git a/include/urcu/map/urcu.h b/include/urcu/map/urcu.h index 449513e..b12fa74 100644 --- a/include/urcu/map/urcu.h +++ b/include/urcu/map/urcu.h @@ -80,6 +80,11 @@ #define rcu_flavor rcu_flavor_memb +#define urcu_register_rculfhash_atfork \ + urcu_register_rculfhash_atfork_memb +#define urcu_unregister_rculfhash_atfork \ + urcu_unregister_rculfhash_atfork_memb + #elif defined(RCU_SIGNAL) #define rcu_read_lock rcu_read_lock_sig @@ -122,6 +127,11 @@ #define rcu_flavor rcu_flavor_sig +#define urcu_register_rculfhash_atfork \ + urcu_register_rculfhash_atfork_sig +#define urcu_unregister_rculfhash_atfork \ + urcu_unregister_rculfhash_atfork_sig + #elif defined(RCU_MB) #define rcu_read_lock rcu_read_lock_mb @@ -164,6 +174,11 @@ #define rcu_flavor rcu_flavor_mb +#define urcu_register_rculfhash_atfork \ + urcu_register_rculfhash_atfork_mb +#define urcu_unregister_rculfhash_atfork \ + urcu_unregister_rculfhash_atfork_mb + #else #error "Undefined selection" diff --git a/include/urcu/rculfhash.h b/include/urcu/rculfhash.h index 9934422..0789aa5 100644 --- a/include/urcu/rculfhash.h +++ b/include/urcu/rculfhash.h @@ -176,10 +176,17 @@ struct cds_lfht *cds_lfht_new(unsigned long init_size, * need to be informed of the value passed to cds_lfht_new(). * * Return 0 on success, negative error value on error. - * Threads calling this API need to be registered RCU read-side threads. - * cds_lfht_destroy should *not* be called from a RCU read-side critical - * section. It should *not* be called from a call_rcu thread context - * neither. + + * Prior to liburcu 0.10: + * - Threads calling this API need to be registered RCU read-side + * threads. + * - cds_lfht_destroy should *not* be called from a RCU read-side + * critical section. It should *not* be called from a call_rcu thread + * context neither. + * + * Starting from liburcu 0.10, rculfhash implements its own worker + * thread to handle resize operations, which removes RCU requirements on + * cds_lfht_destroy. */ extern int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr); diff --git a/src/rculfhash-internal.h b/src/rculfhash-internal.h index d7cec95..0f8df97 100644 --- a/src/rculfhash-internal.h +++ b/src/rculfhash-internal.h @@ -82,7 +82,7 @@ struct cds_lfht { */ pthread_mutex_t resize_mutex; /* resize mutex: add/del mutex */ pthread_attr_t *resize_attr; /* Resize threads attributes */ - unsigned int in_progress_resize, in_progress_destroy; + unsigned int in_progress_destroy; unsigned long resize_target; int resize_initiated; diff --git a/src/rculfhash.c b/src/rculfhash.c index d7a1f23..0bd1384 100644 --- a/src/rculfhash.c +++ b/src/rculfhash.c @@ -64,7 +64,7 @@ * - Split-counters are used to keep track of the number of * nodes within the hash table for automatic resize triggering. * - Resize operation initiated by long chain detection is executed by a - * call_rcu thread, which keeps lock-freedom of add and remove. + * worker thread, which keeps lock-freedom of add and remove. * - Resize operations are protected by a mutex. * - The removal operation is split in two parts: first, a "removed" * flag is set in the next pointer within the node to remove. Then, @@ -276,6 +276,9 @@ #include #include #include +#include +#include "workqueue.h" +#include "urcu-die.h" /* * Split-counters lazily update the global counter each 1024 @@ -335,11 +338,11 @@ struct ht_items_count { } __attribute__((aligned(CAA_CACHE_LINE_SIZE))); /* - * rcu_resize_work: Contains arguments passed to RCU worker thread + * resize_work: Contains arguments passed to worker thread * responsible for performing lazy resize. */ -struct rcu_resize_work { - struct rcu_head head; +struct resize_work { + struct urcu_work work; struct cds_lfht *ht; }; @@ -356,6 +359,27 @@ struct partition_resize_work { unsigned long start, unsigned long len); }; +static struct urcu_workqueue *cds_lfht_workqueue; +static unsigned long cds_lfht_workqueue_user_count; + +/* + * Mutex ensuring mutual exclusion between workqueue initialization and + * fork handlers. cds_lfht_fork_mutex nests inside call_rcu_mutex. + */ +static pthread_mutex_t cds_lfht_fork_mutex = PTHREAD_MUTEX_INITIALIZER; + +static struct urcu_atfork cds_lfht_atfork; + +/* + * atfork handler nesting counters. Handle being registered to many urcu + * flavors, thus being possibly invoked more than once in the + * pthread_atfork list of callbacks. + */ +static int cds_lfht_workqueue_atfork_nesting; + +static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor); +static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor); + /* * Algorithm to reverse bits in a word by lookup table, extended to * 64-bit words. @@ -1224,14 +1248,12 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, if (start == 0 && nr_threads > 0) return; fallback: - ht->flavor->thread_online(); fct(ht, i, start, len); - ht->flavor->thread_offline(); } /* * Holding RCU read lock to protect _cds_lfht_add against memory - * reclaim that could be performed by other call_rcu worker threads (ABA + * reclaim that could be performed by other worker threads (ABA * problem). * * When we reach a certain length, we can split this population phase over @@ -1308,7 +1330,7 @@ void init_table(struct cds_lfht *ht, /* * Holding RCU read lock to protect _cds_lfht_remove against memory - * reclaim that could be performed by other call_rcu worker threads (ABA + * reclaim that could be performed by other worker threads (ABA * problem). * For a single level, we logically remove and garbage collect each node. * @@ -1320,8 +1342,9 @@ void init_table(struct cds_lfht *ht, * * Concurrent removal and add operations are helping us perform garbage * collection of logically removed nodes. We guarantee that all logically - * removed nodes have been garbage-collected (unlinked) before call_rcu is - * invoked to free a hole level of bucket nodes (after a grace period). + * removed nodes have been garbage-collected (unlinked) before work + * enqueue is invoked to free a hole level of bucket nodes (after a + * grace period). * * Logical removal and garbage collection can therefore be done in batch * or on a node-per-node basis, as long as the guarantee above holds. @@ -1513,6 +1536,9 @@ struct cds_lfht *_cds_lfht_new(unsigned long init_size, if (!max_nr_buckets || (max_nr_buckets & (max_nr_buckets - 1))) return NULL; + if (flags & CDS_LFHT_AUTO_RESIZE) + cds_lfht_init_worker(flavor); + min_nr_alloc_buckets = max(min_nr_alloc_buckets, MIN_TABLE_SIZE); init_size = max(init_size, MIN_TABLE_SIZE); max_nr_buckets = max(max_nr_buckets, min_nr_alloc_buckets); @@ -1772,25 +1798,14 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht) */ int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr) { - int ret, was_online; - - /* Wait for in-flight resize operations to complete */ - _CMM_STORE_SHARED(ht->in_progress_destroy, 1); - cmm_smp_mb(); /* Store destroy before load resize */ - was_online = ht->flavor->read_ongoing(); - if (was_online) - ht->flavor->thread_offline(); - /* Calling with RCU read-side held is an error. */ - if (ht->flavor->read_ongoing()) { - ret = -EINVAL; - if (was_online) - ht->flavor->thread_online(); - goto end; + int ret; + + if (ht->flags & CDS_LFHT_AUTO_RESIZE) { + /* Cancel ongoing resize operations. */ + _CMM_STORE_SHARED(ht->in_progress_destroy, 1); + /* Wait for in-flight resize operations to complete */ + urcu_workqueue_flush_queued_work(cds_lfht_workqueue); } - while (uatomic_read(&ht->in_progress_resize)) - poll(NULL, 0, 100); /* wait for 100ms */ - if (was_online) - ht->flavor->thread_online(); ret = cds_lfht_delete_bucket(ht); if (ret) return ret; @@ -1800,8 +1815,9 @@ int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr) ret = pthread_mutex_destroy(&ht->resize_mutex); if (ret) ret = -EBUSY; + if (ht->flags & CDS_LFHT_AUTO_RESIZE) + cds_lfht_fini_worker(ht->flavor); poison_free(ht); -end: return ret; } @@ -1897,7 +1913,6 @@ void _do_cds_lfht_resize(struct cds_lfht *ht) * Resize table, re-do if the target size has changed under us. */ do { - assert(uatomic_read(&ht->in_progress_resize)); if (CMM_LOAD_SHARED(ht->in_progress_destroy)) break; ht->resize_initiated = 1; @@ -1930,71 +1945,47 @@ void resize_target_update_count(struct cds_lfht *ht, void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size) { - int was_online; - - was_online = ht->flavor->read_ongoing(); - if (was_online) - ht->flavor->thread_offline(); - /* Calling with RCU read-side held is an error. */ - if (ht->flavor->read_ongoing()) { - static int print_once; - - if (!CMM_LOAD_SHARED(print_once)) - fprintf(stderr, "[error] rculfhash: cds_lfht_resize " - "called with RCU read-side lock held.\n"); - CMM_STORE_SHARED(print_once, 1); - assert(0); - goto end; - } resize_target_update_count(ht, new_size); CMM_STORE_SHARED(ht->resize_initiated, 1); pthread_mutex_lock(&ht->resize_mutex); _do_cds_lfht_resize(ht); pthread_mutex_unlock(&ht->resize_mutex); -end: - if (was_online) - ht->flavor->thread_online(); } static -void do_resize_cb(struct rcu_head *head) +void do_resize_cb(struct urcu_work *work) { - struct rcu_resize_work *work = - caa_container_of(head, struct rcu_resize_work, head); - struct cds_lfht *ht = work->ht; + struct resize_work *resize_work = + caa_container_of(work, struct resize_work, work); + struct cds_lfht *ht = resize_work->ht; - ht->flavor->thread_offline(); + ht->flavor->register_thread(); pthread_mutex_lock(&ht->resize_mutex); _do_cds_lfht_resize(ht); pthread_mutex_unlock(&ht->resize_mutex); - ht->flavor->thread_online(); + ht->flavor->unregister_thread(); poison_free(work); - cmm_smp_mb(); /* finish resize before decrement */ - uatomic_dec(&ht->in_progress_resize); } static void __cds_lfht_resize_lazy_launch(struct cds_lfht *ht) { - struct rcu_resize_work *work; + struct resize_work *work; /* Store resize_target before read resize_initiated */ cmm_smp_mb(); if (!CMM_LOAD_SHARED(ht->resize_initiated)) { - uatomic_inc(&ht->in_progress_resize); - cmm_smp_mb(); /* increment resize count before load destroy */ if (CMM_LOAD_SHARED(ht->in_progress_destroy)) { - uatomic_dec(&ht->in_progress_resize); return; } work = malloc(sizeof(*work)); if (work == NULL) { dbg_printf("error allocating resize work, bailing out\n"); - uatomic_dec(&ht->in_progress_resize); return; } work->ht = ht; - ht->flavor->update_call_rcu(&work->head, do_resize_cb); + urcu_workqueue_queue_work(cds_lfht_workqueue, + &work->work, do_resize_cb); CMM_STORE_SHARED(ht->resize_initiated, 1); } } @@ -2045,3 +2036,114 @@ void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size, } __cds_lfht_resize_lazy_launch(ht); } + +static void mutex_lock(pthread_mutex_t *mutex) +{ + int ret; + +#ifndef DISTRUST_SIGNALS_EXTREME + ret = pthread_mutex_lock(mutex); + if (ret) + urcu_die(ret); +#else /* #ifndef DISTRUST_SIGNALS_EXTREME */ + while ((ret = pthread_mutex_trylock(mutex)) != 0) { + if (ret != EBUSY && ret != EINTR) + urcu_die(ret); + if (CMM_LOAD_SHARED(URCU_TLS(rcu_reader).need_mb)) { + cmm_smp_mb(); + _CMM_STORE_SHARED(URCU_TLS(rcu_reader).need_mb, 0); + cmm_smp_mb(); + } + (void) poll(NULL, 0, 10); + } +#endif /* #else #ifndef DISTRUST_SIGNALS_EXTREME */ +} + +static void mutex_unlock(pthread_mutex_t *mutex) +{ + int ret; + + ret = pthread_mutex_unlock(mutex); + if (ret) + urcu_die(ret); +} + +static void cds_lfht_before_fork(void *priv) +{ + if (cds_lfht_workqueue_atfork_nesting++) + return; + mutex_lock(&cds_lfht_fork_mutex); + if (!cds_lfht_workqueue) + return; + urcu_workqueue_pause_worker(cds_lfht_workqueue); +} + +static void cds_lfht_after_fork_parent(void *priv) +{ + if (--cds_lfht_workqueue_atfork_nesting) + return; + if (!cds_lfht_workqueue) + goto end; + urcu_workqueue_resume_worker(cds_lfht_workqueue); +end: + mutex_unlock(&cds_lfht_fork_mutex); +} + +static void cds_lfht_after_fork_child(void *priv) +{ + if (--cds_lfht_workqueue_atfork_nesting) + return; + if (!cds_lfht_workqueue) + goto end; + urcu_workqueue_create_worker(cds_lfht_workqueue); +end: + mutex_unlock(&cds_lfht_fork_mutex); +} + +static struct urcu_atfork cds_lfht_atfork = { + .before_fork = cds_lfht_before_fork, + .after_fork_parent = cds_lfht_after_fork_parent, + .after_fork_child = cds_lfht_after_fork_child, +}; + +/* Block all signals to ensure we don't disturb the application. */ +static void cds_lfht_worker_init(struct urcu_workqueue *workqueue, + void *priv) +{ + int ret; + sigset_t mask; + + /* Block signal for entire process, so only our thread processes it. */ + ret = sigfillset(&mask); + if (ret) + urcu_die(errno); + ret = pthread_sigmask(SIG_BLOCK, &mask, NULL); + if (ret) + urcu_die(ret); +} + +static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor) +{ + flavor->register_rculfhash_atfork(&cds_lfht_atfork); + + mutex_lock(&cds_lfht_fork_mutex); + if (cds_lfht_workqueue_user_count++) + goto end; + cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL, + NULL, cds_lfht_worker_init, NULL, NULL, NULL, NULL, NULL); +end: + mutex_unlock(&cds_lfht_fork_mutex); +} + +static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor) +{ + mutex_lock(&cds_lfht_fork_mutex); + if (--cds_lfht_workqueue_user_count) + goto end; + urcu_workqueue_destroy(cds_lfht_workqueue); + cds_lfht_workqueue = NULL; +end: + mutex_unlock(&cds_lfht_fork_mutex); + + flavor->unregister_rculfhash_atfork(&cds_lfht_atfork); +} diff --git a/src/urcu-call-rcu-impl.h b/src/urcu-call-rcu-impl.h index bfa53f8..4562ba4 100644 --- a/src/urcu-call-rcu-impl.h +++ b/src/urcu-call-rcu-impl.h @@ -99,6 +99,9 @@ static pthread_mutex_t call_rcu_mutex = PTHREAD_MUTEX_INITIALIZER; static struct call_rcu_data *default_call_rcu_data; +static struct urcu_atfork *registered_rculfhash_atfork; +static unsigned long registered_rculfhash_atfork_refcount; + /* * If the sched_getcpu() and sysconf(_SC_NPROCESSORS_CONF) calls are * available, then we can have call_rcu threads assigned to individual @@ -907,9 +910,14 @@ online: void call_rcu_before_fork(void) { struct call_rcu_data *crdp; + struct urcu_atfork *atfork; call_rcu_lock(&call_rcu_mutex); + atfork = registered_rculfhash_atfork; + if (atfork) + atfork->before_fork(atfork->priv); + cds_list_for_each_entry(crdp, &call_rcu_data_list, list) { uatomic_or(&crdp->flags, URCU_CALL_RCU_PAUSE); cmm_smp_mb__after_uatomic_or(); @@ -929,6 +937,7 @@ void call_rcu_before_fork(void) void call_rcu_after_fork_parent(void) { struct call_rcu_data *crdp; + struct urcu_atfork *atfork; cds_list_for_each_entry(crdp, &call_rcu_data_list, list) uatomic_and(&crdp->flags, ~URCU_CALL_RCU_PAUSE); @@ -936,6 +945,9 @@ void call_rcu_after_fork_parent(void) while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_PAUSED) != 0) (void) poll(NULL, 0, 1); } + atfork = registered_rculfhash_atfork; + if (atfork) + atfork->after_fork_parent(atfork->priv); call_rcu_unlock(&call_rcu_mutex); } @@ -947,10 +959,15 @@ void call_rcu_after_fork_parent(void) void call_rcu_after_fork_child(void) { struct call_rcu_data *crdp, *next; + struct urcu_atfork *atfork; /* Release the mutex. */ call_rcu_unlock(&call_rcu_mutex); + atfork = registered_rculfhash_atfork; + if (atfork) + atfork->after_fork_child(atfork->priv); + /* Do nothing when call_rcu() has not been used */ if (cds_list_empty(&call_rcu_data_list)) return; @@ -980,3 +997,23 @@ void call_rcu_after_fork_child(void) call_rcu_data_free(crdp); } } + +void urcu_register_rculfhash_atfork(struct urcu_atfork *atfork) +{ + call_rcu_lock(&call_rcu_mutex); + if (registered_rculfhash_atfork_refcount++) + goto end; + registered_rculfhash_atfork = atfork; +end: + call_rcu_unlock(&call_rcu_mutex); +} + +void urcu_unregister_rculfhash_atfork(struct urcu_atfork *atfork) +{ + call_rcu_lock(&call_rcu_mutex); + if (--registered_rculfhash_atfork_refcount) + goto end; + registered_rculfhash_atfork = NULL; +end: + call_rcu_unlock(&call_rcu_mutex); +} diff --git a/src/urcu-flavor.h b/src/urcu-flavor.h index 5e7f292..9cfbd6a 100644 --- a/src/urcu-flavor.h +++ b/src/urcu-flavor.h @@ -27,6 +27,16 @@ extern "C" { #endif +struct urcu_atfork { + void (*before_fork)(void *priv); + void (*after_fork_parent)(void *priv); + void (*after_fork_child)(void *priv); + void *priv; +}; + +void urcu_register_rculfhash_atfork(struct urcu_atfork *atfork); +void urcu_unregister_rculfhash_atfork(struct urcu_atfork *atfork); + struct rcu_flavor_struct { void (*read_lock)(void); void (*read_unlock)(void); @@ -43,6 +53,9 @@ struct rcu_flavor_struct { void (*unregister_thread)(void); void (*barrier)(void); + + void (*register_rculfhash_atfork)(struct urcu_atfork *atfork); + void (*unregister_rculfhash_atfork)(struct urcu_atfork *atfork); }; #define DEFINE_RCU_FLAVOR(x) \ @@ -59,6 +72,8 @@ const struct rcu_flavor_struct x = { \ .register_thread = rcu_register_thread, \ .unregister_thread = rcu_unregister_thread,\ .barrier = rcu_barrier, \ + .register_rculfhash_atfork = urcu_register_rculfhash_atfork, \ + .unregister_rculfhash_atfork = urcu_unregister_rculfhash_atfork,\ } extern const struct rcu_flavor_struct rcu_flavor; -- 2.34.1