#define _LGPL_SOURCE
#include <stdlib.h>
+#include <errno.h>
+#include <assert.h>
+#include <stdio.h>
+
#include <urcu.h>
+#include <urcu-defer.h>
#include <arch.h>
#include <arch_atomic.h>
-#include <assert.h>
#include <compiler.h>
-#include <urcu-defer.h>
-#include <errno.h>
-#include <urcu-ht.h>
#include <urcu/jhash.h>
-#include <stdio.h>
+#include <urcu-ht.h>
struct rcu_ht_node;
struct rcu_ht_node *next;
void *key;
void *data;
+ int stolen;
};
struct rcu_ht {
new_head = calloc(1, sizeof(struct rcu_ht_node));
new_head->key = key;
new_head->data = data;
+ new_head->stolen = 0;
/* here comes the fun and tricky part.
* Add at the beginning with a cmpxchg.
* Hold a read lock between the moment the first element is read
/*
* Restart until we successfully remove the entry, or no entry is left
* ((void *)(unsigned long)-ENOENT).
+ * Deal with concurrent stealers by doing an extra verification pass to check
+ * that no element in the list are still pointing to the element stolen.
+ * This could happen if two concurrent steal for consecutive objects are
+ * executed. A pointer to an object being stolen could be saved by the
+ * concurrent stealer for the previous object.
+ * Also, given that in this precise scenario, another stealer can also want to
+ * delete the doubly-referenced object; use a "stolen" flag to let only one
+ * stealer delete the object.
*/
void *ht_steal(struct rcu_ht *ht, void *key)
{
- struct rcu_ht_node **prev, *node;
+ struct rcu_ht_node **prev, *node, *del_node = NULL;
unsigned long hash;
void *data;
node = rcu_dereference(*prev);
for (;;) {
if (likely(!node)) {
- data = (void *)(unsigned long)-ENOENT;
- goto error;
+ if (del_node) {
+ goto end;
+ } else {
+ goto error;
+ }
}
if (node->key == key) {
break;
prev = &node->next;
node = rcu_dereference(*prev);
}
+
+ if (!del_node) {
+ /*
+ * Another concurrent thread stole it ? If so, let it deal with
+ * this.
+ */
+ if (cmpxchg(&node->stolen, 0, 1) != 0)
+ goto error;
+ }
+
/* Found it ! pointer to object is in "prev" */
- if (rcu_cmpxchg_pointer(prev, node, node->next) != node)
- goto restart;
+ if (rcu_cmpxchg_pointer(prev, node, node->next) == node)
+ del_node = node;
+ goto restart;
- /* From that point, we own node. We can free it outside of read lock */
+end:
+ /*
+ * From that point, we own node. Note that there can still be concurrent
+ * RCU readers using it. We can free it outside of read lock after a GP.
+ */
rcu_read_unlock();
- data = node->data;
- call_rcu(free, node);
+ data = del_node->data;
+ call_rcu(free, del_node);
return data;
error:
+ data = (void *)(unsigned long)-ENOENT;
rcu_read_unlock();
return data;