If I remove the URCU_CALL_RCU_RT flag from the rbtree single writer
test, thus using the pthread_cond_signal mechanism, there is a huge
slowdown: without cpu affinity for the worker threads, it crawls to 129
updates/s (looks like mutex contention between the thread calling
call_rcu and the call_rcu thread). Adding CPU affinity to the per-cpu
call_rcu threads, I get 546 updates/s, which is slightly better (better
cache locality, and maybe the mutex contention is not as bad thanks to
the two threads sharing the same CPU).
So I decided to try replacing pthread_cond_wait/signal with the
futex-based implementation I use in the rest of the urcu library: it has
the advantage of removing the mutex from the call_rcu() execution
entirely, sampling the "futex" variable without any mutex whatsoever for
the case where no wakeup is needed.
Disabling URCU_CALL_RCU_RT flag, with per-cpu affined call_rcu threads,
with my futex-based wakeup implementation, I get 55754 updates/s (even
better than with URCU_CALL_RCU_RT flag!).
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
#include "urcu-call-rcu.h"
#include "urcu-pointer.h"
#include "urcu/list.h"
#include "urcu-call-rcu.h"
#include "urcu-pointer.h"
#include "urcu/list.h"
+#include "urcu/urcu-futex.h"
/* Data structure that identifies a call_rcu thread. */
/* Data structure that identifies a call_rcu thread. */
struct cds_wfq_queue cbs;
unsigned long flags;
pthread_mutex_t mtx;
struct cds_wfq_queue cbs;
unsigned long flags;
pthread_mutex_t mtx;
unsigned long qlen;
pthread_t tid;
int cpu_affinity;
unsigned long qlen;
pthread_t tid;
int cpu_affinity;
static struct call_rcu_data **per_cpu_call_rcu_data;
static long maxcpus;
static struct call_rcu_data **per_cpu_call_rcu_data;
static long maxcpus;
+/*
+ * Put the call_rcu worker thread to sleep until new callbacks are queued.
+ * Sleeps on crdp->futex only if its value is -1 (presumably set by the
+ * worker before checking the queue — confirm against the caller); a
+ * concurrent wakeup that resets the futex to 0 makes FUTEX_WAIT return
+ * immediately, so no wakeup can be lost.
+ */
+static void call_rcu_wait(struct call_rcu_data *crdp)
+{
+ /* Read call_rcu list before read futex */
+ cmm_smp_mb();
+ if (uatomic_read(&crdp->futex) == -1)
+ futex_async(&crdp->futex, FUTEX_WAIT, -1,
+ NULL, NULL, 0);
+}
+
+/*
+ * Wake the call_rcu worker thread if (and only if) it is sleeping.
+ * The futex is sampled without taking any mutex: when it is not -1 the
+ * worker is not blocked and this is a cheap no-op, which is the whole
+ * point of replacing the pthread_cond_signal path on the call_rcu()
+ * fast path.
+ */
+static void call_rcu_wake_up(struct call_rcu_data *crdp)
+{
+ /* Write to call_rcu list before reading/writing futex */
+ cmm_smp_mb();
+ if (unlikely(uatomic_read(&crdp->futex) == -1)) {
+ /* Clear the sleep marker before issuing the wakeup. */
+ uatomic_set(&crdp->futex, 0);
+ futex_async(&crdp->futex, FUTEX_WAKE, 1,
+ NULL, NULL, 0);
+ }
+}
+
/* Allocate the array if it has not already been allocated. */
static void alloc_cpu_call_rcu_data(void)
/* Allocate the array if it has not already been allocated. */
static void alloc_cpu_call_rcu_data(void)
if (crdp->flags & URCU_CALL_RCU_RT)
poll(NULL, 0, 10);
else {
if (crdp->flags & URCU_CALL_RCU_RT)
poll(NULL, 0, 10);
else {
- call_rcu_lock(&crdp->mtx);
- _CMM_STORE_SHARED(crdp->flags,
- crdp->flags & ~URCU_CALL_RCU_RUNNING);
- if (&crdp->cbs.head ==
- _CMM_LOAD_SHARED(crdp->cbs.tail) &&
- pthread_cond_wait(&crdp->cond, &crdp->mtx) != 0) {
- perror("pthread_cond_wait");
- exit(-1);
- }
- _CMM_STORE_SHARED(crdp->flags,
- crdp->flags | URCU_CALL_RCU_RUNNING);
+ if (&crdp->cbs.head == _CMM_LOAD_SHARED(crdp->cbs.tail))
+ call_rcu_wait(crdp);
- call_rcu_unlock(&crdp->mtx);
}
}
call_rcu_lock(&crdp->mtx);
}
}
call_rcu_lock(&crdp->mtx);
perror("pthread_mutex_init");
exit(-1);
}
perror("pthread_mutex_init");
exit(-1);
}
- if (pthread_cond_init(&crdp->cond, NULL) != 0) {
- perror("pthread_cond_init");
- exit(-1);
- }
- crdp->flags = flags | URCU_CALL_RCU_RUNNING;
+ crdp->futex = 0;
+ crdp->flags = flags;
cds_list_add(&crdp->list, &call_rcu_data_list);
crdp->cpu_affinity = cpu_affinity;
cmm_smp_mb(); /* Structure initialized before pointer is planted. */
cds_list_add(&crdp->list, &call_rcu_data_list);
crdp->cpu_affinity = cpu_affinity;
cmm_smp_mb(); /* Structure initialized before pointer is planted. */
*/
static void wake_call_rcu_thread(struct call_rcu_data *crdp)
{
*/
static void wake_call_rcu_thread(struct call_rcu_data *crdp)
{
- if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RT)) {
- call_rcu_lock(&crdp->mtx);
- if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RUNNING)) {
- if (pthread_cond_signal(&crdp->cond) != 0) {
- perror("pthread_cond_signal");
- exit(-1);
- }
- }
- call_rcu_unlock(&crdp->mtx);
- }
+ if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RT))
+ call_rcu_wake_up(crdp);