X-Git-Url: https://git.lttng.org/?p=urcu.git;a=blobdiff_plain;f=urcu-bp.c;h=cbdebf407a892c2d201d42d948025723702414e7;hp=190a5693dff5aaa8ee683faa18f1eeecca9658db;hb=7937ae1cd35218d2485f58757a0b9cb910a28e28;hpb=95b94246b745a49ec7e5a33661ef638b0dd2950b diff --git a/urcu-bp.c b/urcu-bp.c index 190a569..cbdebf4 100644 --- a/urcu-bp.c +++ b/urcu-bp.c @@ -36,6 +36,7 @@ #include #include +#include "urcu/arch.h" #include "urcu/wfcqueue.h" #include "urcu/map/urcu-bp.h" #include "urcu/static/urcu-bp.h" @@ -79,8 +80,8 @@ void *mremap_wrapper(void *old_address, size_t old_size, } #endif -/* Sleep delay in us */ -#define RCU_SLEEP_DELAY 1000 +/* Sleep delay in ms */ +#define RCU_SLEEP_DELAY_MS 10 #define INIT_NR_THREADS 8 #define ARENA_INIT_ALLOC \ sizeof(struct registry_chunk) \ @@ -91,14 +92,48 @@ void *mremap_wrapper(void *old_address, size_t old_size, */ #define RCU_QS_ACTIVE_ATTEMPTS 100 +static +int rcu_bp_refcount; + +/* If the headers do not support membarrier system call, fall back smp_mb. */ +#ifdef __NR_membarrier +# define membarrier(...) syscall(__NR_membarrier, __VA_ARGS__) +#else +# define membarrier(...) -ENOSYS +#endif + +enum membarrier_cmd { + MEMBARRIER_CMD_QUERY = 0, + MEMBARRIER_CMD_SHARED = (1 << 0), +}; + +static +void __attribute__((constructor)) rcu_bp_init(void); +static void __attribute__((destructor)) rcu_bp_exit(void); +int urcu_bp_has_sys_membarrier; + +/* + * rcu_gp_lock ensures mutual exclusion between threads calling + * synchronize_rcu(). + */ static pthread_mutex_t rcu_gp_lock = PTHREAD_MUTEX_INITIALIZER; +/* + * rcu_registry_lock ensures mutual exclusion between threads + * registering and unregistering themselves to/from the registry, and + * with threads reading that registry from synchronize_rcu(). However, + * this lock is not held all the way through the completion of awaiting + * for the grace period. It is sporadically released between iterations + * on the registry. + * rcu_registry_lock may nest inside rcu_gp_lock. 
+ */ +static pthread_mutex_t rcu_registry_lock = PTHREAD_MUTEX_INITIALIZER; -#ifdef DEBUG_YIELD -unsigned int rcu_yield_active; -DEFINE_URCU_TLS(unsigned int, rcu_rand_yield); -#endif +static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER; +static int initialized; + +static pthread_key_t urcu_bp_key; struct rcu_gp rcu_gp = { .ctr = RCU_GP_COUNT }; @@ -112,7 +147,7 @@ static CDS_LIST_HEAD(registry); struct registry_chunk { size_t data_len; /* data length */ - size_t used; /* data used */ + size_t used; /* amount of data used */ struct cds_list_head node; /* chunk_list node */ char data[]; }; @@ -128,8 +163,6 @@ static struct registry_arena registry_arena = { /* Saved fork signal mask, protected by rcu_gp_lock */ static sigset_t saved_fork_signal_mask; -static void rcu_gc_registry(void); - static void mutex_lock(pthread_mutex_t *mutex) { int ret; @@ -156,11 +189,23 @@ static void mutex_unlock(pthread_mutex_t *mutex) urcu_die(ret); } +static void smp_mb_master(void) +{ + if (caa_likely(urcu_bp_has_sys_membarrier)) + (void) membarrier(MEMBARRIER_CMD_SHARED, 0); + else + cmm_smp_mb(); +} + +/* + * Always called with rcu_registry lock held. Releases this lock between + * iterations and grabs it again. Holds the lock when it returns. + */ static void wait_for_readers(struct cds_list_head *input_readers, struct cds_list_head *cur_snap_readers, struct cds_list_head *qsreaders) { - int wait_loops = 0; + unsigned int wait_loops = 0; struct rcu_reader *index, *tmp; /* @@ -169,7 +214,9 @@ static void wait_for_readers(struct cds_list_head *input_readers, * rcu_gp.ctr value. 
*/ for (;;) { - wait_loops++; + if (wait_loops < RCU_QS_ACTIVE_ATTEMPTS) + wait_loops++; + cds_list_for_each_entry_safe(index, tmp, input_readers, node) { switch (rcu_reader_state(&index->ctr)) { case RCU_READER_ACTIVE_CURRENT: @@ -196,10 +243,14 @@ static void wait_for_readers(struct cds_list_head *input_readers, if (cds_list_empty(input_readers)) { break; } else { - if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) - usleep(RCU_SLEEP_DELAY); + /* Temporarily unlock the registry lock. */ + mutex_unlock(&rcu_registry_lock); + if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS) + (void) poll(NULL, 0, RCU_SLEEP_DELAY_MS); else caa_cpu_relax(); + /* Re-lock the registry lock before the next loop. */ + mutex_lock(&rcu_registry_lock); } } } @@ -218,19 +269,20 @@ void synchronize_rcu(void) mutex_lock(&rcu_gp_lock); + mutex_lock(&rcu_registry_lock); + if (cds_list_empty(&registry)) goto out; /* All threads should read qparity before accessing data structure * where new ptr points to. */ /* Write new ptr before changing the qparity */ - cmm_smp_mb(); - - /* Remove old registry elements */ - rcu_gc_registry(); + smp_mb_master(); /* * Wait for readers to observe original parity or be quiescent. + * wait_for_readers() can release and grab again rcu_registry_lock + * internally. */ wait_for_readers(&registry, &cur_snap_readers, &qsreaders); @@ -260,6 +312,8 @@ void synchronize_rcu(void) /* * Wait for readers to observe new parity or be quiescent. + * wait_for_readers() can release and grab again rcu_registry_lock + * internally. */ wait_for_readers(&cur_snap_readers, NULL, &qsreaders); @@ -272,8 +326,9 @@ void synchronize_rcu(void) * Finish waiting for reader threads before letting the old ptr being * freed. 
*/ - cmm_smp_mb(); + smp_mb_master(); out: + mutex_unlock(&rcu_registry_lock); mutex_unlock(&rcu_gp_lock); ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL); assert(!ret); @@ -324,7 +379,7 @@ void expand_arena(struct registry_arena *arena) -1, 0); if (new_chunk == MAP_FAILED) abort(); - bzero(new_chunk, new_chunk_len); + memset(new_chunk, 0, new_chunk_len); new_chunk->data_len = new_chunk_len - sizeof(struct registry_chunk); cds_list_add_tail(&new_chunk->node, &arena->chunk_list); @@ -344,7 +399,7 @@ void expand_arena(struct registry_arena *arena) if (new_chunk != MAP_FAILED) { /* Should not have moved. */ assert(new_chunk == last_chunk); - bzero((char *) last_chunk + old_chunk_len, + memset((char *) last_chunk + old_chunk_len, 0, new_chunk_len - old_chunk_len); last_chunk->data_len = new_chunk_len - sizeof(struct registry_chunk); @@ -358,7 +413,7 @@ void expand_arena(struct registry_arena *arena) -1, 0); if (new_chunk == MAP_FAILED) abort(); - bzero(new_chunk, new_chunk_len); + memset(new_chunk, 0, new_chunk_len); new_chunk->data_len = new_chunk_len - sizeof(struct registry_chunk); cds_list_add_tail(&new_chunk->node, &arena->chunk_list); @@ -402,10 +457,14 @@ static void add_thread(void) { struct rcu_reader *rcu_reader_reg; + int ret; rcu_reader_reg = arena_alloc(&registry_arena); if (!rcu_reader_reg) abort(); + ret = pthread_setspecific(urcu_bp_key, rcu_reader_reg); + if (ret) + abort(); /* Add to registry */ rcu_reader_reg->tid = pthread_self(); @@ -418,33 +477,39 @@ void add_thread(void) URCU_TLS(rcu_reader) = rcu_reader_reg; } -/* Called with signals off and mutex locked */ -static void rcu_gc_registry(void) +/* Called with mutex locked */ +static +void cleanup_thread(struct registry_chunk *chunk, + struct rcu_reader *rcu_reader_reg) +{ + rcu_reader_reg->ctr = 0; + cds_list_del(&rcu_reader_reg->node); + rcu_reader_reg->tid = 0; + rcu_reader_reg->alloc = 0; + chunk->used -= sizeof(struct rcu_reader); +} + +static +struct registry_chunk *find_chunk(struct 
rcu_reader *rcu_reader_reg) { struct registry_chunk *chunk; - struct rcu_reader *rcu_reader_reg; cds_list_for_each_entry(chunk, &registry_arena.chunk_list, node) { - for (rcu_reader_reg = (struct rcu_reader *) &chunk->data[0]; - rcu_reader_reg < (struct rcu_reader *) &chunk->data[chunk->data_len]; - rcu_reader_reg++) { - pthread_t tid; - int ret; - - if (!rcu_reader_reg->alloc) - continue; - tid = rcu_reader_reg->tid; - ret = pthread_kill(tid, 0); - assert(ret != EINVAL); - if (ret == ESRCH) { - cds_list_del(&rcu_reader_reg->node); - rcu_reader_reg->ctr = 0; - rcu_reader_reg->alloc = 0; - chunk->used -= sizeof(struct rcu_reader); - } - - } + if (rcu_reader_reg < (struct rcu_reader *) &chunk->data[0]) + continue; + if (rcu_reader_reg >= (struct rcu_reader *) &chunk->data[chunk->data_len]) + continue; + return chunk; } + return NULL; +} + +/* Called with signals off and mutex locked */ +static +void remove_thread(struct rcu_reader *rcu_reader_reg) +{ + cleanup_thread(find_chunk(rcu_reader_reg), rcu_reader_reg); + URCU_TLS(rcu_reader) = NULL; } /* Disable signals, take mutex, add to registry */ @@ -454,38 +519,112 @@ void rcu_bp_register(void) int ret; ret = sigfillset(&newmask); - assert(!ret); + if (ret) + abort(); ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask); - assert(!ret); + if (ret) + abort(); /* * Check if a signal concurrently registered our thread since - * the check in rcu_read_lock(). */ + * the check in rcu_read_lock(). + */ if (URCU_TLS(rcu_reader)) goto end; - mutex_lock(&rcu_gp_lock); + /* + * Take care of early registration before urcu_bp constructor. 
+ */ + rcu_bp_init(); + + mutex_lock(&rcu_registry_lock); add_thread(); - mutex_unlock(&rcu_gp_lock); + mutex_unlock(&rcu_registry_lock); end: ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL); - assert(!ret); + if (ret) + abort(); } -void rcu_bp_exit(void) +/* Disable signals, take mutex, remove from registry */ +static +void rcu_bp_unregister(struct rcu_reader *rcu_reader_reg) { - struct registry_chunk *chunk, *tmp; + sigset_t newmask, oldmask; + int ret; - cds_list_for_each_entry_safe(chunk, tmp, - &registry_arena.chunk_list, node) { - munmap(chunk, chunk->data_len + sizeof(struct registry_chunk)); + ret = sigfillset(&newmask); + if (ret) + abort(); + ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask); + if (ret) + abort(); + + mutex_lock(&rcu_registry_lock); + remove_thread(rcu_reader_reg); + mutex_unlock(&rcu_registry_lock); + ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL); + if (ret) + abort(); + rcu_bp_exit(); +} + +/* + * Remove thread from the registry when it exits, and flag it as + * destroyed so garbage collection can take care of it. 
+ */ +static +void urcu_bp_thread_exit_notifier(void *rcu_key) +{ + rcu_bp_unregister(rcu_key); +} + +static +void rcu_bp_init(void) +{ + mutex_lock(&init_lock); + if (!rcu_bp_refcount++) { + int ret; + + ret = pthread_key_create(&urcu_bp_key, + urcu_bp_thread_exit_notifier); + if (ret) + abort(); + ret = membarrier(MEMBARRIER_CMD_QUERY, 0); + if (ret >= 0 && (ret & MEMBARRIER_CMD_SHARED)) { + urcu_bp_has_sys_membarrier = 1; + } + initialized = 1; } + mutex_unlock(&init_lock); +} + +static +void rcu_bp_exit(void) +{ + mutex_lock(&init_lock); + if (!--rcu_bp_refcount) { + struct registry_chunk *chunk, *tmp; + int ret; + + cds_list_for_each_entry_safe(chunk, tmp, + &registry_arena.chunk_list, node) { + munmap(chunk, chunk->data_len + + sizeof(struct registry_chunk)); + } + CDS_INIT_LIST_HEAD(&registry_arena.chunk_list); + ret = pthread_key_delete(urcu_bp_key); + if (ret) + abort(); + } + mutex_unlock(&init_lock); } /* - * Holding the rcu_gp_lock across fork will make sure we fork() don't race with - * a concurrent thread executing with this same lock held. This ensures that the - * registry is in a coherent state in the child. + * Holding the rcu_gp_lock and rcu_registry_lock across fork will make + * sure we fork() don't race with a concurrent thread executing with + * any of those locks held. This ensures that the registry and data + * protected by rcu_gp_lock are in a coherent state in the child. */ void rcu_bp_before_fork(void) { @@ -497,6 +636,7 @@ void rcu_bp_before_fork(void) ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask); assert(!ret); mutex_lock(&rcu_gp_lock); + mutex_lock(&rcu_registry_lock); saved_fork_signal_mask = oldmask; } @@ -506,18 +646,43 @@ void rcu_bp_after_fork_parent(void) int ret; oldmask = saved_fork_signal_mask; + mutex_unlock(&rcu_registry_lock); mutex_unlock(&rcu_gp_lock); ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL); assert(!ret); } +/* + * Prune all entries from registry except our own thread. Fits the Linux + * fork behavior. 
Called with rcu_gp_lock and rcu_registry_lock held. + */ +static +void urcu_bp_prune_registry(void) +{ + struct registry_chunk *chunk; + struct rcu_reader *rcu_reader_reg; + + cds_list_for_each_entry(chunk, &registry_arena.chunk_list, node) { + for (rcu_reader_reg = (struct rcu_reader *) &chunk->data[0]; + rcu_reader_reg < (struct rcu_reader *) &chunk->data[chunk->data_len]; + rcu_reader_reg++) { + if (!rcu_reader_reg->alloc) + continue; + if (rcu_reader_reg->tid == pthread_self()) + continue; + cleanup_thread(chunk, rcu_reader_reg); + } + } +} + void rcu_bp_after_fork_child(void) { sigset_t oldmask; int ret; - rcu_gc_registry(); + urcu_bp_prune_registry(); oldmask = saved_fork_signal_mask; + mutex_unlock(&rcu_registry_lock); mutex_unlock(&rcu_gp_lock); ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL); assert(!ret);