Implement Lai Jiangshan's algorithm in RCU lock-free queue
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Mon, 25 Jul 2011 18:34:05 +0000 (14:34 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Mon, 25 Jul 2011 18:34:05 +0000 (14:34 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
rculfqueue.c
tests/test_urcu_lfq.c
urcu/rculfqueue.h
urcu/static/rculfqueue.h

index 09ba9cf41fd8cde6f0cc58becba35066c95fada1..eac5e8b39c3350d5ca98d0f82b17303bfe65aa57 100644 (file)
@@ -38,11 +38,14 @@ void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node)
        _cds_lfq_node_init_rcu(node);
 }
 
-void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
-                     void queue_call_rcu(struct rcu_head *head,
-                               void (*func)(struct rcu_head *head)))
+void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q)
 {
-       _cds_lfq_init_rcu(q, queue_call_rcu);
+       _cds_lfq_init_rcu(q);
+}
+
+int cds_lfq_is_empty(struct cds_lfq_queue_rcu *q)
+{
+       return _cds_lfq_is_empty(q);
 }
 
 int cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q)
index b61a7d4815788af2bcd9e197dc319a8eb053791a..0132d020b3883323909bd85f124e3c00dd358ae6 100644 (file)
@@ -363,7 +363,7 @@ int main(int argc, char **argv)
        tid_dequeuer = malloc(sizeof(*tid_dequeuer) * nr_dequeuers);
        count_enqueuer = malloc(2 * sizeof(*count_enqueuer) * nr_enqueuers);
        count_dequeuer = malloc(2 * sizeof(*count_dequeuer) * nr_dequeuers);
-       cds_lfq_init_rcu(&q, call_rcu);
+       cds_lfq_init_rcu(&q);
 
        next_aff = 0;
 
index 598fa5071a2ca727e43d07a67cb2a62eb4980632..c2014aa702a7696bb294188baf7b832155bf59f8 100644 (file)
@@ -6,7 +6,8 @@
  *
  * Userspace RCU library - Lock-Free RCU Queue
  *
- * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright 2010-2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright 2011 - Lai Jiangshan <laijs@cn.fujitsu.com>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -33,14 +34,12 @@ extern "C" {
 struct cds_lfq_queue_rcu;
 
 struct cds_lfq_node_rcu {
-       struct cds_lfq_node_rcu *next;
-       int dummy;
+       unsigned long next;
 };
 
 struct cds_lfq_queue_rcu {
-       struct cds_lfq_node_rcu *head, *tail;
-       void (*queue_call_rcu)(struct rcu_head *head,
-               void (*func)(struct rcu_head *head));
+       unsigned long tail;
+       struct cds_lfq_node_rcu head;
 };
 
 #ifdef _LGPL_SOURCE
@@ -49,6 +48,7 @@ struct cds_lfq_queue_rcu {
 
 #define cds_lfq_node_init_rcu          _cds_lfq_node_init_rcu
 #define cds_lfq_init_rcu               _cds_lfq_init_rcu
+#define cds_lfq_is_empty               _cds_lfq_is_empty
 #define cds_lfq_destroy_rcu            _cds_lfq_destroy_rcu
 #define cds_lfq_enqueue_rcu            _cds_lfq_enqueue_rcu
 #define cds_lfq_dequeue_rcu            _cds_lfq_dequeue_rcu
@@ -56,9 +56,10 @@ struct cds_lfq_queue_rcu {
 #else /* !_LGPL_SOURCE */
 
 extern void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node);
-extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
-                            void queue_call_rcu(struct rcu_head *head,
-                                       void (*func)(struct rcu_head *head)));
+extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q);
+
+extern int cds_lfq_is_empty(struct cds_lfq_queue_rcu *q);
+
 /*
  * The queue should be emptied before calling destroy.
  *
index fea6110b4f466cb011d7f3e36f8b923bbd1f75b2..988f982c19b558f78129d77b6237d1bc7bc4d573 100644 (file)
@@ -6,7 +6,8 @@
  *
  * Userspace RCU library - Lock-Free RCU Queue
  *
- * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright 2010-2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright 2011 - Lai Jiangshan <laijs@cn.fujitsu.com>
  *
  * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See rculfqueue.h for linking
  * dynamically with the userspace rcu library.
 extern "C" {
 #endif
 
-struct cds_lfq_node_rcu_dummy {
-       struct cds_lfq_node_rcu parent;
-       struct rcu_head head;
-       struct cds_lfq_queue_rcu *q;
+enum node_type {
+       NODE_NODE = 0,
+       NODE_HEAD = 1,
+       NODE_NULL = 2, /* transitional */
 };
 
