doc/examples: lfstack
[urcu.git] / rculfhash.c
index 827c2956b217e6bf00c839744f057142deade1cf..423609d6c188dd79f6392763ad7b8082f2180def 100644 (file)
  *   each the same number of buckets.
  * - The RCU "mmap" memory backend uses a single memory map to hold
  *   all buckets.
- * - synchronzie_rcu is used to garbage-collect the old bucket node table.
+ * - synchronize_rcu is used to garbage-collect the old bucket node table.
  *
  * Ordering Guarantees:
  *
  *  - cds_lfht_first followed iteration with cds_lfht_next (and/or
  *    cds_lfht_next_duplicate, although less common).
  *
- * We define "write" operations as any of cds_lfht_add,
+ * We define "write" operations as any of cds_lfht_add, cds_lfht_replace,
  * cds_lfht_add_unique (success), cds_lfht_add_replace, cds_lfht_del.
  *
  * When cds_lfht_add_unique succeeds (returns the node passed as
  * Split-counters lazily update the global counter each 1024
  * addition/removal. It automatically keeps track of resize required.
  * We use the bucket length as indicator for need to expand for small
- * tables and machines lacking per-cpu data suppport.
+ * tables and machines lacking per-cpu data support.
  */
 #define COUNT_COMMIT_ORDER             10
 #define DEFAULT_SPLIT_COUNT_MASK       0xFUL
@@ -381,7 +381,8 @@ uint8_t bit_reverse_u8(uint8_t v)
        return BitReverseTable256[v];
 }
 
-static __attribute__((unused))
+#if (CAA_BITS_PER_LONG == 32)
+static
 uint32_t bit_reverse_u32(uint32_t v)
 {
        return ((uint32_t) bit_reverse_u8(v) << 24) | 
@@ -389,8 +390,8 @@ uint32_t bit_reverse_u32(uint32_t v)
                ((uint32_t) bit_reverse_u8(v >> 16) << 8) | 
                ((uint32_t) bit_reverse_u8(v >> 24));
 }
-
-static __attribute__((unused))
+#else
+static
 uint64_t bit_reverse_u64(uint64_t v)
 {
        return ((uint64_t) bit_reverse_u8(v) << 56) | 
@@ -402,6 +403,7 @@ uint64_t bit_reverse_u64(uint64_t v)
                ((uint64_t) bit_reverse_u8(v >> 48) << 8) |
                ((uint64_t) bit_reverse_u8(v >> 56));
 }
+#endif
 
 static
 unsigned long bit_reverse_ulong(unsigned long v)
