Implement Lai Jiangshan's algorithm in RCU lock-free queue
[urcu.git] / urcu / static / rculfqueue.h
CommitLineData
3d02c34d
MD
1#ifndef _URCU_RCULFQUEUE_STATIC_H
2#define _URCU_RCULFQUEUE_STATIC_H
3
4/*
5 * rculfqueue-static.h
6 *
7 * Userspace RCU library - Lock-Free RCU Queue
8 *
2d37e098
MD
9 * Copyright 2010-2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
10 * Copyright 2011 - Lai Jiangshan <laijs@cn.fujitsu.com>
3d02c34d
MD
11 *
12 * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See rculfqueue.h for linking
13 * dynamically with the userspace rcu library.
14 *
15 * This library is free software; you can redistribute it and/or
16 * modify it under the terms of the GNU Lesser General Public
17 * License as published by the Free Software Foundation; either
18 * version 2.1 of the License, or (at your option) any later version.
19 *
20 * This library is distributed in the hope that it will be useful,
21 * but WITHOUT ANY WARRANTY; without even the implied warranty of
22 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
23 * Lesser General Public License for more details.
24 *
25 * You should have received a copy of the GNU Lesser General Public
26 * License along with this library; if not, write to the Free Software
27 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
28 */
29
e17d9985 30#include <urcu-call-rcu.h>
a2e7bf9c 31#include <urcu/uatomic.h>
3d02c34d 32#include <assert.h>
e17d9985 33#include <errno.h>
3d02c34d
MD
34
35#ifdef __cplusplus
36extern "C" {
37#endif
38
2d37e098
MD
/*
 * Type tag stored in the two low-order bits of every queue reference.
 * A reference is an unsigned long holding either a 4-byte-aligned node
 * address (NODE_NODE) or a sequence counter shifted left by
 * NODE_TYPE_BITS (NODE_HEAD, NODE_NULL).
 */
enum node_type {
	NODE_NODE = 0,	/* reference is the address of a real queue node */
	NODE_HEAD = 1,	/* logical head/empty marker, carries a seq count */
	NODE_NULL = 2,	/* transitional */
};

/* Number of low-order bits reserved for the type tag, and its mask. */
#define NODE_TYPE_BITS 2
#define NODE_TYPE_MASK ((1UL << NODE_TYPE_BITS) - 1)
47
/*
 * Lock-free RCU queue.
 *
 * Node addresses must be allocated on multiples of 4 bytes, because the
 * two bottom bits are used internally. "Special" HEAD and NULL node
 * references use a sequence counter (rather than an address). The
 * sequence count is incremented as elements are enqueued. Enqueue and
 * dequeue operations hold an RCU read lock to deal with the
 * uatomic_cmpxchg ABA problem on standard node addresses. The sequence
 * count of HEAD and NULL references deals with the ABA problem on these
 * special references.
 *
 * Keeping a sequence count throughout the list allows dealing with
 * dequeue-the-last/enqueue-the-first operations without needing to add
 * any dummy node to the queue.
 *
 * This queue is not circular. The head node is located prior to the
 * oldest node; tail points to the newest node.
 *
 * Keeping separate head and tail pointers helps with large queues:
 * enqueue and dequeue can proceed concurrently without wrestling for
 * exclusive access to the same variables.
 */
70
e17d9985 71static inline
2d37e098 72enum node_type queue_node_type(unsigned long node)
e17d9985 73{
2d37e098 74 return node & NODE_TYPE_MASK;
e17d9985
MD
75}
76
77static inline
2d37e098 78unsigned long queue_node_seq(unsigned long node)
e17d9985 79{
2d37e098
MD
80 assert(queue_node_type(node) == NODE_HEAD
81 || queue_node_type(node) == NODE_NULL);
82 return node >> NODE_TYPE_BITS;
e17d9985
MD
83}
84
85static inline
2d37e098 86struct cds_lfq_node_rcu *queue_node_node(unsigned long node)
e17d9985 87{
2d37e098
MD
88 assert(queue_node_type(node) == NODE_NODE);
89 return (void *) (node & ~NODE_TYPE_MASK);
90}
3d02c34d 91
2d37e098
MD
92static inline
93unsigned long queue_make_node(struct cds_lfq_node_rcu *node)
94{
95 return ((unsigned long) node) | NODE_NODE;
f6719811
MD
96}
97
98static inline
2d37e098 99unsigned long queue_make_head(unsigned long seq)
f6719811 100{
2d37e098
MD
101 return (seq << NODE_TYPE_BITS) | NODE_HEAD;
102}
f6719811 103
2d37e098
MD
104static inline
105unsigned long queue_make_null(unsigned long seq)
106{
107 return (seq << NODE_TYPE_BITS) | NODE_NULL;
e17d9985
MD
108}
109
2d37e098 110
/*
 * Initialize a queue node before enqueuing it.
 * Nothing to do: node->next is overwritten at enqueue time.
 */
static inline
void _cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node)
{
	/* Kept here for object debugging. */
}
116
117static inline
118void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q)
119{
120 q->head.next = queue_make_head(0);
121 q->tail = queue_make_head(0);
3d02c34d
MD
122}
123
e17d9985 124static inline
2d37e098 125int _cds_lfq_is_empty(struct cds_lfq_queue_rcu *q)
e17d9985 126{
2d37e098
MD
127 unsigned long head, next;
128 struct cds_lfq_node_rcu *phead;
129
130 head = rcu_dereference(q->head.next);
131 if (queue_node_type(head) == NODE_HEAD) {
132 /* F0 or T0b */
133 return 1;
134 }
135
136 phead = queue_node_node(head);
137 next = rcu_dereference(phead->next);
138
139 if (queue_node_type(next) == NODE_HEAD) { /* T1, F1 */
140 /* only one node */
141 return 0;
142 } else if (queue_node_type(next) == NODE_NODE) {
143 /* have >=2 nodes, Tn{n>=2} Fn{n>=2} */
144 return 0;
145 } else {
146 /* T0a */
147 assert(queue_node_type(next) == NODE_NULL);
148 return 1;
149 }
e17d9985
MD
150}
151
/*
 * The queue should be emptied before calling destroy.
 *
 * Return 0 on success, -EPERM if queue is not empty.
 */
