wfcqueue: implement mutex-free splice
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 20 Nov 2012 02:45:04 +0000 (21:45 -0500)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 5 Dec 2012 10:48:00 +0000 (05:48 -0500)
A carefully crafted splice operation does not need to use an external
mutex to synchronize against other splice operations.

The trick is atomically exchange the head next pointer with
NULL. If the pointer we replaced was NULL, it means the queue was
possibly empty. If head next was not NULL, by setting head to NULL, we
ensure that concurrent splice operations are going to see an empty
queue, even if concurrent enqueue operations move tail further. This
means that as long as we are within splice, after setting head to NULL,
but before moving tail back to head, concurrent splice operations will
always see an empty queue, therefore acting as mutual exclusion.

If exchange returns a NULL head, we confirm that it was indeed empty by
checking if the tail pointer points to the head node, busy-waiting if
necessary.

Then the last step is to move the tail pointer to head. At that point,
enqueuers are going to start enqueuing at head again, and other splice
operations will be able to proceed.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
urcu/static/wfcqueue.h
urcu/wfcqueue.h
wfcqueue.c

index 8774c0375eff1aa6cd458f39697cd1b7a39ef173..99643be98c393310bc36d591262fcddd5f0227af 100644 (file)
@@ -46,15 +46,30 @@ extern "C" {
  * half-wait-free/half-blocking queue implementation done by Paul E.
  * McKenney.
  *
- * Mutual exclusion of __cds_wfcq_* API
- *
- * Unless otherwise stated, the caller must ensure mutual exclusion of
- * queue update operations "dequeue" and "splice" (for source queue).
- * Queue read operations "first" and "next", which are used by
- * "for_each" iterations, need to be protected against concurrent
- * "dequeue" and "splice" (for source queue) by the caller.
- * "enqueue", "splice" (for destination queue), and "empty" are the only
- * operations that can be used without any mutual exclusion.
+ * Mutual exclusion of cds_wfcq_* / __cds_wfcq_* API
+ *
+ * Synchronization table:
+ *
+ * External synchronization techniques described in the API below is
+ * required between pairs marked with "X". No external synchronization
+ * required between pairs marked with "-".
+ *
+ * Legend:
+ * [1] cds_wfcq_enqueue
+ * [2] __cds_wfcq_splice (destination queue)
+ * [3] __cds_wfcq_dequeue
+ * [4] __cds_wfcq_splice (source queue)
+ * [5] __cds_wfcq_first
+ * [6] __cds_wfcq_next
+ *
+ *     [1] [2] [3] [4] [5] [6]
+ * [1]  -   -   -   -   -   -
+ * [2]  -   -   -   -   -   -
+ * [3]  -   -   X   X   X   X
+ * [4]  -   -   X   -   X   X
+ * [5]  -   -   X   X   -   -
+ * [6]  -   -   X   X   -   -
+ *
  * Mutual exclusion can be ensured by holding cds_wfcq_dequeue_lock().
  *
  * For convenience, cds_wfcq_dequeue_blocking() and
@@ -182,6 +197,25 @@ static inline bool _cds_wfcq_enqueue(struct cds_wfcq_head *head,
        return ___cds_wfcq_append(head, tail, new_tail, new_tail);
 }
 
+/*
+ * ___cds_wfcq_busy_wait: adaptative busy-wait.
+ *
+ * Returns 1 if nonblocking and needs to block, 0 otherwise.
+ */
+static inline bool
+___cds_wfcq_busy_wait(int *attempt, int blocking)
+{
+       if (!blocking)
+               return 1;
+       if (++(*attempt) >= WFCQ_ADAPT_ATTEMPTS) {
+               poll(NULL, 0, WFCQ_WAIT);       /* Wait for 10ms */
+               *attempt = 0;
+       } else {
+               caa_cpu_relax();
+       }
+       return 0;
+}
+
 /*
  * Waiting for enqueuer to complete enqueue and return the next node.
  */
@@ -195,14 +229,8 @@ ___cds_wfcq_node_sync_next(struct cds_wfcq_node *node, int blocking)
         * Adaptative busy-looping waiting for enqueuer to complete enqueue.
         */
        while ((next = CMM_LOAD_SHARED(node->next)) == NULL) {
-               if (!blocking)
+               if (___cds_wfcq_busy_wait(&attempt, blocking))
                        return CDS_WFCQ_WOULDBLOCK;
-               if (++attempt >= WFCQ_ADAPT_ATTEMPTS) {
-                       poll(NULL, 0, WFCQ_WAIT);       /* Wait for 10ms */
-                       attempt = 0;
-               } else {
-                       caa_cpu_relax();
-               }
        }
 
        return next;
@@ -399,6 +427,16 @@ ___cds_wfcq_dequeue_nonblocking(struct cds_wfcq_head *head,
        return ___cds_wfcq_dequeue(head, tail, 0);
 }
 
+/*
+ * __cds_wfcq_splice: enqueue all src_q nodes at the end of dest_q.
+ *
+ * Dequeue all nodes from src_q.
+ * dest_q must be already initialized.
+ * Mutual exclusion for src_q should be ensured by the caller as
+ * specified in the "Synchronisation table".
+ * Returns enum cds_wfcq_ret which indicates the state of the src or
+ * dest queue.
+ */
 static inline enum cds_wfcq_ret
 ___cds_wfcq_splice(
                struct cds_wfcq_head *dest_q_head,
@@ -408,14 +446,29 @@ ___cds_wfcq_splice(
                int blocking)
 {
        struct cds_wfcq_node *head, *tail;
+       int attempt = 0;
 
+       /*
+        * Initial emptiness check to speed up cases where queue is
+        * empty: only require loads to check if queue is empty.
+        */
        if (_cds_wfcq_empty(src_q_head, src_q_tail))
                return CDS_WFCQ_RET_SRC_EMPTY;
 
-       head = ___cds_wfcq_node_sync_next(&src_q_head->node, blocking);
-       if (head == CDS_WFCQ_WOULDBLOCK)
-               return CDS_WFCQ_RET_WOULDBLOCK;
-       _cds_wfcq_node_init(&src_q_head->node);
+       for (;;) {
+               /*
+                * Open-coded _cds_wfcq_empty() by testing result of
+                * uatomic_xchg, as well as tail pointer vs head node
+                * address.
+                */
+               head = uatomic_xchg(&src_q_head->node.next, NULL);
+               if (head)
+                       break;  /* non-empty */
+               if (CMM_LOAD_SHARED(src_q_tail->p) == &src_q_head->node)
+                       return CDS_WFCQ_RET_SRC_EMPTY;
+               if (___cds_wfcq_busy_wait(&attempt, blocking))
+                       return CDS_WFCQ_RET_WOULDBLOCK;
+       }
 
        /*
         * Memory barrier implied before uatomic_xchg() orders store to
@@ -435,14 +488,13 @@ ___cds_wfcq_splice(
                return CDS_WFCQ_RET_DEST_EMPTY;
 }
 
-
 /*
  * __cds_wfcq_splice_blocking: enqueue all src_q nodes at the end of dest_q.
  *
  * Dequeue all nodes from src_q.
  * dest_q must be already initialized.
- * Dequeue/splice/iteration mutual exclusion for src_q should be ensured
- * by the caller.
+ * Mutual exclusion for src_q should be ensured by the caller as
+ * specified in the "Synchronisation table".
  * Returns enum cds_wfcq_ret which indicates the state of the src or
  * dest queue. Never returns CDS_WFCQ_RET_WOULDBLOCK.
  */
index ddf6b87c478eb808d72fbeb8242fb294726933c5..d9ec5349d7be203da7f80f6e6c0de5cda96e20e9 100644 (file)
@@ -46,7 +46,7 @@ extern "C" {
 #define CDS_WFCQ_WOULDBLOCK    ((void *) -1UL)
 
 enum cds_wfcq_ret {
-       CDS_WFCQ_RET_WOULDBLOCK =       -1,
+       CDS_WFCQ_RET_WOULDBLOCK =       -1,
        CDS_WFCQ_RET_DEST_EMPTY =       0,
        CDS_WFCQ_RET_DEST_NON_EMPTY =   1,
        CDS_WFCQ_RET_SRC_EMPTY =        2,
@@ -110,13 +110,28 @@ struct cds_wfcq_tail {
 /*
  * Mutual exclusion of cds_wfcq_* / __cds_wfcq_* API
  *
- * Unless otherwise stated, the caller must ensure mutual exclusion of
- * queue update operations "dequeue" and "splice" (for source queue).
- * Queue read operations "first" and "next", which are used by
- * "for_each" iterations, need to be protected against concurrent
- * "dequeue" and "splice" (for source queue) by the caller.
- * "enqueue", "splice" (for destination queue), and "empty" are the only
- * operations that can be used without any mutual exclusion.
+ * Synchronization table:
+ *
+ * External synchronization techniques described in the API below is
+ * required between pairs marked with "X". No external synchronization
+ * required between pairs marked with "-".
+ *
+ * Legend:
+ * [1] cds_wfcq_enqueue
+ * [2] __cds_wfcq_splice (destination queue)
+ * [3] __cds_wfcq_dequeue
+ * [4] __cds_wfcq_splice (source queue)
+ * [5] __cds_wfcq_first
+ * [6] __cds_wfcq_next
+ *
+ *     [1] [2] [3] [4] [5] [6]
+ * [1]  -   -   -   -   -   -
+ * [2]  -   -   -   -   -   -
+ * [3]  -   -   X   X   X   X
+ * [4]  -   -   X   -   X   X
+ * [5]  -   -   X   X   -   -
+ * [6]  -   -   X   X   -   -
+ *
  * Mutual exclusion can be ensured by holding cds_wfcq_dequeue_lock().
  *
  * For convenience, cds_wfcq_dequeue_blocking() and
@@ -231,13 +246,10 @@ extern struct cds_wfcq_node *__cds_wfcq_dequeue_nonblocking(
  *
  * Dequeue all nodes from src_q.
  * dest_q must be already initialized.
- * Content written into the node before enqueue is guaranteed to be
- * consistent, but no other memory ordering is ensured.
- * Dequeue/splice/iteration mutual exclusion for src_q should be ensured
- * by the caller.
- *
+ * Mutual exclusion for src_q should be ensured by the caller as
+ * specified in the "Synchronisation table".
  * Returns enum cds_wfcq_ret which indicates the state of the src or
- * dest queue. Cannot block.
+ * dest queue. Never returns CDS_WFCQ_RET_WOULDBLOCK.
  */
 extern enum cds_wfcq_ret __cds_wfcq_splice_blocking(
                struct cds_wfcq_head *dest_q_head,
index 207df95a5b7a4c3347de699f2fc37886120ac346..ab0eb939e912e15c4b8f44ca9d65d67df0a3b4cd 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * wfcqueue.c
  *
- * Userspace RCU library - Concurrent queue with Wait-Free Enqueue/Blocking Dequeue
+ * Userspace RCU library - Concurrent Queue with Wait-Free Enqueue/Blocking Dequeue
  *
  * Copyright 2010-2012 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  * Copyright 2011-2012 - Lai Jiangshan <laijs@cn.fujitsu.com>
This page took 0.029902 seconds and 4 git commands to generate.