+#define NODE_TYPE_BITS 2
+#define NODE_TYPE_MASK ((1UL << NODE_TYPE_BITS) - 1)
+
 /*
- * Lock-free RCU queue. Enqueue and dequeue operations hold a RCU read
- * lock to deal with cmpxchg ABA problem. This queue is *not* circular:
- * head points to the oldest node, tail points to the newest node.
- * A dummy node is kept to ensure enqueue and dequeue can always proceed
- * concurrently. Keeping a separate head and tail helps with large
- * queues: enqueue and dequeue can proceed concurrently without
- * wrestling for exclusive access to the same variables.
+ * Lock-free RCU queue.
+ *
+ * Node addresses must be allocated on multiples of 4 bytes, because the
+ * two bottom bits are used internally.  "Special" HEAD and NULL node
+ * references use a sequence counter (rather than an address).  The
+ * sequence count is incremented as elements are enqueued.  Enqueue and
+ * dequeue operations hold a RCU read lock to deal with uatomic_cmpxchg
+ * ABA problem on standard node addresses. The sequence count of HEAD
+ * and NULL nodes deals with ABA problem with these nodes.
+ *
+ * Keeping a sequence count throughout the list allows dealing with
+ * dequeue-the-last/enqueue-the-first operations without need for adding
+ * any dummy node in the queue.
  *
- * Dequeue retry if it detects that it would be dequeueing the last node
- * (it means a dummy node dequeue-requeue is in progress). This ensures
- * that there is always at least one node in the queue.
+ * This queue is not circular.  The head node is located prior to the
+ * oldest node, tail points to the newest node.
  *
- * In the dequeue operation, we internally reallocate the dummy node
- * upon dequeue/requeue and use call_rcu to free the old one after a
- * grace period.
+ * Keeping a separate head and tail helps with large queues: enqueue and
+ * dequeue can proceed concurrently without wrestling for exclusive
+ * access to the same variables.
  */
 
 static inline
-struct cds_lfq_node_rcu *make_dummy(struct cds_lfq_queue_rcu *q,
-                                   struct cds_lfq_node_rcu *next)
+enum node_type queue_node_type(unsigned long node)
 {
-       struct cds_lfq_node_rcu_dummy *dummy;
-
-       dummy = malloc(sizeof(struct cds_lfq_node_rcu_dummy));
-       assert(dummy);
-       dummy->parent.next = next;
-       dummy->parent.dummy = 1;
-       dummy->q = q;
-       return &dummy->parent;
+       return node & NODE_TYPE_MASK;
 }
 
 static inline
-void free_dummy_cb(struct rcu_head *head)
+unsigned long queue_node_seq(unsigned long node)
 {
-       struct cds_lfq_node_rcu_dummy *dummy =
-               caa_container_of(head, struct cds_lfq_node_rcu_dummy, head);
-       free(dummy);
+       assert(queue_node_type(node) == NODE_HEAD
+               || queue_node_type(node) == NODE_NULL);
+       return node >> NODE_TYPE_BITS;
 }
 
 static inline
-void rcu_free_dummy(struct cds_lfq_node_rcu *node)
+struct cds_lfq_node_rcu *queue_node_node(unsigned long node)
 {
-       struct cds_lfq_node_rcu_dummy *dummy;
+       assert(queue_node_type(node) == NODE_NODE);
+       return (void *) (node & ~NODE_TYPE_MASK);
+}
 
-       assert(node->dummy);
-       dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent);
-       dummy->q->queue_call_rcu(&dummy->head, free_dummy_cb);
+static inline
+unsigned long queue_make_node(struct cds_lfq_node_rcu *node)
+{
+       return ((unsigned long) node) | NODE_NODE;
 }
 
 static inline