static inline
int _cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q)
{
	/* Kept here for object debugging. */
	return _cds_lfq_is_empty(q) ? 0 : -EPERM;
}
165
/*
 * Forward declaration: the enqueue path below may need to help finish a
 * concurrent dequeue-the-last operation (defined further down).
 */
static void __queue_post_dequeue_the_last(struct cds_lfq_queue_rcu *q,
		unsigned long old_head, unsigned long new_head);
168
/*
 * The lock-free queue uses rcu_read_lock() to protect the lifetime of
 * the nodes and to prevent the ABA problem.
 *
 * cds_lfq_enqueue_rcu() and cds_lfq_dequeue_rcu() need to be called
 * within an RCU read-side critical section. The node returned by
 * cds_lfq_dequeue_rcu() must not be modified/re-used/freed until a
 * grace period has passed.
 *
 * State notation used below (inferred from the transitions): Fn means
 * the queue holds n nodes with an up-to-date tail; Tn means n nodes
 * with a lagging (transitional) tail; T0a/T0b are the intermediate
 * steps of dequeuing the last node.
 */
178
/*
 * Try to link @pnode after @ptail (the presumed last node, or &q->head
 * when the queue is empty), then move the tail over to it.
 *
 * @tail:  the tail reference previously read from q->tail.
 * @next:  the expected current value of ptail->next (a HEAD reference).
 *
 * Returns @next on success. On failure, returns the value actually
 * found in ptail->next, so the caller can re-evaluate the queue state.
 */
static inline
unsigned long __queue_enqueue(struct cds_lfq_queue_rcu *q,
		unsigned long tail, unsigned long next,
		struct cds_lfq_node_rcu *ptail, struct cds_lfq_node_rcu *pnode)
{
	unsigned long newnext;

