Revert "CDS API: removal of rcu_read lock/unlock dep, removal of call_rcu argument...
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Sat, 3 Sep 2011 13:15:14 +0000 (09:15 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Sat, 3 Sep 2011 13:15:14 +0000 (09:15 -0400)
This reverts commit 7618919ae496bda84a2efa4f2ad0abe569892a9e.

Rationale:

I thought about it some more, and had discussions with various people,
and there are a few reasons to go for a scheme where rcu read lock
should be taken by the caller, and to pass call_rcu as a parameter to
the data structure init function:

A) The advantage, as Paul E. McKenney pointed out, is that one single .so
   is enough to support all RCU flavors. Very convenient for external data
   structure containers.

B) It clearly documents where rcu read-side locks are needed, so the user
   keep control and in-depth understanding of their read-side locks.

C) When multiple API functions that require RCU read-side lock to be
   held (sometimes even the same lock) throughout a sequence of API
   calls, we have no choice but to let the caller hold the read-side
   lock.

D) Due to support of multiple nesting of rcu read-side lock, any
   "improvement" we could get by releasing the read-side lock in
   retry loops would vanish in the cases where we are called within
   nested C.S..

E) If a library uses synchronize_rcu, this should be clearly documented,
   and even frowned upon, because this involves important limitations on
   the design of the caller, and important performance hit. There are
   usually ways to reach the same result through use of call_rcu, which
   should really be used thoroughout these libraries.

F) It clearly documents when a data structure needs to use call_rcu
   internally.

G) Some very early benchmark results show that there is indeed not
   much performance gain to achieve by inlining call_rcu, even if it is
   a version with a cache for the "call_rcu structure" lookup
   (per-cpu/per-thread/global). So passing it as a parameter to
   the data structure init function should be fine, even in cases
   where it is called very often.

H) For use-cases where applications would like to use more than one
   RCU flavor concurrently (which is now supported), leaving management
   of RCU read-side C.S. to the reader allows the application to take
   more than one RCU read-side lock across API calls. It also lets the
   application specify its own call_rcu function that could handle more
   than one RCU flavor.

So for all these reasons, I reverting back to the API we have in our
last release (0.6.4).

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
rculfqueue.c
tests/test_urcu_lfq.c
tests/test_urcu_lfs.c
urcu/rculfqueue.h
urcu/rculfstack.h
urcu/static/rculfqueue.h
urcu/static/rculfstack.h

index c579e75a759e4e25d0f1780ecabd5c0b1bcd3e2a..09b858791d8a8babc0bff46741e188b423fd23ea 100644 (file)
@@ -44,9 +44,11 @@ void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node)
        _cds_lfq_node_init_rcu(node);
 }
 
-void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q)
+void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
+                     void queue_call_rcu(struct rcu_head *head,
+                               void (*func)(struct rcu_head *head)))
 {
-       _cds_lfq_init_rcu(q);
+       _cds_lfq_init_rcu(q, queue_call_rcu);
 }
 
 int cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q)
index 82a90b0cdecc23f05c3572cf6b6819f00ac72531..b61a7d4815788af2bcd9e197dc319a8eb053791a 100644 (file)
@@ -180,7 +180,9 @@ void *thr_enqueuer(void *_count)
                if (!node)
                        goto fail;
                cds_lfq_node_init_rcu(node);
+               rcu_read_lock();
                cds_lfq_enqueue_rcu(&q, node);
+               rcu_read_unlock();
                nr_successful_enqueues++;
 
                if (unlikely(wdelay))
