static inline
struct cds_lfq_node_rcu *make_dummy(struct cds_lfq_queue_rcu *q,
				    struct cds_lfq_node_rcu *next)
{
	struct cds_lfq_node_rcu_dummy *dummy;

	dummy = malloc(sizeof(struct cds_lfq_node_rcu_dummy));
	assert(dummy);
	dummy->parent.next = next;
+	dummy->parent.dummy = 1;
	dummy->q = q;
	return &dummy->parent;
}

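For context (not part of this hunk): the fields used above imply a dummy wrapper type that embeds the plain queue node plus the state needed for deferred reclaim, along these lines:

struct cds_lfq_node_rcu_dummy {
	struct cds_lfq_node_rcu parent;	/* embedded queue node */
	struct rcu_head head;		/* deferred reclaim via call_rcu() */
	struct cds_lfq_queue_rcu *q;	/* back-pointer to owning queue */
};
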
static inline
void rcu_free_dummy(struct cds_lfq_node_rcu *node)
{
	struct cds_lfq_node_rcu_dummy *dummy;

+	assert(node->dummy);
	dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent);
-	dummy->q->queue_call_rcu(&dummy->head, free_dummy_cb);
+	call_rcu(&dummy->head, free_dummy_cb);
}

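The free_dummy_cb callback sits outside this hunk; presumably it just recovers the wrapper from the embedded rcu_head and frees it:

static inline
void free_dummy_cb(struct rcu_head *head)
{
	struct cds_lfq_node_rcu_dummy *dummy =
		caa_container_of(head, struct cds_lfq_node_rcu_dummy, head);

	free(dummy);
}
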
static inline
void free_dummy(struct cds_lfq_node_rcu *node)
{
	struct cds_lfq_node_rcu_dummy *dummy;

+	assert(node->dummy);
	dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent);
	free(dummy);
}

static inline
void _cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node)
{
	node->next = NULL;
+	node->dummy = 0;
}

static inline
-void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q,
-		       void queue_call_rcu(struct rcu_head *head,
-					   void (*func)(struct rcu_head *head)))
+void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q)
{
	q->tail = make_dummy(q, NULL);
-	q->dummy = q->tail;
	q->head = q->tail;
-	q->queue_call_rcu = queue_call_rcu;
}

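As of this patch, initialization no longer takes a queue_call_rcu argument. Assuming the public cds_lfq_init_rcu() wrapper tracks this internal signature, setup reduces to:

struct cds_lfq_queue_rcu q;

cds_lfq_init_rcu(&q);	/* head == tail == freshly allocated dummy */
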
/*
 * The queue should be emptied before calling destroy.
 *
 * Return 0 on success, -EPERM if queue is not empty.
 */
static inline
int _cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q)
{
	struct cds_lfq_node_rcu *head;

	head = rcu_dereference(q->head);
-	if (!(head == q->dummy && head->next == NULL))
+	if (!(head->dummy && head->next == NULL))
		return -EPERM;	/* not empty */
	free_dummy(head);
	return 0;
}

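Destroy therefore succeeds only once every user node has been dequeued and just the dummy remains. A caller-side sketch (teardown is a hypothetical helper; cds_lfq_destroy_rcu() is the public wrapper):

static void teardown(struct cds_lfq_queue_rcu *q)
{
	int ret = cds_lfq_destroy_rcu(q);	/* -EPERM if user nodes remain */

	assert(!ret);
}
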
/*
- * Should be called under rcu read lock critical section.
+ * Acts as an RCU reader.
 */
static inline
void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
			  struct cds_lfq_node_rcu *node)
{
	for (;;) {
		struct cds_lfq_node_rcu *tail, *next;

+		rcu_read_lock();
		tail = rcu_dereference(q->tail);
		next = uatomic_cmpxchg(&tail->next, NULL, node);
		if (next == NULL) {
			/*
			 * Tail was at the end of queue, we successfully
			 * appended to it. Now move tail (another
			 * enqueue might beat us to it, that's fine).
			 */
			(void) uatomic_cmpxchg(&q->tail, tail, node);
+			rcu_read_unlock();
			return;
		} else {
			/*
			 * Help moving tail further and retry.
			 */
			(void) uatomic_cmpxchg(&q->tail, tail, next);
+			rcu_read_unlock();
			continue;
		}
	}
}

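Because enqueue and dequeue now enter their own read-side critical sections, callers need no surrounding rcu_read_lock(). A minimal usage sketch, assuming a hypothetical struct mynode and helper enqueue_value() (the cds_lfq_* calls are the public API):

#include <assert.h>
#include <stdlib.h>

#include <urcu.h>		/* call_rcu(), RCU read-side */
#include <urcu/rculfqueue.h>	/* cds_lfq_* queue API */

struct mynode {
	int value;
	struct cds_lfq_node_rcu node;	/* embedded queue linkage */
	struct rcu_head rcu_head;	/* deferred free after dequeue */
};

static void enqueue_value(struct cds_lfq_queue_rcu *q, int value)
{
	struct mynode *n = malloc(sizeof(*n));

	assert(n);
	n->value = value;
	cds_lfq_node_init_rcu(&n->node);
	cds_lfq_enqueue_rcu(q, &n->node);	/* takes its own read lock */
}
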
static inline
-void reenqueue_dummy(struct cds_lfq_queue_rcu *q)
+void enqueue_dummy(struct cds_lfq_queue_rcu *q)
{
	struct cds_lfq_node_rcu *node;

	/* We need to reallocate to protect from ABA. */
	node = make_dummy(q, NULL);
-	if (uatomic_cmpxchg(&q->dummy, NULL, node) != NULL) {
-		/* other dequeue populated its new dummy */
-		free_dummy(node);
-		return;
-	}
	_cds_lfq_enqueue_rcu(q, node);
}

/*
- * Should be called under rcu read lock critical section.
+ * Acts as an RCU reader.
 *
 * The caller must wait for a grace period to pass before freeing the returned
 * node or modifying the cds_lfq_node_rcu structure.
 * Returns NULL if queue is empty.
 */
static inline
struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q)
{
	for (;;) {
-		struct cds_lfq_node_rcu *head, *next, *dummy;
+		struct cds_lfq_node_rcu *head, *next;

+		rcu_read_lock();
		head = rcu_dereference(q->head);
		next = rcu_dereference(head->next);
-		dummy = rcu_dereference(q->dummy);
-		if (head == dummy && next == NULL)
+		if (head->dummy && next == NULL) {
+			rcu_read_unlock();
			return NULL;	/* empty */
+		}
		/*
		 * We never, ever allow dequeue to get to a state where
		 * the queue is empty (we need at least one node in the
		 * queue). This is ensured by checking if the head next
-		 * is NULL. This means a concurrent dummy node
-		 * re-enqueue is in progress. We help it reach
-		 * completion, and retry.
+		 * is NULL, which means we need to enqueue a dummy node
+		 * before we can hope to dequeue anything.
		 */
		if (!next) {
-			/*
-			 * Dummy node re-enqueue is in progress. Try to
-			 * help.
-			 */
-			reenqueue_dummy(q);
-			continue;	/* try again */
+			enqueue_dummy(q);
+			next = rcu_dereference(head->next);
		}
-		if (uatomic_cmpxchg(&q->head, head, next) != head)
+		if (uatomic_cmpxchg(&q->head, head, next) != head) {
+			rcu_read_unlock();
			continue;	/* Concurrently pushed. */
-		if (head == dummy) {
-			/* Free old and requeue new dummy. */
-			rcu_set_pointer(&q->dummy, NULL);
-			rcu_free_dummy(dummy);
-			reenqueue_dummy(q);
+		}
+		if (head->dummy) {
+			/* Free dummy after grace period. */
+			rcu_free_dummy(head);
+			rcu_read_unlock();
			continue;	/* try again */
		}
+		rcu_read_unlock();
		return head;
	}
}

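Continuing the sketch above: after a successful dequeue the caller recovers its own structure with caa_container_of() and, per the comment on _cds_lfq_dequeue_rcu(), defers the free by a grace period. free_mynode_cb and dequeue_value are hypothetical helpers:

static void free_mynode_cb(struct rcu_head *head)
{
	free(caa_container_of(head, struct mynode, rcu_head));
}

static int dequeue_value(struct cds_lfq_queue_rcu *q, int *value)
{
	struct cds_lfq_node_rcu *qnode = cds_lfq_dequeue_rcu(q);
	struct mynode *n;

	if (!qnode)
		return -1;	/* queue empty */
	n = caa_container_of(qnode, struct mynode, node);
	*value = n->value;
	call_rcu(&n->rcu_head, free_mynode_cb);	/* free after grace period */
	return 0;
}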