From: Mathieu Desnoyers Date: Fri, 10 Jun 2011 00:31:29 +0000 (-0400) Subject: cds containers: lfqueue and lfstack: don't depend on a particular rcu flavor X-Git-Tag: v0.6.0~21 X-Git-Url: https://git.lttng.org/?p=urcu.git;a=commitdiff_plain;h=d9b52143813e104eeee1e3d90061042230b7b5bf cds containers: lfqueue and lfstack: don't depend on a particular rcu flavor Remove rcu_read lock/unlock from the code, require the caller to ensure protection. First move towards a single .so for all data containers. This changes the lfqueue API, which is not very much used yet AFAIK. Signed-off-by: Mathieu Desnoyers --- diff --git a/rculfqueue.c b/rculfqueue.c index 508e05c..c844525 100644 --- a/rculfqueue.c +++ b/rculfqueue.c @@ -20,8 +20,11 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ +#define _LGPL_SOURCE /* Use the urcu symbols to select the appropriate rcu flavor at link time */ #include "urcu.h" + +#undef _LGPL_SOURCE /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */ #include "urcu/rculfqueue.h" #include "urcu/rculfqueue-static.h" @@ -35,9 +38,10 @@ void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node) _cds_lfq_node_init_rcu(node); } -void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q) +void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, + void (*release)(struct urcu_ref *ref)) { - _cds_lfq_init_rcu(q); + _cds_lfq_init_rcu(q, release); } void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *node) @@ -46,7 +50,7 @@ void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *n } struct cds_lfq_node_rcu * -cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q, void (*release)(struct urcu_ref *)) +cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q) { - return _cds_lfq_dequeue_rcu(q, release); + return _cds_lfq_dequeue_rcu(q); } diff --git a/rculfstack.c b/rculfstack.c index 2c26046..ca50232 100644 --- a/rculfstack.c +++ b/rculfstack.c @@ -21,7 +21,10 @@ */ /* Use the urcu symbols to select the appropriate rcu flavor at link time */ +#define _LGPL_SOURCE #include "urcu.h" + +#undef _LGPL_SOURCE /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */ #include "urcu/rculfstack.h" #include "urcu/rculfstack-static.h" diff --git a/tests/test_urcu_lfq.c b/tests/test_urcu_lfq.c index 7aaad99..15023f6 100644 --- a/tests/test_urcu_lfq.c +++ b/tests/test_urcu_lfq.c @@ -177,7 +177,9 @@ void *thr_enqueuer(void *_count) if (!node) goto fail; cds_lfq_node_init_rcu(node); + rcu_read_lock(); cds_lfq_enqueue_rcu(&q, node); + rcu_read_unlock(); nr_successful_enqueues++; if (unlikely(wdelay)) @@ -200,12 +202,18 @@ fail: } -static void rcu_release_node(struct urcu_ref *ref) +static void rcu_free_node(struct rcu_head *head) { - struct cds_lfq_node_rcu *node = caa_container_of(ref, struct cds_lfq_node_rcu, ref); - defer_rcu(free, node); - //synchronize_rcu(); - //free(node); + struct cds_lfq_node_rcu *node = + caa_container_of(head, struct cds_lfq_node_rcu, rcu_head); + free(node); +} + +static void ref_release_node(struct urcu_ref *ref) +{ + struct cds_lfq_node_rcu *node = + caa_container_of(ref, struct cds_lfq_node_rcu, ref); + call_rcu(&node->rcu_head, rcu_free_node); } void *thr_dequeuer(void *_count) @@ -231,11 +239,14 @@ void *thr_dequeuer(void *_count) cmm_smp_mb(); for (;;) { - struct cds_lfq_node_rcu *node = cds_lfq_dequeue_rcu(&q, - rcu_release_node); + struct cds_lfq_node_rcu *node; + + rcu_read_lock(); + node = cds_lfq_dequeue_rcu(&q); + rcu_read_unlock(); if (node) { - urcu_ref_put(&node->ref, rcu_release_node); + urcu_ref_put(&node->ref, ref_release_node); nr_successful_dequeues++; } @@ -269,7 +280,9 @@ void test_end(struct cds_lfq_queue_rcu *q, unsigned long long *nr_dequeues) struct cds_lfq_node_rcu *node; do { - node = cds_lfq_dequeue_rcu(q, release_node); + rcu_read_lock(); + node = cds_lfq_dequeue_rcu(q); + rcu_read_unlock(); if (node) { urcu_ref_put(&node->ref, release_node); (*nr_dequeues)++; @@ -368,7 +381,7 @@ int main(int argc, char **argv) tid_dequeuer = malloc(sizeof(*tid_dequeuer) * nr_dequeuers); count_enqueuer = malloc(2 * sizeof(*count_enqueuer) * nr_enqueuers); count_dequeuer = malloc(2 * sizeof(*count_dequeuer) * nr_dequeuers); - cds_lfq_init_rcu(&q); + cds_lfq_init_rcu(&q, ref_release_node); next_aff = 0; diff --git a/tests/test_urcu_lfs.c b/tests/test_urcu_lfs.c index 834c72e..71596fd 100644 --- a/tests/test_urcu_lfs.c +++ b/tests/test_urcu_lfs.c @@ -177,6 +177,7 @@ void *thr_enqueuer(void *_count) if (!node) goto fail; cds_lfs_node_init_rcu(node); + /* No rcu read-side is needed for push */ cds_lfs_push_rcu(&s, node); nr_successful_enqueues++; @@ -223,13 +224,15 @@ void *thr_dequeuer(void *_count) cmm_smp_mb(); for (;;) { - struct cds_lfs_node_rcu *node = cds_lfs_pop_rcu(&s); + struct cds_lfs_node_rcu *node; + rcu_read_lock(); + node = cds_lfs_pop_rcu(&s); + rcu_read_unlock(); if (node) { defer_rcu(free, node); nr_successful_dequeues++; } - nr_dequeues++; if (unlikely(!test_duration_dequeue())) break; diff --git a/urcu/rculfqueue-static.h b/urcu/rculfqueue-static.h index d7c359f..be6a4bc 100644 --- a/urcu/rculfqueue-static.h +++ b/urcu/rculfqueue-static.h @@ -53,17 +53,24 @@ void _cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node) urcu_ref_init(&node->ref); } -void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q) +void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, + void (*release)(struct urcu_ref *ref)) { _cds_lfq_node_init_rcu(&q->init); /* Make sure the initial node is never freed. */ urcu_ref_set(&q->init.ref, URCU_LFQ_PERMANENT_REF); q->head = q->tail = &q->init; + q->release = release; } -void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *node) +/* + * Should be called under rcu read lock critical section. + */ +void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, + struct cds_lfq_node_rcu *node) { urcu_ref_get(&node->ref); + node->queue = q; /* * uatomic_cmpxchg() implicit memory barrier orders earlier stores to @@ -73,7 +80,6 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu * for (;;) { struct cds_lfq_node_rcu *tail, *next; - rcu_read_lock(); tail = rcu_dereference(q->tail); /* * Typically expect tail->next to be NULL. @@ -87,7 +93,6 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu * * us to it, that's fine). */ (void) uatomic_cmpxchg(&q->tail, tail, node); - rcu_read_unlock(); return; } else { /* @@ -95,44 +100,39 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu * * further and retry. */ (void) uatomic_cmpxchg(&q->tail, tail, next); - rcu_read_unlock(); continue; } } } /* - * The entry returned by dequeue must be taken care of by doing a urcu_ref_put, - * which calls the release primitive when the reference count drops to zero. A - * grace period must be waited after execution of the release callback before - * performing the actual memory reclamation or modifying the cds_lfq_node_rcu - * structure. + * Should be called under rcu read lock critical section. + * + * The entry returned by dequeue must be taken care of by doing a + * urcu_ref_put after a grace period passes. + * * In other words, the entry lfq node returned by dequeue must not be * modified/re-used/freed until the reference count reaches zero and a grace - * period has elapsed (after the refcount reached 0). + * period has elapsed. */ struct cds_lfq_node_rcu * -_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q, void (*release)(struct urcu_ref *)) +_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q) { for (;;) { struct cds_lfq_node_rcu *head, *next; - rcu_read_lock(); head = rcu_dereference(q->head); next = rcu_dereference(head->next); if (next) { if (uatomic_cmpxchg(&q->head, head, next) == head) { - rcu_read_unlock(); - urcu_ref_put(&head->ref, release); + urcu_ref_put(&head->ref, q->release); return next; } else { /* Concurrently pushed, retry */ - rcu_read_unlock(); continue; } } else { /* Empty */ - rcu_read_unlock(); return NULL; } } diff --git a/urcu/rculfqueue.h b/urcu/rculfqueue.h index 7b3f81a..55f15f8 100644 --- a/urcu/rculfqueue.h +++ b/urcu/rculfqueue.h @@ -40,14 +40,19 @@ extern "C" { * which point their reference count will be decremented. */ +struct cds_lfq_queue_rcu; + struct cds_lfq_node_rcu { struct cds_lfq_node_rcu *next; struct urcu_ref ref; + struct cds_lfq_queue_rcu *queue; + struct rcu_head rcu_head; }; struct cds_lfq_queue_rcu { struct cds_lfq_node_rcu *head, *tail; struct cds_lfq_node_rcu init; /* Dummy initialization node */ + void (*release)(struct urcu_ref *ref); }; #ifdef _LGPL_SOURCE @@ -62,21 +67,28 @@ struct cds_lfq_queue_rcu { #else /* !_LGPL_SOURCE */ extern void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node); -extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q); -extern void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *node); +extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, + void (*release)(struct urcu_ref *ref)); + +/* + * Should be called under rcu read lock critical section. + */ +extern void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, + struct cds_lfq_node_rcu *node); /* - * The entry returned by dequeue must be taken care of by doing a urcu_ref_put, - * which calls the release primitive when the reference count drops to zero. A - * grace period must be waited after execution of the release callback before - * performing the actual memory reclamation or modifying the cds_lfq_node_rcu - * structure. + * Should be called under rcu read lock critical section. + * + * The entry returned by dequeue must be taken care of by doing a + * urcu_delayed_ref_put, which calls the release primitive after the + * reference count drops to zero _and_ a following grace period passes. + * * In other words, the entry lfq node returned by dequeue must not be * modified/re-used/freed until the reference count reaches zero and a grace * period has elapsed (after the refcount reached 0). */ extern struct cds_lfq_node_rcu * -cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q, void (*release)(struct urcu_ref *)); +cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q); #endif /* !_LGPL_SOURCE */ diff --git a/urcu/rculfstack-static.h b/urcu/rculfstack-static.h index f541d40..7caf3c8 100644 --- a/urcu/rculfstack-static.h +++ b/urcu/rculfstack-static.h @@ -61,6 +61,8 @@ void _cds_lfs_push_rcu(struct cds_lfs_stack_rcu *s, struct cds_lfs_node_rcu *nod } /* + * Should be called under rcu read-side lock. + * * The caller must wait for a grace period to pass before freeing the returned * node or modifying the cds_lfs_node_rcu structure. * Returns NULL if stack is empty. @@ -71,22 +73,18 @@ _cds_lfs_pop_rcu(struct cds_lfs_stack_rcu *s) for (;;) { struct cds_lfs_node_rcu *head; - rcu_read_lock(); head = rcu_dereference(s->head); if (head) { struct cds_lfs_node_rcu *next = rcu_dereference(head->next); if (uatomic_cmpxchg(&s->head, head, next) == head) { - rcu_read_unlock(); return head; } else { /* Concurrent modification. Retry. */ - rcu_read_unlock(); continue; } } else { /* Empty stack */ - rcu_read_unlock(); return NULL; } } diff --git a/urcu/rculfstack.h b/urcu/rculfstack.h index 20ac26c..d759854 100644 --- a/urcu/rculfstack.h +++ b/urcu/rculfstack.h @@ -51,6 +51,8 @@ extern void cds_lfs_init_rcu(struct cds_lfs_stack_rcu *s); extern void cds_lfs_push_rcu(struct cds_lfs_stack_rcu *s, struct cds_lfs_node_rcu *node); /* + * Should be called under rcu read lock critical section. + * * The caller must wait for a grace period to pass before freeing the returned * node or modifying the cds_lfs_node_rcu structure. * Returns NULL if stack is empty.