@@ -228,7 +230,10 @@ void *thr_dequeuer(void *_count)
        for (;;) {
                struct cds_lfq_node_rcu *node;
 
+               rcu_read_lock();
                node = cds_lfq_dequeue_rcu(&q);
+               rcu_read_unlock();
+
                if (node) {
                        defer_rcu(free, node);
                        nr_successful_dequeues++;
@@ -257,7 +262,9 @@ void test_end(struct cds_lfq_queue_rcu *q, unsigned long long *nr_dequeues)
        struct cds_lfq_node_rcu *node;
 
        do {
+               rcu_read_lock();
                node = cds_lfq_dequeue_rcu(q);
+               rcu_read_unlock();
                if (node) {
                        free(node);     /* no more concurrent access */
                        (*nr_dequeues)++;
@@ -356,7 +363,7 @@ int main(int argc, char **argv)
        tid_dequeuer = malloc(sizeof(*tid_dequeuer) * nr_dequeuers);
        count_enqueuer = malloc(2 * sizeof(*count_enqueuer) * nr_enqueuers);
        count_dequeuer = malloc(2 * sizeof(*count_dequeuer) * nr_dequeuers);
-       cds_lfq_init_rcu(&q);
+       cds_lfq_init_rcu(&q, call_rcu);
 
        next_aff = 0;
 
index a7f9af3abd5086ee21e73aa0f3269530e612886a..252454d29aeb46d4758e30b2a7f6b864cc8e3b65 100644 (file)
@@ -229,7 +229,9 @@ void *thr_dequeuer(void *_count)
        for (;;) {
                struct cds_lfs_node_rcu *node;
 
+               rcu_read_lock();
                node = cds_lfs_pop_rcu(&s);
+               rcu_read_unlock();
                if (node) {
                        defer_rcu(free, node);
                        nr_successful_dequeues++;
index 1582694e92abbcfc5bfc8104f2016fb480f41aff..e1d64f13d5229e251674919189c8f28d780c8d50 100644 (file)
@@ -39,6 +39,8 @@ struct cds_lfq_node_rcu {
 
 struct cds_lfq_queue_rcu {
        struct cds_lfq_node_rcu *head, *tail;
+       void (*queue_call_rcu)(struct rcu_head *head,
+               void (*func)(struct rcu_head *head));
 };
 
 #ifdef _LGPL_SOURCE
@@ -78,7 +80,9 @@ struct cds_lfq_queue_rcu {
 #else /* !_LGPL_SOURCE */
 
 extern void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node);
-extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q);
+extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
+                            void queue_call_rcu(struct rcu_head *head,
+                                       void (*func)(struct rcu_head *head)));
 /*
  * The queue should be emptied before calling destroy.
  *
@@ -87,13 +91,13 @@ extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q);
 extern int cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q);
 
 /*
- * Acts as a RCU reader.
+ * Should be called under rcu read lock critical section.
  */
 extern void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
                                struct cds_lfq_node_rcu *node);
 
 /*
- * Acts as a RCU reader.
+ * Should be called under rcu read lock critical section.
  *
  * The caller must wait for a grace period to pass before freeing the returned
  * node or modifying the cds_lfq_node_rcu structure.
index cf00daf26cc65cf09d300dafb91dd73467cf6f42..7d359d4ce34ce7dc5a02cfd0c3207f1ec9c1d720 100644 (file)
@@ -72,7 +72,7 @@ extern int cds_lfs_push_rcu(struct cds_lfs_stack_rcu *s,
                        struct cds_lfs_node_rcu *node);
 
 /*
- * Acts as a RCU reader.
+ * Should be called under rcu read lock critical section.
  *
  * The caller must wait for a grace period to pass before freeing the returned
  * node or modifying the cds_lfs_node_rcu structure.
index 99335c41ec41d33078279e2f843431111cf307e8..fea6110b4f466cb011d7f3e36f8b923bbd1f75b2 100644 (file)
@@ -88,7 +88,7 @@ void rcu_free_dummy(struct cds_lfq_node_rcu *node)
 
        assert(node->dummy);
        dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent);
-       call_rcu(&dummy->head, free_dummy_cb);
+       dummy->q->queue_call_rcu(&dummy->head, free_dummy_cb);
 }
 
 static inline
@@ -109,10 +109,13 @@ void _cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node)
 }
 
 static inline
-void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q)
+void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
+                      void queue_call_rcu(struct rcu_head *head,
+                               void (*func)(struct rcu_head *head)))
 {
        q->tail = make_dummy(q, NULL);
        q->head = q->tail;
+       q->queue_call_rcu = queue_call_rcu;
 }
 
 /*
@@ -133,7 +136,7 @@ int _cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q)
 }
 
 /*
- * Acts as a RCU reader.
+ * Should be called under rcu read lock critical section.
  */
 static inline
 void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
@@ -147,7 +150,6 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
        for (;;) {
                struct cds_lfq_node_rcu *tail, *next;
 
-               rcu_read_lock();
                tail = rcu_dereference(q->tail);
                next = uatomic_cmpxchg(&tail->next, NULL, node);
                if (next == NULL) {
@@ -157,7 +159,6 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
                         * enqueue might beat us to it, that's fine).
                         */
                        (void) uatomic_cmpxchg(&q->tail, tail, node);
-                       rcu_read_unlock();
                        return;
                } else {
                        /*
@@ -165,7 +166,6 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
                         * Help moving tail further and retry.
                         */
                        (void) uatomic_cmpxchg(&q->tail, tail, next);
-                       rcu_read_unlock();
                        continue;
                }
        }
@@ -182,7 +182,7 @@ void enqueue_dummy(struct cds_lfq_queue_rcu *q)
 }
 
 /*
- * Acts as a RCU reader.
+ * Should be called under rcu read lock critical section.
  *
  * The caller must wait for a grace period to pass before freeing the returned
  * node or modifying the cds_lfq_node_rcu structure.
@@ -194,13 +194,10 @@ struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q)
        for (;;) {
                struct cds_lfq_node_rcu *head, *next;
 
-               rcu_read_lock();
                head = rcu_dereference(q->head);
                next = rcu_dereference(head->next);
-               if (head->dummy && next == NULL) {
-                       rcu_read_unlock();
+               if (head->dummy && next == NULL)
                        return NULL;    /* empty */
-               }
                /*
                 * We never, ever allow dequeue to get to a state where
                 * the queue is empty (we need at least one node in the
@@ -212,17 +209,13 @@ struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q)
                        enqueue_dummy(q);
                        next = rcu_dereference(head->next);
                }
-               if (uatomic_cmpxchg(&q->head, head, next) != head) {
-                       rcu_read_unlock();
+               if (uatomic_cmpxchg(&q->head, head, next) != head)
                        continue;       /* Concurrently pushed. */
-               }
                if (head->dummy) {
                        /* Free dummy after grace period. */
                        rcu_free_dummy(head);
-                       rcu_read_unlock();
                        continue;       /* try again */
                }
-               rcu_read_unlock();
                return head;
        }
 }