	/* increase the seq for every enqueued node */
	pnode->next = queue_make_head(queue_node_seq(next) + 1);

	/* Fn(seq) -> T(n+1)(seq+1) */
	newnext = uatomic_cmpxchg(&ptail->next, next, queue_make_node(pnode));

	if (newnext != next)
		return newnext;

	/* success: move tail (or let another helper do it), T(n+1) -> F(n+1) */
	uatomic_cmpxchg(&q->tail, tail, queue_make_node(pnode));
	return next;
}
199
/*
 * Enqueue @pnode at the tail of the queue, retrying (and helping
 * concurrent operations make progress) until it succeeds.
 * Needs to be called with RCU read-side lock held.
 */
static inline
void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q,
		struct cds_lfq_node_rcu *pnode)
{
	unsigned long tail, next;
	struct cds_lfq_node_rcu *ptail;

	for (;;) {
		tail = rcu_dereference(q->tail);
		if (queue_node_type(tail) == NODE_HEAD) { /* F0 */
			ptail = &q->head;
			next = tail;
			/*
			 * We cannot use "next = rcu_dereference(ptail->next);"
			 * here, because that would be a control dependency,
			 * not a data dependency. F0 is the most likely state
			 * when the queue is empty, so guessing 'next = tail'
			 * is cheap; the cmpxchg in __queue_enqueue validates
			 * the guess.
			 */
		} else { /* Fn, Tn */
			ptail = queue_node_node(tail);
			next = rcu_dereference(ptail->next);
		}

		if (queue_node_type(next) == NODE_HEAD) { /* Fn */
			unsigned long newnext;

			/* Fn{n>=0} -> F(n+1) */
			newnext = __queue_enqueue(q, tail, next, ptail, pnode);
			if (newnext == next) {
				return;
			}
			/* Lost the race: re-inspect what we found instead. */
			next = newnext;
		}

		if (queue_node_type(next) == NODE_NODE) { /* Tn */
			/* help moving tail, Tn{n>=1} -> Fn */
			uatomic_cmpxchg(&q->tail, tail, next);
		} else if (queue_node_type(next) == NODE_NULL) {
			/* help finishing dequeuing the last, T0a or T0b -> F0 */
			__queue_post_dequeue_the_last(q, tail,
				queue_make_head(queue_node_seq(next)));
		}
	}
}
247
/*
 * Finish dequeuing the last node: steps 2 and 3 of the protocol started
 * by step 1 in __queue_dequeue_the_last(). Each cmpxchg only succeeds
 * for the thread that observes the expected old value, so any number of
 * helpers may call this concurrently with the same arguments.
 */
static inline
void __queue_post_dequeue_the_last(struct cds_lfq_queue_rcu *q,
		unsigned long old_head, unsigned long new_head)
{
	/* step2: T0a -> T0b */
	uatomic_cmpxchg(&q->head.next, old_head, new_head);

	/* step3: T0b -> F0 */
	uatomic_cmpxchg(&q->tail, old_head, new_head);
}
258
/*
 * Try to dequeue when the queue holds exactly one node (@plast, whose
 * reference is @head and whose next field currently holds the HEAD
 * reference @next).
 *
 * Returns 1 if the node was dequeued, 0 if the caller must re-read the
 * queue state and retry.
 */
static inline
int __queue_dequeue_the_last(struct cds_lfq_queue_rcu *q,
		unsigned long head, unsigned long next,
		struct cds_lfq_node_rcu *plast)
{
	unsigned long origin_tail = rcu_dereference(q->tail);

	/*
	 * T1 -> F1 if T1; we cannot dequeue the last node while in T1.
	 *
	 * The pseudocode would be:
	 *	tail = rcu_dereference(q->tail);		(*)
	 *	if (tail == queue_make_head(seq - 1))
	 *		uatomic_cmpxchg(&q->tail, tail, head);
	 * But we only expect (*) to read one of:
	 *	head				(F1) (likely)
	 *	queue_make_head(seq - 1)	(T1)
	 * and never a newer nor an older value, so the pseudocode is not
	 * acceptable as-is.
	 */
	if (origin_tail != head) {
		unsigned long tail;

		/* Don't believe the orderless-read tail! */
		origin_tail = queue_make_head(queue_node_seq(next) - 1);

		/* help moving tail, T1 -> F1 */
		tail = uatomic_cmpxchg(&q->tail, origin_tail, head);

		/* Neither T1 nor F1 observed: state changed under us, retry. */
		if (tail != origin_tail && tail != head)
			return 0;
	}