-void free_dummy(struct cds_lfq_node_rcu *node)
+unsigned long queue_make_head(unsigned long seq)
 {
-       struct cds_lfq_node_rcu_dummy *dummy;
+       return (seq << NODE_TYPE_BITS) | NODE_HEAD;
+}
 
-       assert(node->dummy);
-       dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent);
-       free(dummy);
+static inline
+unsigned long queue_make_null(unsigned long seq)
+{
+       return (seq << NODE_TYPE_BITS) | NODE_NULL;
 }
 
+
 static inline
 void _cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node)
 {
-       node->next = NULL;
-       node->dummy = 0;
+       /* Kept here for object debugging. */
+}
+
+static inline
+void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q)
+{
+       q->head.next = queue_make_head(0);
+       q->tail = queue_make_head(0);
 }
 
 static inline
-void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
-                      void queue_call_rcu(struct rcu_head *head,
-                               void (*func)(struct rcu_head *head)))
+int _cds_lfq_is_empty(struct cds_lfq_queue_rcu *q)
 {
-       q->tail = make_dummy(q, NULL);
-       q->head = q->tail;
-       q->queue_call_rcu = queue_call_rcu;
+       unsigned long head, next;
+       struct cds_lfq_node_rcu *phead;
+
+       head = rcu_dereference(q->head.next);
+       if (queue_node_type(head) == NODE_HEAD) {
+               /* F0 or T0b */
+               return 1;
+       }
+
+       phead = queue_node_node(head);
+       next = rcu_dereference(phead->next);
+
+       if (queue_node_type(next) == NODE_HEAD) { /* T1, F1 */
+               /* only one node */
+               return 0;
+       } else if (queue_node_type(next) == NODE_NODE) {
+               /* have >=2 nodes, Tn{n>=2} Fn{n>=2} */
+               return 0;
+       } else {
+               /* T0a */
+               assert(queue_node_type(next) == NODE_NULL);
+               return 1;
+       }
 }
 
 /*
@@ -126,98 +157,223 @@ void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
 static inline
 int _cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q)
 {
-       struct cds_lfq_node_rcu *head;
-
-       head = rcu_dereference(q->head);
-       if (!(head->dummy && head->next == NULL))
-               return -EPERM;  /* not empty */
-       free_dummy(head);
+       if (!_cds_lfq_is_empty(q))
+               return -EPERM;
+       /* Kept here for object debugging. */
        return 0;
 }
 
+static void __queue_post_dequeue_the_last(struct cds_lfq_queue_rcu *q,
+               unsigned long old_head, unsigned long new_head);
+
+/*
+ * Lock-free queue uses rcu_read_lock() to proctect the life-time
+ * of the nodes and to prevent ABA-problem.
+ *
+ * cds_lfq_enqueue_rcu() and cds_lfq_dequeue_rcu() need to be called
+ * under rcu read lock critical section.  The node returned by
+ * queue_dequeue() must not be modified/re-used/freed until a
+ * grace-period passed.
+ */
+
+static inline
+unsigned long __queue_enqueue(struct cds_lfq_queue_rcu *q,
+               unsigned long tail, unsigned long next,
+               struct cds_lfq_node_rcu *ptail, struct cds_lfq_node_rcu *pnode)
+{
+       unsigned long newnext;
+
+       /* increase the seq for every enqueued node */
+       pnode->next = queue_make_head(queue_node_seq(next) + 1);
+
+       /* Fn(seq) -> T(n+1)(seq+1) */
+       newnext = uatomic_cmpxchg(&ptail->next, next, queue_make_node(pnode));
+
+       if (newnext != next)
+               return newnext;
+
+       /* success, move tail(or done by other), T(n+1) -> F(n+1) */
+       uatomic_cmpxchg(&q->tail, tail, queue_make_node(pnode));
+       return next;
+}
+
 /*
- * Should be called under rcu read lock critical section.
+ * Needs to be called with RCU read-side lock held.
  */
 static inline
 void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
