From b57aee663af988b7f686c076ce6aef2a0d2487c8 Mon Sep 17 00:00:00 2001 From: "Paul E. McKenney" Date: Tue, 8 Mar 2011 21:48:49 -0500 Subject: [PATCH 1/1] Add call_rcu() interface Adds call_rcu(), with RCU threads to invoke the callbacks. By default, there will be one such RCU thread per process, created the first time that call_rcu() is invoked. On systems supporting sched_getcpu(), it is possible to create one RCU thread per CPU by calling create_all_cpu_call_rcu_data(). This version includes feedback from Mathieu Desnoyers. Signed-off-by: Paul E. McKenney Signed-off-by: Mathieu Desnoyers --- Makefile.am | 6 +- configure.ac | 2 +- tests/Makefile.am | 25 +-- tests/rcutorture.h | 68 ++++++- urcu-call-rcu.c | 450 ++++++++++++++++++++++++++++++++++++++++++ urcu-call-rcu.h | 80 ++++++++ urcu-defer.h | 8 - urcu/wfqueue-static.h | 12 +- urcu/wfstack-static.h | 1 + 9 files changed, 623 insertions(+), 29 deletions(-) create mode 100644 urcu-call-rcu.c create mode 100644 urcu-call-rcu.h diff --git a/Makefile.am b/Makefile.am index 79a7152..7956e7e 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,6 +1,7 @@ INCLUDES = -I$(top_builddir)/urcu AM_LDFLAGS=-lpthread +AM_CFLAGS=-Wall SUBDIRS = . 
tests @@ -29,8 +30,8 @@ COMPAT+=compat_futex.c endif lib_LTLIBRARIES = liburcu.la liburcu-qsbr.la liburcu-mb.la liburcu-signal.la \ - liburcu-bp.la liburcu-defer.la libwfqueue.la libwfstack.la \ - librculfqueue.la librculfstack.la + liburcu-bp.la liburcu-defer.la liburcu-call.la \ + libwfqueue.la libwfstack.la librculfqueue.la librculfstack.la liburcu_la_SOURCES = urcu.c urcu-pointer.c $(COMPAT) @@ -44,6 +45,7 @@ liburcu_signal_la_CFLAGS = -DRCU_SIGNAL liburcu_bp_la_SOURCES = urcu-bp.c urcu-pointer.c $(COMPAT) +liburcu_call_la_SOURCES = urcu-call-rcu.c $(COMPAT) liburcu_defer_la_SOURCES = urcu-defer.c $(COMPAT) libwfqueue_la_SOURCES = wfqueue.c $(COMPAT) diff --git a/configure.ac b/configure.ac index 187cf43..3c61abc 100644 --- a/configure.ac +++ b/configure.ac @@ -35,7 +35,7 @@ AC_TYPE_SIZE_T # Checks for library functions. AC_FUNC_MALLOC AC_FUNC_MMAP -AC_CHECK_FUNCS([bzero gettimeofday munmap strtoul]) +AC_CHECK_FUNCS([bzero gettimeofday munmap sched_getcpu strtoul sysconf]) # Find arch type case $host_cpu in diff --git a/tests/Makefile.am b/tests/Makefile.am index a43dd75..3c025a4 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -1,5 +1,5 @@ AM_LDFLAGS=-lpthread -AM_CFLAGS=-I$(top_srcdir) -I$(top_builddir) +AM_CFLAGS=-I$(top_srcdir) -I$(top_builddir) -g noinst_PROGRAMS = test_urcu test_urcu_dynamic_link test_urcu_timing \ test_urcu_signal test_urcu_signal_dynamic_link test_urcu_signal_timing \ @@ -28,20 +28,21 @@ if COMPAT_FUTEX COMPAT+=$(top_srcdir)/compat_futex.c endif -URCU=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(COMPAT) -URCU_QSBR=$(top_srcdir)/urcu-qsbr.c $(top_srcdir)/urcu-pointer.c $(COMPAT) +URCU=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT) +URCU_QSBR=$(top_srcdir)/urcu-qsbr.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT) # URCU_MB uses urcu.c but -DRCU_MB must be defined -URCU_MB=$(top_srcdir)/urcu.c 
$(top_srcdir)/urcu-pointer.c $(COMPAT) +URCU_MB=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT) # URCU_SIGNAL uses urcu.c but -DRCU_SIGNAL must be defined -URCU_SIGNAL=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(COMPAT) -URCU_BP=$(top_srcdir)/urcu-bp.c $(top_srcdir)/urcu-pointer.c $(COMPAT) -URCU_DEFER=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-defer.c $(top_srcdir)/urcu-pointer.c $(COMPAT) +URCU_SIGNAL=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT) +URCU_BP=$(top_srcdir)/urcu-bp.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT) +URCU_DEFER=$(top_srcdir)/urcu.c $(top_srcdir)/urcu-defer.c $(top_srcdir)/urcu-pointer.c $(top_srcdir)/urcu-call-rcu.c $(top_srcdir)/wfqueue.c $(COMPAT) URCU_LIB=$(top_builddir)/liburcu.la URCU_QSBR_LIB=$(top_builddir)/liburcu-qsbr.la URCU_MB_LIB=$(top_builddir)/liburcu-mb.la URCU_SIGNAL_LIB=$(top_builddir)/liburcu-signal.la URCU_BP_LIB=$(top_builddir)/liburcu-bp.la +URCU_CALL_LIB=$(top_builddir)/liburcu-call.la WFQUEUE_LIB=$(top_builddir)/libwfqueue.la WFSTACK_LIB=$(top_builddir)/libwfstack.la RCULFQUEUE_LIB=$(top_builddir)/librculfqueue.la @@ -95,23 +96,23 @@ test_perthreadlock_SOURCES = test_perthreadlock.c $(URCU_SIGNAL) rcutorture_urcu_SOURCES = urcutorture.c rcutorture_urcu_CFLAGS = -DTORTURE_URCU $(AM_CFLAGS) -rcutorture_urcu_LDADD = $(URCU) +rcutorture_urcu_LDADD = $(URCU) $(URCU_CALL_LIB) $(WFQUEUE_LIB) rcutorture_urcu_mb_SOURCES = urcutorture.c rcutorture_urcu_mb_CFLAGS = -DTORTURE_URCU_MB $(AM_CFLAGS) -rcutorture_urcu_mb_LDADD = $(URCU_MB_LIB) +rcutorture_urcu_mb_LDADD = $(URCU_MB_LIB) $(URCU_CALL_LIB) $(WFQUEUE_LIB) rcutorture_qsbr_SOURCES = urcutorture.c rcutorture_qsbr_CFLAGS = -DTORTURE_QSBR $(AM_CFLAGS) -rcutorture_qsbr_LDADD = $(URCU_QSBR_LIB) +rcutorture_qsbr_LDADD = $(URCU_QSBR_LIB) $(URCU_CALL_LIB) $(WFQUEUE_LIB) 
rcutorture_urcu_signal_SOURCES = urcutorture.c rcutorture_urcu_signal_CFLAGS = -DTORTURE_URCU_SIGNAL $(AM_CFLAGS) -rcutorture_urcu_signal_LDADD = $(URCU_SIGNAL_LIB) +rcutorture_urcu_signal_LDADD = $(URCU_SIGNAL_LIB) $(URCU_CALL_LIB) $(WFQUEUE_LIB) rcutorture_urcu_bp_SOURCES = urcutorture.c rcutorture_urcu_bp_CFLAGS = -DTORTURE_URCU_BP $(AM_CFLAGS) -rcutorture_urcu_bp_LDADD = $(URCU_BP_LIB) +rcutorture_urcu_bp_LDADD = $(URCU_BP_LIB) $(URCU_CALL_LIB) $(WFQUEUE_LIB) test_mutex_SOURCES = test_mutex.c $(URCU) diff --git a/tests/rcutorture.h b/tests/rcutorture.h index 4dac2f2..b42b8ab 100644 --- a/tests/rcutorture.h +++ b/tests/rcutorture.h @@ -65,6 +65,9 @@ * Test variables. */ +#include +#include "../urcu-call-rcu.h" + DEFINE_PER_THREAD(long long, n_reads_pt); DEFINE_PER_THREAD(long long, n_updates_pt); @@ -147,6 +150,16 @@ void *rcu_update_perf_test(void *arg) { long long n_updates_local = 0; + if ((random() & 0xf00) == 0) { + struct call_rcu_data *crdp; + + crdp = create_call_rcu_data(0); + if (crdp != NULL) { + fprintf(stderr, + "Using per-thread call_rcu() worker.\n"); + set_thread_call_rcu_data(crdp); + } + } uatomic_inc(&nthreadsrunning); while (goflag == GOFLAG_INIT) poll(NULL, 0, 1); @@ -296,10 +309,30 @@ void *rcu_read_stress_test(void *arg) return (NULL); } +static pthread_mutex_t call_rcu_test_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t call_rcu_test_cond = PTHREAD_COND_INITIALIZER; + +void rcu_update_stress_test_rcu(struct rcu_head *head) +{ + if (pthread_mutex_lock(&call_rcu_test_mutex) != 0) { + perror("pthread_mutex_lock"); + exit(-1); + } + if (pthread_cond_signal(&call_rcu_test_cond) != 0) { + perror("pthread_cond_signal"); + exit(-1); + } + if (pthread_mutex_unlock(&call_rcu_test_mutex) != 0) { + perror("pthread_mutex_unlock"); + exit(-1); + } +} + void *rcu_update_stress_test(void *arg) { int i; struct rcu_stress *p; + struct rcu_head rh; while (goflag == GOFLAG_INIT) poll(NULL, 0, 1); @@ -317,7 +350,24 @@ void 
*rcu_update_stress_test(void *arg) for (i = 0; i < RCU_STRESS_PIPE_LEN; i++) if (i != rcu_stress_idx) rcu_stress_array[i].pipe_count++; - synchronize_rcu(); + if (n_updates & 0x1) + synchronize_rcu(); + else { + if (pthread_mutex_lock(&call_rcu_test_mutex) != 0) { + perror("pthread_mutex_lock"); + exit(-1); + } + call_rcu(&rh, rcu_update_stress_test_rcu); + if (pthread_cond_wait(&call_rcu_test_cond, + &call_rcu_test_mutex) != 0) { + perror("pthread_cond_wait"); + exit(-1); + } + if (pthread_mutex_unlock(&call_rcu_test_mutex) != 0) { + perror("pthread_mutex_unlock"); + exit(-1); + } + } n_updates++; } return NULL; @@ -325,6 +375,16 @@ void *rcu_update_stress_test(void *arg) void *rcu_fake_update_stress_test(void *arg) { + if ((random() & 0xf00) == 0) { + struct call_rcu_data *crdp; + + crdp = create_call_rcu_data(0); + if (crdp != NULL) { + fprintf(stderr, + "Using per-thread call_rcu() worker.\n"); + set_thread_call_rcu_data(crdp); + } + } while (goflag == GOFLAG_INIT) poll(NULL, 0, 1); while (goflag == GOFLAG_RUN) { @@ -396,6 +456,12 @@ int main(int argc, char *argv[]) smp_init(); //rcu_init(); + srandom(time(NULL)); + if (random() & 0x100) { + fprintf(stderr, "Allocating per-CPU call_rcu threads.\n"); + if (create_all_cpu_call_rcu_data(0)) + perror("create_all_cpu_call_rcu_data"); + } #ifdef DEBUG_YIELD yield_active |= YIELD_READ; diff --git a/urcu-call-rcu.c b/urcu-call-rcu.c new file mode 100644 index 0000000..5c003aa --- /dev/null +++ b/urcu-call-rcu.c @@ -0,0 +1,450 @@ +/* + * urcu-call-rcu.c + * + * Userspace RCU library - batch memory reclamation with kernel API + * + * Copyright (c) 2010 Paul E. McKenney + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. 
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <stdio.h>	/* NOTE(review): system header names were missing in this copy and are reconstructed from usage -- confirm. */
+#include <pthread.h>
+#include <signal.h>
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <poll.h>
+#include <sys/time.h>
+#include <syscall.h>
+#include <unistd.h>
+
+#include "config.h"
+#include "urcu/wfqueue.h"
+#include "urcu-call-rcu.h"
+#include "urcu-pointer.h"
+
+/* Data structure that identifies a call_rcu thread. */
+
+struct call_rcu_data {
+	struct cds_wfq_queue cbs;	/* Callbacks queued by call_rcu(), drained by the thread. */
+	unsigned long flags;		/* URCU_CALL_RCU_* bits. */
+	pthread_mutex_t mtx;		/* Held around RUNNING-flag updates and cond wait/signal. */
+	pthread_cond_t cond;		/* The thread waits here when idle and not in RT mode. */
+	unsigned long qlen;		/* Callbacks enqueued but not yet invoked. */
+	pthread_t tid;			/* Filled in by pthread_create() for the call_rcu thread. */
+} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
+
+/* Link a thread using call_rcu() to its call_rcu thread. */
+
+static __thread struct call_rcu_data *thread_call_rcu_data;
+
+/* Guard call_rcu thread creation. */
+
+static pthread_mutex_t call_rcu_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+/* If a given thread does not have its own call_rcu thread, this is default. */
+
+static struct call_rcu_data *default_call_rcu_data;
+
+extern void synchronize_rcu(void);
+
+/*
+ * If the sched_getcpu() and sysconf(_SC_NPROCESSORS_CONF) calls are
+ * available, then we can have call_rcu threads assigned to individual
+ * CPUs rather than only to specific threads.
+ */
+
+#if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF)
+
+/*
+ * Pointer to array of pointers to per-CPU call_rcu_data structures
+ * and # CPUs.
+ */
+
+static struct call_rcu_data **per_cpu_call_rcu_data;	/* NULL until allocated. */
+static long maxcpus;	/* 0 until sysconf() has been consulted. */
+
+/* Allocate the array if it has not already been allocated.
*/
+
+static void alloc_cpu_call_rcu_data(void)
+{
+	struct call_rcu_data **p;
+	static int warned = 0;
+
+	if (maxcpus != 0)	/* Already sized, or sizing already failed. */
+		return;
+	maxcpus = sysconf(_SC_NPROCESSORS_CONF);
+	if (maxcpus <= 0) {	/* No usable CPU count: leave the array unallocated. */
+		return;
+	}
+	p = malloc(maxcpus * sizeof(*per_cpu_call_rcu_data));
+	if (p != NULL) {
+		memset(p, '\0', maxcpus * sizeof(*per_cpu_call_rcu_data));
+		per_cpu_call_rcu_data = p;	/* Every slot starts out NULL. */
+	} else {
+		if (!warned) {
+			fprintf(stderr, "[error] liburcu: unable to allocate per-CPU pointer array\n");
+		}
+		warned = 1;
+	}
+}
+
+#else /* #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */
+
+static struct call_rcu_data **per_cpu_call_rcu_data = NULL;	/* Entries are non-const: the shared accessors below return them as plain pointers. */
+static const long maxcpus = -1;	/* Negative: every per-CPU range check fails. */
+
+static void alloc_cpu_call_rcu_data(void)
+{
+}
+
+static int sched_getcpu(void)
+{
+	return -1;	/* Stub: no current-CPU information on this platform. */
+}
+
+#endif /* #else #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */
+
+/* Acquire the specified pthread mutex. */
+
+static void call_rcu_lock(pthread_mutex_t *pmp)
+{
+	if (pthread_mutex_lock(pmp) != 0) {
+		perror("pthread_mutex_lock");	/* NOTE(review): pthread_* return the error number without setting errno, so this text may be unrelated. */
+		exit(-1);
+	}
+}
+
+/* Release the specified pthread mutex. */
+
+static void call_rcu_unlock(pthread_mutex_t *pmp)
+{
+	if (pthread_mutex_unlock(pmp) != 0) {
+		perror("pthread_mutex_unlock");
+		exit(-1);
+	}
+}
+
+/* This is the code run by each call_rcu thread.
*/
+
+static void *call_rcu_thread(void *arg)
+{
+	unsigned long cbcount;
+	struct cds_wfq_node *cbs;
+	struct cds_wfq_node **cbs_tail;
+	struct call_rcu_data *crdp = (struct call_rcu_data *)arg;
+	struct rcu_head *rhp;
+
+	thread_call_rcu_data = crdp;	/* call_rcu() issued from this thread queues onto crdp. */
+	for (;;) {
+		if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {	/* Queue is not empty. */
+			while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
+				poll(NULL, 0, 1);	/* Presumably an enqueue is still linking the first node. */
+			_CMM_STORE_SHARED(crdp->cbs.head, NULL);
+			cbs_tail = (struct cds_wfq_node **)
+				uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);	/* Detach the list and reset the queue to empty. */
+			synchronize_rcu();	/* One grace period for the whole detached batch. */
+			cbcount = 0;
+			do {
+				while (cbs->next == NULL &&
+				       &cbs->next != cbs_tail)
+					poll(NULL, 0, 1);	/* Not the last node, so wait for its link to appear. */
+				if (cbs == &crdp->cbs.dummy) {	/* The queue's own dummy node is not a callback. */
+					cbs = cbs->next;
+					continue;
+				}
+				rhp = (struct rcu_head *)cbs;	/* The cast relies on the node being rcu_head's first member. */
+				cbs = cbs->next;	/* Advance before the callback may free rhp. */
+				rhp->func(rhp);
+				cbcount++;
+			} while (cbs != NULL);
+			uatomic_sub(&crdp->qlen, cbcount);
+		}
+		if (crdp->flags & URCU_CALL_RCU_RT)
+			poll(NULL, 0, 10);	/* RT mode: poll instead of waiting on the condvar. */
+		else {
+			call_rcu_lock(&crdp->mtx);
+			_CMM_STORE_SHARED(crdp->flags,
+				     crdp->flags & ~URCU_CALL_RCU_RUNNING);
+			if (&crdp->cbs.head ==
+			    _CMM_LOAD_SHARED(crdp->cbs.tail) &&
+			    pthread_cond_wait(&crdp->cond, &crdp->mtx) != 0) {
+				perror("pthread_cond_wait");
+				exit(-1);
+			}
+			_CMM_STORE_SHARED(crdp->flags,
+				     crdp->flags | URCU_CALL_RCU_RUNNING);
+			call_rcu_unlock(&crdp->mtx);	/* Release before sleeping so call_rcu() callers do not stall on mtx. */
+			poll(NULL, 0, 10);	/* Presumably lets further callbacks accumulate for the next batch. */
+		}
+	}
+	return NULL;  /* NOTREACHED */
+}
+
+/*
+ * Create both a call_rcu thread and the corresponding call_rcu_data
+ * structure, linking the structure in as specified.
+ */
+
+void call_rcu_data_init(struct call_rcu_data **crdpp, unsigned long flags)
+{
+	struct call_rcu_data *crdp;
+
+	crdp = malloc(sizeof(*crdp));
+	if (crdp == NULL) {	/* Allocation failure is fatal; this function never returns an error. */
+		fprintf(stderr, "Out of memory.\n");
+		exit(-1);
+	}
+	memset(crdp, '\0', sizeof(*crdp));
+	cds_wfq_init(&crdp->cbs);
+	crdp->qlen = 0;
+	if (pthread_mutex_init(&crdp->mtx, NULL) != 0) {
+		perror("pthread_mutex_init");
+		exit(-1);
+	}
+	if (pthread_cond_init(&crdp->cond, NULL) != 0) {
+		perror("pthread_cond_init");
+		exit(-1);
+	}
+	crdp->flags = flags | URCU_CALL_RCU_RUNNING;	/* The new thread starts out running, not waiting. */
+	cmm_smp_mb();  /* Structure initialized before pointer is planted. */
+	*crdpp = crdp;	/* Published before the thread exists; tid is filled in just below. */
+	if (pthread_create(&crdp->tid, NULL, call_rcu_thread, crdp) != 0) {
+		perror("pthread_create");
+		exit(-1);
+	}
+}
+
+/*
+ * Return a pointer to the call_rcu_data structure for the specified
+ * CPU, returning NULL if there is none. We cannot automatically
+ * create it because the platform we are running on might not define
+ * sched_getcpu().
+ */
+
+struct call_rcu_data *get_cpu_call_rcu_data(int cpu)
+{
+	static int warned = 0;	/* Print the out-of-range message at most once. */
+
+	if (per_cpu_call_rcu_data == NULL)	/* Per-CPU array not allocated. */
+		return NULL;
+	if (!warned && maxcpus > 0 && (cpu < 0 || maxcpus <= cpu)) {
+		fprintf(stderr, "[error] liburcu: get CPU # out of range\n");
+		warned = 1;
+	}
+	if (cpu < 0 || maxcpus <= cpu)
+		return NULL;
+	return per_cpu_call_rcu_data[cpu];	/* Read without taking call_rcu_mutex. */
+}
+
+/*
+ * Return the tid corresponding to the call_rcu thread whose
+ * call_rcu_data structure is specified.
+ */
+
+pthread_t get_call_rcu_thread(struct call_rcu_data *crdp)
+{
+	return crdp->tid;	/* crdp must be non-NULL; it is dereferenced unchecked. */
+}
+
+/*
+ * Create a call_rcu_data structure (with thread) and return a pointer.
+ */
+
+struct call_rcu_data *create_call_rcu_data(unsigned long flags)
+{
+	struct call_rcu_data *crdp;
+
+	call_rcu_data_init(&crdp, flags);	/* Exits the process on failure, so crdp is always set. */
+	return crdp;
+}
+
+/*
+ * Set the specified CPU to use the specified call_rcu_data structure.
+ */
+
+int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp)
+{
+	static int warned = 0;	/* Static so the out-of-range message is printed only once. */
+
+	call_rcu_lock(&call_rcu_mutex);
+	alloc_cpu_call_rcu_data();	/* Must run before the range check: maxcpus is 0 until this has been called. */
+	if (cpu < 0 || maxcpus <= cpu) {
+		if (!warned) {
+			fprintf(stderr, "[error] liburcu: set CPU # out of range\n");
+			warned = 1;
+		}
+		call_rcu_unlock(&call_rcu_mutex);
+		errno = EINVAL;	/* Reported both through errno and as a negative return value. */
+		return -EINVAL;
+	}
+	call_rcu_unlock(&call_rcu_mutex);
+	if (per_cpu_call_rcu_data == NULL) {	/* The array allocation failed. */
+		errno = ENOMEM;
+		return -ENOMEM;
+	}
+	per_cpu_call_rcu_data[cpu] = crdp;	/* Overwrites any previous entry; stored outside the lock. */
+	return 0;
+}
+
+/*
+ * Return a pointer to the default call_rcu_data structure, creating
+ * one if need be. Because we never free call_rcu_data structures,
+ * we don't need to be in an RCU read-side critical section.
+ */
+
+struct call_rcu_data *get_default_call_rcu_data(void)
+{
+	if (default_call_rcu_data != NULL)	/* Fast path: no lock once the default exists. */
+		return rcu_dereference(default_call_rcu_data);
+	call_rcu_lock(&call_rcu_mutex);
+	if (default_call_rcu_data != NULL) {	/* Re-check: another thread may have created it meanwhile. */
+		call_rcu_unlock(&call_rcu_mutex);
+		return default_call_rcu_data;
+	}
+	call_rcu_data_init(&default_call_rcu_data, 0);	/* Flags 0: the default thread is not an RT thread. */
+	call_rcu_unlock(&call_rcu_mutex);
+	return default_call_rcu_data;
+}
+
+/*
+ * Return the call_rcu_data structure that applies to the currently
+ * running thread. Any call_rcu_data structure assigned specifically
+ * to this thread has first priority, followed by any call_rcu_data
+ * structure assigned to the CPU on which the thread is running,
+ * followed by the default call_rcu_data structure. If there is not
+ * yet a default call_rcu_data structure, one will be created.
+ */
+struct call_rcu_data *get_call_rcu_data(void)
+{
+	int curcpu;
+	static int warned = 0;	/* Print the out-of-range message at most once. */
+
+	if (thread_call_rcu_data != NULL)	/* 1: per-thread assignment. */
+		return thread_call_rcu_data;
+	if (maxcpus <= 0)	/* Per-CPU array never sized, or unsupported: use the default. */
+		return get_default_call_rcu_data();
+	curcpu = sched_getcpu();
+	if (!warned && (curcpu < 0 || maxcpus <= curcpu)) {
+		fprintf(stderr, "[error] liburcu: gcrd CPU # out of range\n");
+		warned = 1;
+	}
+	if (curcpu >= 0 && maxcpus > curcpu &&
+	    per_cpu_call_rcu_data != NULL &&
+	    per_cpu_call_rcu_data[curcpu] != NULL)
+		return per_cpu_call_rcu_data[curcpu];	/* 2: per-CPU assignment. */
+	return get_default_call_rcu_data();	/* 3: process-wide default, created on demand. */
+}
+
+/*
+ * Return a pointer to this task's call_rcu_data if there is one.
+ */
+
+struct call_rcu_data *get_thread_call_rcu_data(void)
+{
+	return thread_call_rcu_data;	/* NULL when no per-thread assignment has been made. */
+}
+
+/*
+ * Set this task's call_rcu_data structure as specified, regardless
+ * of whether or not this task already had one. (This allows switching
+ * to and from real-time call_rcu threads, for example.)
+ */
+
+void set_thread_call_rcu_data(struct call_rcu_data *crdp)
+{
+	thread_call_rcu_data = crdp;	/* The previous value is simply overwritten, not freed. */
+}
+
+/*
+ * Create a separate call_rcu thread for each CPU. This does not
+ * replace a pre-existing call_rcu thread -- use the set_cpu_call_rcu_data()
+ * function if you want that behavior.
+ */
+
+int create_all_cpu_call_rcu_data(unsigned long flags)
+{
+	int i;
+	struct call_rcu_data *crdp;
+	int ret;
+
+	call_rcu_lock(&call_rcu_mutex);
+	alloc_cpu_call_rcu_data();	/* Sizes and allocates the per-CPU array on first use. */
+	call_rcu_unlock(&call_rcu_mutex);
+	if (maxcpus <= 0) {	/* No CPU count available: per-CPU threads are not possible. */
+		errno = EINVAL;
+		return -EINVAL;
+	}
+	if (per_cpu_call_rcu_data == NULL) {	/* The array allocation failed. */
+		errno = ENOMEM;
+		return -ENOMEM;
+	}
+	for (i = 0; i < maxcpus; i++) {
+		call_rcu_lock(&call_rcu_mutex);
+		if (get_cpu_call_rcu_data(i)) {	/* This CPU already has one: keep it. */
+			call_rcu_unlock(&call_rcu_mutex);
+			continue;
+		}
+		crdp = create_call_rcu_data(flags);
+		if (crdp == NULL) {	/* create_call_rcu_data() as written exits on failure instead of returning NULL. */
+			call_rcu_unlock(&call_rcu_mutex);
+			errno = ENOMEM;
+			return -ENOMEM;
+		}
+		call_rcu_unlock(&call_rcu_mutex);
+		if ((ret = set_cpu_call_rcu_data(i, crdp)) != 0) {
+			/* FIXME: Leaks crdp for now. */
+			return ret; /* Can happen on race. */
+		}
+	}
+	return 0;
+}
+
+/*
+ * Schedule a function to be invoked after a following grace period.
+ * This is the only function that must be called -- the others are
+ * only present to allow applications to tune their use of RCU for
+ * maximum performance.
+ *
+ * Note that unless a call_rcu thread has already been created,
+ * the first invocation of call_rcu() will create one. So, if you
+ * need the first invocation of call_rcu() to be fast, make sure
+ * to create a call_rcu thread first. One way to accomplish this is
+ * "get_call_rcu_data();", and another is create_all_cpu_call_rcu_data().
+ */
+
+void call_rcu(struct rcu_head *head,
+	      void (*func)(struct rcu_head *head))
+{
+	struct call_rcu_data *crdp;
+
+	cds_wfq_node_init(&head->next);
+	head->func = func;
+	crdp = get_call_rcu_data();	/* Per-thread, else per-CPU, else default (created on demand). */
+	cds_wfq_enqueue(&crdp->cbs, &head->next);
+	uatomic_inc(&crdp->qlen);
+	if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RT)) {	/* RT threads poll and are never signalled. */
+		call_rcu_lock(&crdp->mtx);
+		if (!(_CMM_LOAD_SHARED(crdp->flags) & URCU_CALL_RCU_RUNNING)) {	/* The thread is (about to be) waiting on cond. */
+			if (pthread_cond_signal(&crdp->cond) != 0) {
+				perror("pthread_cond_signal");
+				exit(-1);
+			}
+		}
+		call_rcu_unlock(&crdp->mtx);
+	}
+}
diff --git a/urcu-call-rcu.h b/urcu-call-rcu.h
new file mode 100644
index 0000000..2c13388
--- /dev/null
+++ b/urcu-call-rcu.h
@@ -0,0 +1,80 @@
+#ifndef _URCU_CALL_RCU_H
+#define _URCU_CALL_RCU_H
+
+/*
+ * urcu-call-rcu.h
+ *
+ * Userspace RCU header - deferred execution
+ *
+ * Copyright (c) 2009 Mathieu Desnoyers
+ * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
+ *
+ * LGPL-compatible code should include this header with :
+ *
+ * #define _LGPL_SOURCE
+ * #include <urcu-call-rcu.h>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <stdlib.h>	/* NOTE(review): header names were missing in this copy and are reconstructed from usage -- confirm. */
+#include <pthread.h>
+
+#include <urcu/wfqueue.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Note that struct call_rcu_data is opaque to callers. */
+
+struct call_rcu_data;
+
+/* Flag values.
*/
+
+#define URCU_CALL_RCU_RT	0x1	/* call_rcu thread polls; call_rcu() never signals it. */
+#define URCU_CALL_RCU_RUNNING	0x2	/* Set and cleared by the call_rcu thread itself around its condvar wait. */
+
+/*
+ * The rcu_head data structure is placed in the structure to be freed
+ * via call_rcu().
+ */
+
+struct rcu_head {
+	struct cds_wfq_node next;	/* Queue linkage; the call_rcu thread casts a node pointer back to rcu_head, so keep this first. */
+	void (*func)(struct rcu_head *head);	/* Callback, invoked with this rcu_head after a grace period. */
+};
+
+/*
+ * Exported functions
+ */
+void call_rcu_data_init(struct call_rcu_data **crdpp, unsigned long flags);	/* Allocate, initialize, store in *crdpp, start the thread. */
+struct call_rcu_data *get_cpu_call_rcu_data(int cpu);	/* NULL if none or cpu out of range. */
+pthread_t get_call_rcu_thread(struct call_rcu_data *crdp);
+struct call_rcu_data *create_call_rcu_data(unsigned long flags);	/* flags: URCU_CALL_RCU_RT or 0. */
+int set_cpu_call_rcu_data(int cpu, struct call_rcu_data *crdp);	/* 0, or -EINVAL / -ENOMEM with errno set. */
+struct call_rcu_data *get_default_call_rcu_data(void);	/* Creates the default on first use. */
+struct call_rcu_data *get_call_rcu_data(void);	/* Per-thread, else per-CPU, else default. */
+struct call_rcu_data *get_thread_call_rcu_data(void);	/* NULL if this thread has none. */
+void set_thread_call_rcu_data(struct call_rcu_data *crdp);
+int create_all_cpu_call_rcu_data(unsigned long flags);	/* 0, or -EINVAL / -ENOMEM with errno set. */
+void call_rcu(struct rcu_head *head,
+	      void (*func)(struct rcu_head *head));
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _URCU_CALL_RCU_H */
diff --git a/urcu-defer.h b/urcu-defer.h
index 2019e13..3710081 100644
--- a/urcu-defer.h
+++ b/urcu-defer.h
@@ -52,14 +52,6 @@ extern "C" {
 
 extern void defer_rcu(void (*fct)(void *p), void *p);
 
-/*
- * call_rcu will eventually be implemented with an API similar to the Linux
- * kernel call_rcu(), which will allow its use within RCU read-side C.S.
- * Generate an error if used for now.
- */
-
-#define call_rcu __error_call_rcu_not_implemented_please_use_defer_rcu
-
 /*
  * Thread registration for reclamation.
*/ diff --git a/urcu/wfqueue-static.h b/urcu/wfqueue-static.h index 30d6e96..790931b 100644 --- a/urcu/wfqueue-static.h +++ b/urcu/wfqueue-static.h @@ -28,6 +28,7 @@ #include #include +#include #include #include @@ -47,12 +48,12 @@ extern "C" { #define WFQ_ADAPT_ATTEMPTS 10 /* Retry if being set */ #define WFQ_WAIT 10 /* Wait 10 ms if being set */ -void _cds_wfq_node_init(struct cds_wfq_node *node) +static inline void _cds_wfq_node_init(struct cds_wfq_node *node) { node->next = NULL; } -void _cds_wfq_init(struct cds_wfq_queue *q) +static inline void _cds_wfq_init(struct cds_wfq_queue *q) { int ret; @@ -64,7 +65,8 @@ void _cds_wfq_init(struct cds_wfq_queue *q) assert(!ret); } -void _cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node *node) +static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q, + struct cds_wfq_node *node) { struct cds_wfq_node **old_tail; @@ -90,7 +92,7 @@ void _cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node *node) * thread to be scheduled. The queue appears empty until tail->next is set by * enqueue. */ -struct cds_wfq_node * +static inline struct cds_wfq_node * ___cds_wfq_dequeue_blocking(struct cds_wfq_queue *q) { struct cds_wfq_node *node, *next; @@ -128,7 +130,7 @@ ___cds_wfq_dequeue_blocking(struct cds_wfq_queue *q) return node; } -struct cds_wfq_node * +static inline struct cds_wfq_node * _cds_wfq_dequeue_blocking(struct cds_wfq_queue *q) { struct cds_wfq_node *retnode; diff --git a/urcu/wfstack-static.h b/urcu/wfstack-static.h index eed83da..ff18c4a 100644 --- a/urcu/wfstack-static.h +++ b/urcu/wfstack-static.h @@ -28,6 +28,7 @@ #include #include +#include #include #include -- 2.34.1