	/* step1: F1 -> T0a (claim the node by nulling its next link) */
	if (uatomic_cmpxchg(&plast->next, next, queue_make_null(queue_node_seq(next))) != next)
		return 0;

	__queue_post_dequeue_the_last(q, head, next);
	return 1;
}
298
/*
 * Try to dequeue the first node when the queue holds at least two nodes
 * (@head is the first node's reference, @next that node's successor).
 *
 * Returns 1 if the node was dequeued, 0 if the caller must re-read the
 * queue state and retry.
 */
static inline
int __queue_dequeue(struct cds_lfq_queue_rcu *q,
		unsigned long head, unsigned long next)
{
	struct cds_lfq_node_rcu *pnext = queue_node_node(next);
	unsigned long nextnext = rcu_dereference(pnext->next);

	/*
	 * T2 -> F2 if T2; we cannot dequeue the first node while in T2.
	 *
	 * The pseudocode would be:
	 *	tail = rcu_dereference(q->tail);	(*)
	 *	if (tail == head)
	 *		uatomic_cmpxchg(&q->tail, head, next);
	 * But we only expect (*) to read a node currently in the queue;
	 * an older value would make us skip a needed uatomic_cmpxchg, so
	 * the pseudocode is not acceptable as-is.
	 *
	 * Always issuing uatomic_cmpxchg(&q->tail, head, next) would be
	 * correct, but it adds cmpxchg overhead on every dequeue.
	 */
	if (queue_node_type(nextnext) == NODE_HEAD) { /* 2 nodes */
		unsigned long tail = rcu_dereference(q->tail);

		/*
		 * tail == next: now F2, no need to help move the tail.
		 * tail != next: unlikely with 2 nodes.
		 *	Don't believe the orderless-read tail!
		 */
		if (tail != next)
			uatomic_cmpxchg(&q->tail, head, next); /* help for T2 -> F2 */
	}

	/* Fn{n>=2} -> F(n-1), Tn{n>=3} -> T(n-1) */
	if (uatomic_cmpxchg(&q->head.next, head, next) != head)
		return 0;

	return 1;
}
339
/*
 * Dequeue and return the oldest node, or NULL if the queue is empty.
 *
 * Needs to be called with RCU read-side lock held.
 * Wait for a grace period before freeing/reusing the returned node.
 * If NULL is returned, the queue is empty.
 */
static inline
struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q)
{
	unsigned long head, next;
	struct cds_lfq_node_rcu *phead;

	for (;;) {
		head = rcu_dereference(q->head.next);
		if (queue_node_type(head) == NODE_HEAD) {
			/* F0 or T0b: nothing to dequeue. */
			return NULL;
		}

		phead = queue_node_node(head);
		next = rcu_dereference(phead->next);

		if (queue_node_type(next) == NODE_HEAD) { /* T1, F1 */
			/* dequeue when only one node */
			if (__queue_dequeue_the_last(q, head, next, phead))
				goto done;
		} else if (queue_node_type(next) == NODE_NODE) {
			/* dequeue when have >=2 nodes, Tn{n>=2} Fn{n>=2} */
			if (__queue_dequeue(q, head, next))
				goto done;
		} else {
			/* T0a: last node already claimed by a concurrent dequeue. */
			assert(queue_node_type(next) == NODE_NULL);
			return NULL;
		}
		/* Lost a race; re-read the queue state and retry. */
	}
done:
	return phead;
}
378
379#ifdef __cplusplus
380}
381#endif
382
383#endif /* _URCU_RCULFQUEUE_STATIC_H */
This page took 0.039639 seconds and 4 git commands to generate.