-                         struct cds_lfq_node_rcu *node)
+                         struct cds_lfq_node_rcu *pnode)
 {
-       /*
-        * uatomic_cmpxchg() implicit memory barrier orders earlier stores to
-        * node before publication.
-        */
+       unsigned long tail, next;
+       struct cds_lfq_node_rcu *ptail;
 
        for (;;) {
-               struct cds_lfq_node_rcu *tail, *next;
-
                tail = rcu_dereference(q->tail);
-               next = uatomic_cmpxchg(&tail->next, NULL, node);
-               if (next == NULL) {
-                       /*
-                        * Tail was at the end of queue, we successfully
-                        * appended to it. Now move tail (another
-                        * enqueue might beat us to it, that's fine).
-                        */
-                       (void) uatomic_cmpxchg(&q->tail, tail, node);
-                       return;
-               } else {
+               if (queue_node_type(tail) == NODE_HEAD) { /* F0 */
+                       ptail = &q->head;
+                       next = tail;
                        /*
-                        * Failure to append to current tail.
-                        * Help moving tail further and retry.
+                        * We cannot use "next = rcu_dereference(ptail->next);"
+                        * here, because it is control dependency, not data
+                        * dependency. But since F0 is the most likely state
+                        * when 0 node, so we use 'next = tail'.
                         */
-                       (void) uatomic_cmpxchg(&q->tail, tail, next);
-                       continue;
+               } else { /* Fn, Tn */
+                       ptail = queue_node_node(tail);
+                       next = rcu_dereference(ptail->next);
+               }
+
+               if (queue_node_type(next) == NODE_HEAD) { /* Fn */
+                       unsigned long newnext;
+
+                       /* Fn{n>=0} -> F(n+1) */
+                       newnext = __queue_enqueue(q, tail, next, ptail, pnode);
+                       if (newnext == next) {
+                               return;
+                       }
+                       next = newnext;
+               }
+
+               if (queue_node_type(next) == NODE_NODE) { /* Tn */
+                       /* help moving tail, Tn{n>=1} -> Fn */
+                       uatomic_cmpxchg(&q->tail, tail, next);
+               } else if (queue_node_type(next) == NODE_NULL) {
+                       /* help finishing dequeuing the last, T0a or T0b -> F0 */
+                       __queue_post_dequeue_the_last(q, tail,
+                                       queue_make_head(queue_node_seq(next)));
                }
        }
 }
 
 static inline
-void enqueue_dummy(struct cds_lfq_queue_rcu *q)
+void __queue_post_dequeue_the_last(struct cds_lfq_queue_rcu *q,
+               unsigned long old_head, unsigned long new_head)
+{
+       /* step2: T0a -> T0b */
+       uatomic_cmpxchg(&q->head.next, old_head, new_head);
+
+       /* step3: T0b -> F0 */
+       uatomic_cmpxchg(&q->tail, old_head, new_head);
+}
+
+static inline
+int __queue_dequeue_the_last(struct cds_lfq_queue_rcu *q,
+               unsigned long head, unsigned long next,
+               struct cds_lfq_node_rcu *plast)
 {
-       struct cds_lfq_node_rcu *node;
+       unsigned long origin_tail = rcu_dereference(q->tail);
 
-       /* We need to reallocate to protect from ABA. */
-       node = make_dummy(q, NULL);
-       _cds_lfq_enqueue_rcu(q, node);
+       /*
+        * T1 -> F1 if T1, we cannot dequeue the last node when T1.
+        *
+        * pseudocode is:
+        *   tail = rcu_dereference(q->tail); (*)
+        *   if (tail == queue_make_head(seq - 1))
+        *     uatomic_cmpxchg(&q->tail, tail, head);
+        * But we only expect (*) gets tail's value is:
+        *     head                             (F1)(likely got)
+        *     queue_make_head(seq - 1)         (T1)
+        * not newer nor older value, so the pseudocode is not acceptable.
+        */
+       if (origin_tail != head) {
+               unsigned long tail;
+
+               /* Don't believe the orderless-read tail! */
+               origin_tail = queue_make_head(queue_node_seq(next) - 1);
+
+               /* help moving tail, T1 -> F1 */
+               tail = uatomic_cmpxchg(&q->tail, origin_tail, head);
+
+               if (tail != origin_tail && tail != head)
+                       return 0;
+       }
+
+       /* step1: F1 -> T0a */
+       if (uatomic_cmpxchg(&plast->next, next, queue_make_null(queue_node_seq(next))) != next)
+               return 0;
+
+       __queue_post_dequeue_the_last(q, head, next);
+       return 1;
+}
+
+static inline
+int __queue_dequeue(struct cds_lfq_queue_rcu *q,
+               unsigned long head, unsigned long next)
+{
+       struct cds_lfq_node_rcu *pnext = queue_node_node(next);
+       unsigned long nextnext = rcu_dereference(pnext->next);
+
+       /*
+        * T2 -> F2 if T2, we cannot dequeue the first node when T2.
+        *
+        * pseudocode is:
+        *   tail = rcu_dereference(q->tail); (*)
+        *   if (tail == head)
+        *     uatomic_cmpxchg(&q->tail, head, next);
+        * But we only expect (*) gets tail's value is:
+        *     node in the queue
+        * not older value, the older value cause us save a uatomic_cmpxchg() wrongly,
+        * so the pseudocode is not acceptable.
+        *
+        * using uatomic_cmpxchg always is OK, but it adds a uatomic_cmpxchg overhead always:
+        *   uatomic_cmpxchg(&q->tail, head, next);
+        */
+       if (queue_node_type(nextnext) == NODE_HEAD) { /* 2 nodes */
+               unsigned long tail = rcu_dereference(q->tail);
+
+               /*
+                * tail == next: now is F2, don't need help moving tail
+                * tail != next: it is unlikely when 2 nodes.
+                * Don't believe the orderless-read tail!
+                */
+               if (tail != next)
+                       uatomic_cmpxchg(&q->tail, head, next); /* help for T2 -> F2 */
+       }
+
+       /* Fn{n>=2} -> F(n-1), Tn{n>=3} -> T(n-1) */
+       if (uatomic_cmpxchg(&q->head.next, head, next) != head)
+               return 0;
+
+       return 1;
 }
 
 /*
- * Should be called under rcu read lock critical section.
- *
- * The caller must wait for a grace period to pass before freeing the returned
- * node or modifying the cds_lfq_node_rcu structure.
- * Returns NULL if queue is empty.
+ * Needs to be called with rcu read-side lock held.
+ * Wait for a grace period before freeing/reusing the returned node.
+ * If NULL is returned, the queue is empty.
  */
 static inline
 struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q)
 {
-       for (;;) {
-               struct cds_lfq_node_rcu *head, *next;
+       unsigned long head, next;
+       struct cds_lfq_node_rcu *phead;
 
-               head = rcu_dereference(q->head);
-               next = rcu_dereference(head->next);
-               if (head->dummy && next == NULL)
-                       return NULL;    /* empty */
-               /*
-                * We never, ever allow dequeue to get to a state where
-                * the queue is empty (we need at least one node in the
-                * queue). This is ensured by checking if the head next
-                * is NULL, which means we need to enqueue a dummy node
-                * before we can hope dequeuing anything.
-                */
-               if (!next) {
-                       enqueue_dummy(q);
-                       next = rcu_dereference(head->next);
+       for (;;) {
+               head = rcu_dereference(q->head.next);
+               if (queue_node_type(head) == NODE_HEAD) {
+                       /* F0 or T0b */
+                       return NULL;
                }
-               if (uatomic_cmpxchg(&q->head, head, next) != head)
-                       continue;       /* Concurrently pushed. */
-               if (head->dummy) {
-                       /* Free dummy after grace period. */
-                       rcu_free_dummy(head);
-                       continue;       /* try again */
+
+               phead = queue_node_node(head);
+               next = rcu_dereference(phead->next);
+
+               if (queue_node_type(next) == NODE_HEAD) { /* T1, F1 */
+                       /* dequeue when only one node */
+                       if (__queue_dequeue_the_last(q, head, next, phead))
+                               goto done;
+               } else if (queue_node_type(next) == NODE_NODE) {
+                       /* dequeue when have >=2 nodes, Tn{n>=2} Fn{n>=2} */
+                       if (__queue_dequeue(q, head, next))
+                               goto done;
+               } else {
+                       /* T0a */
+                       assert(queue_node_type(next) == NODE_NULL);
+                       return NULL;
                }
-               return head;
        }
+done:
+       return phead;
 }
 
 #ifdef __cplusplus
This page took 0.034368 seconds and 4 git commands to generate.