index 1df121b461389fbb0b621e6eeefb37c4b4e131eb..4259e0c00bb9e821568b3720c0d46fe8d23ea8ca 100644 (file)
@@ -90,7 +90,7 @@ int _cds_lfs_push_rcu(struct cds_lfs_stack_rcu *s,
 }
 
 /*
- * Acts as a RCU reader.
+ * Should be called under rcu read-side lock.
  *
  * The caller must wait for a grace period to pass before freeing the returned
  * node or modifying the cds_lfs_node_rcu structure.
@@ -103,22 +103,18 @@ _cds_lfs_pop_rcu(struct cds_lfs_stack_rcu *s)
        for (;;) {
                struct cds_lfs_node_rcu *head;
 
-               rcu_read_lock();
                head = rcu_dereference(s->head);
                if (head) {
                        struct cds_lfs_node_rcu *next = rcu_dereference(head->next);
 
                        if (uatomic_cmpxchg(&s->head, head, next) == head) {
-                               rcu_read_unlock();
                                return head;
                        } else {
                                /* Concurrent modification. Retry. */
-                               rcu_read_unlock();
                                continue;
                        }
                } else {
                        /* Empty stack */
-                       rcu_read_unlock();
                        return NULL;
                }
        }
This page took 0.030837 seconds and 4 git commands to generate.