@@ -589,8 +591,6 @@ static void ht_init_nr_cpus_mask(void)
 static
 void alloc_split_items_count(struct cds_lfht *ht)
 {
-       struct ht_items_count *count;
-
        if (nr_cpus_mask == -1) {
                ht_init_nr_cpus_mask();
                if (nr_cpus_mask < 0)
@@ -602,7 +602,8 @@ void alloc_split_items_count(struct cds_lfht *ht)
        assert(split_count_mask >= 0);
 
        if (ht->flags & CDS_LFHT_ACCOUNTING) {
-               ht->split_count = calloc(split_count_mask + 1, sizeof(*count));
+               ht->split_count = calloc(split_count_mask + 1,
+                                       sizeof(struct ht_items_count));
                assert(ht->split_count);
        } else {
                ht->split_count = NULL;
@@ -734,12 +735,6 @@ int is_removed(struct cds_lfht_node *node)
        return ((unsigned long) node) & REMOVED_FLAG;
 }
 
-static
-struct cds_lfht_node *flag_removed(struct cds_lfht_node *node)
-{
-       return (struct cds_lfht_node *) (((unsigned long) node) | REMOVED_FLAG);
-}
-
 static
 int is_bucket(struct cds_lfht_node *node)
 {
@@ -764,6 +759,12 @@ struct cds_lfht_node *flag_removal_owner(struct cds_lfht_node *node)
        return (struct cds_lfht_node *) (((unsigned long) node) | REMOVAL_OWNER_FLAG);
 }
 
+static
+struct cds_lfht_node *flag_removed_or_removal_owner(struct cds_lfht_node *node)
+{
+       return (struct cds_lfht_node *) (((unsigned long) node) | REMOVED_FLAG | REMOVAL_OWNER_FLAG);
+}
+
 static
 struct cds_lfht_node *get_end(void)
 {
@@ -832,13 +833,16 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *bucket, struct cds_lfht_node *nod
 
        assert(!is_bucket(bucket));
        assert(!is_removed(bucket));
+       assert(!is_removal_owner(bucket));
        assert(!is_bucket(node));
        assert(!is_removed(node));
+       assert(!is_removal_owner(node));
        for (;;) {
                iter_prev = bucket;
                /* We can always skip the bucket node initially */
                iter = rcu_dereference(iter_prev->next);
                assert(!is_removed(iter));
+               assert(!is_removal_owner(iter));
                assert(iter_prev->reverse_hash <= node->reverse_hash);
                /*
                 * We should never be called with bucket (start of chain)
@@ -859,6 +863,7 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *bucket, struct cds_lfht_node *nod
                        iter = next;
                }
                assert(!is_removed(iter));
+               assert(!is_removal_owner(iter));
                if (is_bucket(iter))
                        new_next = flag_bucket(clear_flag(next));
                else
@@ -879,8 +884,10 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size,
                return -ENOENT;
 
        assert(!is_removed(old_node));
+       assert(!is_removal_owner(old_node));
        assert(!is_bucket(old_node));
        assert(!is_removed(new_node));
+       assert(!is_removal_owner(new_node));
        assert(!is_bucket(new_node));
        assert(new_node != old_node);
        for (;;) {
@@ -894,6 +901,12 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size,
                }
                assert(old_next == clear_flag(old_next));
                assert(new_node != old_next);
+               /*
+                * REMOVAL_OWNER flag is _NEVER_ set before the REMOVED
+                * flag. It is either set atomically at the same time
+                * (replace) or after (del).
+                */
+               assert(!is_removal_owner(old_next));
                new_node->next = old_next;
                /*
                 * Here is the whole trick for lock-free replace: we add
@@ -906,10 +919,12 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size,
                 * the old node, but will not see the new one.
                 * This is a replacement of a node with another node
                 * that has the same value: we are therefore not
-                * removing a value from the hash table.
+                * removing a value from the hash table. We set both the
+                * REMOVED and REMOVAL_OWNER flags atomically so we own
+                * the node after successful cmpxchg.
                 */
                ret_next = uatomic_cmpxchg(&old_node->next,
-                             old_next, flag_removed(new_node));
+                       old_next, flag_removed_or_removal_owner(new_node));
                if (ret_next == old_next)
                        break;          /* We performed the replacement. */
                old_next = ret_next;
@@ -923,7 +938,7 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size,
        bucket = lookup_bucket(ht, size, bit_reverse_ulong(old_node->reverse_hash));
        _cds_lfht_gc_bucket(bucket, new_node);
 
-       assert(is_removed(rcu_dereference(old_node->next)));
+       assert(is_removed(CMM_LOAD_SHARED(old_node->next)));
        return 0;
 }
 
@@ -947,6 +962,7 @@ void _cds_lfht_add(struct cds_lfht *ht,
 
        assert(!is_bucket(node));
        assert(!is_removed(node));
+       assert(!is_removal_owner(node));
        bucket = lookup_bucket(ht, size, hash);
        for (;;) {
                uint32_t chain_len = 0;
@@ -1007,7 +1023,9 @@ void _cds_lfht_add(struct cds_lfht *ht,
        insert:
                assert(node != clear_flag(iter));
                assert(!is_removed(iter_prev));
+               assert(!is_removal_owner(iter_prev));
                assert(!is_removed(iter));
+               assert(!is_removal_owner(iter));
                assert(iter_prev != node);
                if (!bucket_flag)
                        node->next = clear_flag(iter);
@@ -1027,6 +1045,7 @@ void _cds_lfht_add(struct cds_lfht *ht,
 
        gc_node:
                assert(!is_removed(iter));
+               assert(!is_removal_owner(iter));
                if (is_bucket(iter))
                        new_next = flag_bucket(clear_flag(next));
                else
@@ -1061,10 +1080,15 @@ int _cds_lfht_del(struct cds_lfht *ht, unsigned long size,
         * logical removal flag). Return -ENOENT if the node had
         * previously been removed.
         */
-       next = rcu_dereference(node->next);
+       next = CMM_LOAD_SHARED(node->next);     /* next is not dereferenced */
        if (caa_unlikely(is_removed(next)))
                return -ENOENT;
        assert(!is_bucket(next));
+       /*
+        * The del operation semantic guarantees a full memory barrier
+        * before the uatomic_or atomic commit of the deletion flag.
+        */
+       cmm_smp_mb__before_uatomic_or();
        /*
         * We set the REMOVED_FLAG unconditionally. Note that there may
         * be more than one concurrent thread setting this flag.
@@ -1082,7 +1106,7 @@ int _cds_lfht_del(struct cds_lfht *ht, unsigned long size,
        bucket = lookup_bucket(ht, size, bit_reverse_ulong(node->reverse_hash));
        _cds_lfht_gc_bucket(bucket, node);
 
-       assert(is_removed(rcu_dereference(node->next)));
+       assert(is_removed(CMM_LOAD_SHARED(node->next)));
        /*
         * Last phase: atomically exchange node->next with a version
         * having "REMOVAL_OWNER_FLAG" set. If the returned node->next
@@ -1510,7 +1534,7 @@ void cds_lfht_lookup(struct cds_lfht *ht, unsigned long hash,
                }
                node = clear_flag(next);
        }
-       assert(!node || !is_bucket(rcu_dereference(node->next)));
+       assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next)));
        iter->node = node;
        iter->next = next;
 }
@@ -1543,7 +1567,7 @@ void cds_lfht_next_duplicate(struct cds_lfht *ht, cds_lfht_match_fct match,
                }
                node = clear_flag(next);
        }
-       assert(!node || !is_bucket(rcu_dereference(node->next)));
+       assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next)));
        iter->node = node;
        iter->next = next;
 }
@@ -1565,7 +1589,7 @@ void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter)
                }
                node = clear_flag(next);
        }
-       assert(!node || !is_bucket(rcu_dereference(node->next)));
+       assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next)));
        iter->node = node;
        iter->next = next;
 }
@@ -1654,12 +1678,14 @@ int cds_lfht_replace(struct cds_lfht *ht,
 
 int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_node *node)
 {
-       unsigned long size, hash;
+       unsigned long size;
        int ret;
 
        size = rcu_dereference(ht->size);
        ret = _cds_lfht_del(ht, size, node);
        if (!ret) {
+               unsigned long hash;
+
                hash = bit_reverse_ulong(node->reverse_hash);
                ht_count_del(ht, size, hash);
        }
@@ -1668,7 +1694,7 @@ int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_node *node)
 
 int cds_lfht_is_node_deleted(struct cds_lfht_node *node)
 {
-       return is_removed(rcu_dereference(node->next));
+       return is_removed(CMM_LOAD_SHARED(node->next));
 }
 
 static
@@ -1684,6 +1710,7 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht)
                if (!is_bucket(node))
                        return -EPERM;
                assert(!is_removed(node));
+               assert(!is_removal_owner(node));
        } while (!is_end(node));
        /*
         * size accessed without rcu_dereference because hash table is
@@ -1710,13 +1737,25 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht)
  */
 int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
 {
-       int ret;
+       int ret, was_online;
 
        /* Wait for in-flight resize operations to complete */
        _CMM_STORE_SHARED(ht->in_progress_destroy, 1);
        cmm_smp_mb();   /* Store destroy before load resize */
+       was_online = ht->flavor->read_ongoing();
+       if (was_online)
+               ht->flavor->thread_offline();
+       /* Calling with RCU read-side held is an error. */
+       if (ht->flavor->read_ongoing()) {
+               ret = -EINVAL;
+               if (was_online)
+                       ht->flavor->thread_online();
+               goto end;
+       }
        while (uatomic_read(&ht->in_progress_resize))
                poll(NULL, 0, 100);     /* wait for 100ms */
+       if (was_online)
+               ht->flavor->thread_online();
        ret = cds_lfht_delete_bucket(ht);
        if (ret)
                return ret;
@@ -1724,6 +1763,7 @@ int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
        if (attr)
                *attr = ht->resize_attr;
        poison_free(ht);
+end:
        return ret;
 }
 
@@ -1852,13 +1892,30 @@ void resize_target_update_count(struct cds_lfht *ht,
 
 void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size)
 {
+       int was_online;
+
+       was_online = ht->flavor->read_ongoing();
+       if (was_online)
+               ht->flavor->thread_offline();
+       /* Calling with RCU read-side held is an error. */
+       if (ht->flavor->read_ongoing()) {
+               static int print_once;
+
+               if (!CMM_LOAD_SHARED(print_once))
+                       fprintf(stderr, "[error] rculfhash: cds_lfht_resize "
+                               "called with RCU read-side lock held.\n");
+               CMM_STORE_SHARED(print_once, 1);
+               assert(0);
+               goto end;
+       }
        resize_target_update_count(ht, new_size);
        CMM_STORE_SHARED(ht->resize_initiated, 1);
-       ht->flavor->thread_offline();
        pthread_mutex_lock(&ht->resize_mutex);
        _do_cds_lfht_resize(ht);
        pthread_mutex_unlock(&ht->resize_mutex);
-       ht->flavor->thread_online();
+end:
+       if (was_online)
+               ht->flavor->thread_online();
 }
 
 static
This page took 0.027513 seconds and 4 git commands to generate.