X-Git-Url: https://git.lttng.org/?p=urcu.git;a=blobdiff_plain;f=urcu-call-rcu-impl.h;h=f0751f538b4b66c829d4d988b0d5102192ad889b;hp=f6625ba14e2d05689443caec171605c13b120a1f;hb=fbc55dd1caeba6fdce2b781537b785eaf58279ed;hpb=e85451a199f995091d2f740b6fd0ad2cd8fc2dc7

diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h
index f6625ba..f0751f5 100644
--- a/urcu-call-rcu-impl.h
+++ b/urcu-call-rcu-impl.h
@@ -64,6 +64,16 @@ struct call_rcu_data {
 	struct cds_list_head list;
 } __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
 
+struct call_rcu_completion {
+	int barrier_count;
+	int32_t futex;
+};
+
+struct call_rcu_completion_work {
+	struct rcu_head head;
+	struct call_rcu_completion *completion;
+};
+
 /*
  * List of all call_rcu_data structures to keep valgrind happy.
  * Protected by call_rcu_mutex.
@@ -90,7 +100,23 @@ static struct call_rcu_data *default_call_rcu_data;
  * CPUs rather than only to specific threads.
  */
 
-#if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF)
+#ifdef HAVE_SCHED_GETCPU
+
+static int urcu_sched_getcpu(void)
+{
+	return sched_getcpu();
+}
+
+#else /* #ifdef HAVE_SCHED_GETCPU */
+
+static int urcu_sched_getcpu(void)
+{
+	return -1;
+}
+
+#endif /* #else #ifdef HAVE_SCHED_GETCPU */
+
+#if defined(HAVE_SYSCONF) && defined(HAVE_SCHED_GETCPU)
 
 /*
  * Pointer to array of pointers to per-CPU call_rcu_data structures
@@ -133,7 +159,7 @@ static void alloc_cpu_call_rcu_data(void)
 	}
 }
 
-#else /* #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */
+#else /* #if defined(HAVE_SYSCONF) && defined(HAVE_SCHED_GETCPU) */
 
 /*
  * per_cpu_call_rcu_data should be constant, but some functions below, used both
@@ -151,12 +177,7 @@ static void alloc_cpu_call_rcu_data(void)
 {
 }
 
-static int sched_getcpu(void)
-{
-	return -1;
-}
-
-#endif /* #else #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */
+#endif /* #else #if defined(HAVE_SYSCONF) && defined(HAVE_SCHED_GETCPU) */
 
 /* Acquire the specified pthread mutex. */
 
@@ -225,6 +246,26 @@ static void call_rcu_wake_up(struct call_rcu_data *crdp)
 	}
 }
 
+static void call_rcu_completion_wait(struct call_rcu_completion *completion)
+{
+	/* Read completion barrier count before read futex */
+	cmm_smp_mb();
+	if (uatomic_read(&completion->futex) == -1)
+		futex_async(&completion->futex, FUTEX_WAIT, -1,
+			NULL, NULL, 0);
+}
+
+static void call_rcu_completion_wake_up(struct call_rcu_completion *completion)
+{
+	/* Write to completion barrier count before reading/writing futex */
+	cmm_smp_mb();
+	if (caa_unlikely(uatomic_read(&completion->futex) == -1)) {
+		uatomic_set(&completion->futex, 0);
+		futex_async(&completion->futex, FUTEX_WAKE, 1,
+			NULL, NULL, 0);
+	}
+}
+
 /* This is the code run by each call_rcu thread. */
 
 static void *call_rcu_thread(void *arg)
@@ -267,6 +308,8 @@ static void *call_rcu_thread(void *arg)
 			uatomic_or(&crdp->flags, URCU_CALL_RCU_PAUSED);
 			while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_PAUSE) != 0)
 				poll(NULL, 0, 1);
+			uatomic_and(&crdp->flags, ~URCU_CALL_RCU_PAUSED);
+			cmm_smp_mb__after_uatomic_and();
 			rcu_register_thread();
 		}
 
@@ -357,7 +400,7 @@ static void call_rcu_data_init(struct call_rcu_data **crdpp,
  * Return a pointer to the call_rcu_data structure for the specified
  * CPU, returning NULL if there is none. We cannot automatically
  * created it because the platform we are running on might not define
- * sched_getcpu().
+ * urcu_sched_getcpu().
  *
  * The call to this function and use of the returned call_rcu_data
  * should be protected by RCU read-side lock.
@@ -499,7 +542,7 @@ struct call_rcu_data *get_call_rcu_data(void)
 		return URCU_TLS(thread_call_rcu_data);
 
 	if (maxcpus > 0) {
-		crd = get_cpu_call_rcu_data(sched_getcpu());
+		crd = get_cpu_call_rcu_data(urcu_sched_getcpu());
 		if (crd)
 			return crd;
 	}
@@ -593,6 +636,17 @@ static void wake_call_rcu_thread(struct call_rcu_data *crdp)
 		call_rcu_wake_up(crdp);
 }
 
+static void _call_rcu(struct rcu_head *head,
+		      void (*func)(struct rcu_head *head),
+		      struct call_rcu_data *crdp)
+{
+	cds_wfcq_node_init(&head->next);
+	head->func = func;
+	cds_wfcq_enqueue(&crdp->cbs_head, &crdp->cbs_tail, &head->next);
+	uatomic_inc(&crdp->qlen);
+	wake_call_rcu_thread(crdp);
+}
+
 /*
  * Schedule a function to be invoked after a following grace period.
  * This is the only function that must be called -- the others are
@@ -607,20 +661,15 @@ static void wake_call_rcu_thread(struct call_rcu_data *crdp)
  *
  * call_rcu must be called by registered RCU read-side threads.
  */
-
 void call_rcu(struct rcu_head *head,
 	      void (*func)(struct rcu_head *head))
 {
 	struct call_rcu_data *crdp;
 
-	cds_wfcq_node_init(&head->next);
-	head->func = func;
 	/* Holding rcu read-side lock across use of per-cpu crdp */
 	rcu_read_lock();
 	crdp = get_call_rcu_data();
-	cds_wfcq_enqueue(&crdp->cbs_head, &crdp->cbs_tail, &head->next);
-	uatomic_inc(&crdp->qlen);
-	wake_call_rcu_thread(crdp);
+	_call_rcu(head, func, crdp);
 	rcu_read_unlock();
 }
 
@@ -719,6 +768,79 @@ void free_all_cpu_call_rcu_data(void)
 	free(crdp);
 }
 
+static
+void _rcu_barrier_complete(struct rcu_head *head)
+{
+	struct call_rcu_completion_work *work;
+	struct call_rcu_completion *completion;
+
+	work = caa_container_of(head, struct call_rcu_completion_work, head);
+	completion = work->completion;
+	uatomic_dec(&completion->barrier_count);
+	call_rcu_completion_wake_up(completion);
+	free(work);
+}
+
+/*
+ * Wait for all in-flight call_rcu callbacks to complete execution.
+ */
+void rcu_barrier(void)
+{
+	struct call_rcu_data *crdp;
+	struct call_rcu_completion completion;
+	int count = 0;
+	int was_online;
+
+	/* Put in offline state in QSBR. */
+	was_online = rcu_read_ongoing();
+	if (was_online)
+		rcu_thread_offline();
+	/*
+	 * Calling a rcu_barrier() within a RCU read-side critical
+	 * section is an error.
+	 */
+	if (rcu_read_ongoing()) {
+		static int warned = 0;
+
+		if (!warned) {
+			fprintf(stderr, "[error] liburcu: rcu_barrier() called from within RCU read-side critical section.\n");
+		}
+		warned = 1;
+		goto online;
+	}
+
+	call_rcu_lock(&call_rcu_mutex);
+	cds_list_for_each_entry(crdp, &call_rcu_data_list, list)
+		count++;
+
+	completion.barrier_count = count;
+	completion.futex = 0;
+
+	cds_list_for_each_entry(crdp, &call_rcu_data_list, list) {
+		struct call_rcu_completion_work *work;
+
+		work = calloc(sizeof(*work), 1);
+		if (!work)
+			urcu_die(errno);
+		work->completion = &completion;
+		_call_rcu(&work->head, _rcu_barrier_complete, crdp);
+	}
+	call_rcu_unlock(&call_rcu_mutex);
+
+	/* Wait for them */
+	for (;;) {
+		uatomic_dec(&completion.futex);
+		/* Decrement futex before reading barrier_count */
+		cmm_smp_mb();
+		if (!uatomic_read(&completion.barrier_count))
+			break;
+		call_rcu_completion_wait(&completion);
+	}
+online:
+	if (was_online)
+		rcu_thread_online();
+}
+
 /*
  * Acquire the call_rcu_mutex in order to ensure that the child sees
  * all of the call_rcu() data structures in a consistent state. Ensure
@@ -753,6 +875,10 @@ void call_rcu_after_fork_parent(void)
 
 	cds_list_for_each_entry(crdp, &call_rcu_data_list, list)
 		uatomic_and(&crdp->flags, ~URCU_CALL_RCU_PAUSE);
+	cds_list_for_each_entry(crdp, &call_rcu_data_list, list) {
+		while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_PAUSED) != 0)
+			poll(NULL, 0, 1);
+	}
 	call_rcu_unlock(&call_rcu_mutex);
 }
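
For context, here is a minimal usage sketch of the rcu_barrier() API this patch introduces. It is not part of the patch: struct my_node, free_my_node() and retire_my_node() are hypothetical names, and it assumes the urcu.h (memb) flavour headers of this era, which expose call_rcu() and rcu_barrier(), plus urcu/compiler.h for caa_container_of(); the calling threads are assumed to be registered with rcu_register_thread().

/*
 * Hypothetical example, not part of the patch: defer frees with
 * call_rcu(), then drain all pending callbacks with rcu_barrier()
 * before teardown.
 */
#include <stdlib.h>
#include <urcu.h>		/* assumed flavour header: call_rcu(), rcu_barrier() */
#include <urcu/compiler.h>	/* caa_container_of() */

struct my_node {			/* hypothetical node type */
	int value;
	struct rcu_head rcu_head;	/* storage for the deferred-free callback */
};

/* call_rcu callback: runs after a later grace period and frees the node. */
static void free_my_node(struct rcu_head *head)
{
	struct my_node *node = caa_container_of(head, struct my_node, rcu_head);

	free(node);
}

/* Updater side (registered RCU thread), after unlinking the node. */
static void retire_my_node(struct my_node *node)
{
	call_rcu(&node->rcu_head, free_my_node);
}

/* Must be called outside any RCU read-side critical section. */
static void drain_pending_callbacks(void)
{
	/*
	 * Per the patch, rcu_barrier() returns only once every callback
	 * previously queued with call_rcu() has executed, so free_my_node()
	 * cannot run against torn-down state after this point.
	 */
	rcu_barrier();
}

Design note, as implemented by the diff above: rcu_barrier() enqueues one completion callback per call_rcu worker thread and waits on a futex-backed barrier_count, so it necessarily waits behind every callback already queued on each worker's queue; calling it from within an RCU read-side critical section is reported as an error, and under the QSBR flavour the caller is put offline while waiting.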