Cleanup: compiler warning on 32-bit architectures
diff --git a/src/rculfhash.c b/src/rculfhash.c
index d7a1f23bf8f766040341acbd18584246b5c327e4..e0c5860c7b6b91e448b68b250c5c0a58afd49825 100644
--- a/src/rculfhash.c
+++ b/src/rculfhash.c
@@ -64,7 +64,7 @@
  * - Split-counters are used to keep track of the number of
  *   nodes within the hash table for automatic resize triggering.
  * - Resize operation initiated by long chain detection is executed by a
- *   call_rcu thread, which keeps lock-freedom of add and remove.
+ *   worker thread, which keeps the add and remove operations lock-free.
  * - Resize operations are protected by a mutex.
  * - The removal operation is split in two parts: first, a "removed"
  *   flag is set in the next pointer within the node to remove. Then,
 #include <unistd.h>
 
 #include "compat-getcpu.h"
-#include <urcu-pointer.h>
-#include <urcu-call-rcu.h>
-#include <urcu-flavor.h>
+#include <urcu/pointer.h>
+#include <urcu/call-rcu.h>
+#include <urcu/flavor.h>
 #include <urcu/arch.h>
 #include <urcu/uatomic.h>
 #include <urcu/compiler.h>
 #include <rculfhash-internal.h>
 #include <stdio.h>
 #include <pthread.h>
+#include <signal.h>
+#include "workqueue.h"
+#include "urcu-die.h"
 
 /*
  * Split-counters lazily update the global counter each 1024
@@ -335,11 +338,11 @@ struct ht_items_count {
 } __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
 
 /*
- * rcu_resize_work: Contains arguments passed to RCU worker thread
+ * resize_work: Contains arguments passed to the worker thread
  * responsible for performing lazy resize.
  */
-struct rcu_resize_work {
-       struct rcu_head head;
+struct resize_work {
+       struct urcu_work work;
        struct cds_lfht *ht;
 };
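
The embedded struct urcu_work is what the workqueue hands to the callback; the enclosing struct resize_work is then recovered with caa_container_of() (from urcu/compiler.h), exactly as do_resize_cb() does further down in this diff. A minimal sketch of the pattern, with example_cb as a hypothetical callback name:

    static void example_cb(struct urcu_work *work)
    {
            struct resize_work *resize_work =
                    caa_container_of(work, struct resize_work, work);

            /* resize_work->ht points at the table this work item targets. */
    }
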
 
@@ -356,6 +359,48 @@ struct partition_resize_work {
                    unsigned long start, unsigned long len);
 };
 
+static struct urcu_workqueue *cds_lfht_workqueue;
+static unsigned long cds_lfht_workqueue_user_count;
+
+/*
+ * Mutex ensuring mutual exclusion between workqueue initialization and
+ * fork handlers. cds_lfht_fork_mutex nests inside call_rcu_mutex.
+ */
+static pthread_mutex_t cds_lfht_fork_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static struct urcu_atfork cds_lfht_atfork;
+
+/*
+ * atfork handler nesting counter. The handler may be registered with
+ * multiple urcu flavors, and can therefore be invoked more than once
+ * in the pthread_atfork list of callbacks.
+ */
+static int cds_lfht_workqueue_atfork_nesting;
+
+static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor);
+static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor);
+
+#ifdef CONFIG_CDS_LFHT_ITER_DEBUG
+
+static
+void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht, struct cds_lfht_iter *iter)
+{
+       iter->lfht = ht;
+}
+
+#define cds_lfht_iter_debug_assert(...)                assert(__VA_ARGS__)
+
+#else
+
+static
+void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht, struct cds_lfht_iter *iter)
+{
+}
+
+#define cds_lfht_iter_debug_assert(...)
+
+#endif
+
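With CONFIG_CDS_LFHT_ITER_DEBUG enabled, every iterator records the hash table it was initialized on, so using it against a different table trips the assertion. A hedged sketch of the misuse this catches (ht_a and ht_b are hypothetical tables; the API calls are the real public ones):

    void iter_debug_example(struct cds_lfht *ht_a, struct cds_lfht *ht_b)
    {
            struct cds_lfht_iter iter;

            cds_lfht_first(ht_a, &iter);    /* records ht_a in iter.lfht */
            cds_lfht_next(ht_b, &iter);     /* debug build: assert(ht_b == iter.lfht) fires */
    }
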
 /*
  * Algorithm to reverse bits in a word by lookup table, extended to
  * 64-bit words.
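
As a sketch of the lookup-table approach this comment refers to: a 256-entry table reverses one byte at a time, and a 64-bit word is reversed by emitting its bytes in reverse order. The table is the classic macro-generated byte-reversal table (cf. Bit Twiddling Hacks); the names here are illustrative, not necessarily those used in this file.

    #include <stdint.h>

    #define R2(n) (n), (n) + 2*64, (n) + 1*64, (n) + 3*64
    #define R4(n) R2(n), R2((n) + 2*16), R2((n) + 1*16), R2((n) + 3*16)
    #define R6(n) R4(n), R4((n) + 2*4), R4((n) + 1*4), R4((n) + 3*4)

    /* bit_reverse_table[i] is byte i with its bits reversed. */
    static const uint8_t bit_reverse_table[256] = { R6(0), R6(2), R6(1), R6(3) };

    static uint64_t bit_reverse_u64(uint64_t v)
    {
            uint64_t r = 0;
            int i;

            /* Emit bytes in reverse order, reversing bits within each byte. */
            for (i = 0; i < 8; i++) {
                    r = (r << 8) | bit_reverse_table[v & 0xff];
                    v >>= 8;
            }
            return r;
    }
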
@@ -561,6 +606,37 @@ static
 void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size,
                                unsigned long count);
 
+static void mutex_lock(pthread_mutex_t *mutex)
+{
+       int ret;
+
+#ifndef DISTRUST_SIGNALS_EXTREME
+       ret = pthread_mutex_lock(mutex);
+       if (ret)
+               urcu_die(ret);
+#else /* #ifndef DISTRUST_SIGNALS_EXTREME */
+       while ((ret = pthread_mutex_trylock(mutex)) != 0) {
+               if (ret != EBUSY && ret != EINTR)
+                       urcu_die(ret);
+               if (CMM_LOAD_SHARED(URCU_TLS(rcu_reader).need_mb)) {
+                       cmm_smp_mb();
+                       _CMM_STORE_SHARED(URCU_TLS(rcu_reader).need_mb, 0);
+                       cmm_smp_mb();
+               }
+               (void) poll(NULL, 0, 10);
+       }
+#endif /* #else #ifndef DISTRUST_SIGNALS_EXTREME */
+}
+
+static void mutex_unlock(pthread_mutex_t *mutex)
+{
+       int ret;
+
+       ret = pthread_mutex_unlock(mutex);
+       if (ret)
+               urcu_die(ret);
+}
+
 static long nr_cpus_mask = -1;
 static long split_count_mask = -1;
 static int split_count_order = -1;
@@ -1013,7 +1089,13 @@ void _cds_lfht_add(struct cds_lfht *ht,
                        if (unique_ret
                            && !is_bucket(next)
                            && clear_flag(iter)->reverse_hash == node->reverse_hash) {
-                               struct cds_lfht_iter d_iter = { .node = node, .next = iter, };
+                               struct cds_lfht_iter d_iter = {
+                                       .node = node,
+                                       .next = iter,
+#ifdef CONFIG_CDS_LFHT_ITER_DEBUG
+                                       .lfht = ht,
+#endif
+                               };
 
                                /*
                                 * uniquely adding inserts the node as the first
@@ -1224,14 +1306,12 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
        if (start == 0 && nr_threads > 0)
                return;
 fallback:
-       ht->flavor->thread_online();
        fct(ht, i, start, len);
-       ht->flavor->thread_offline();
 }
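
For intuition, the partitioning performed before this fallback splits the index range across worker threads roughly as follows. This is a hedged sketch of the arithmetic only; the real partition_resize_helper() derives partition_len from the thread count using power-of-two math and spawns one pthread per partition.

    static void partition_sketch(unsigned long len, unsigned long nr_threads)
    {
            unsigned long thread, start, partition_len;

            partition_len = len / nr_threads;   /* assume nr_threads divides len */
            for (thread = 0; thread < nr_threads; thread++) {
                    start = thread * partition_len;
                    /* worker `thread` would run: fct(ht, i, start, partition_len) */
            }
    }
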
 
 /*
  * Holding RCU read lock to protect _cds_lfht_add against memory
- * reclaim that could be performed by other call_rcu worker threads (ABA
+ * reclaim that could be performed by other worker threads (ABA
  * problem).
  *
  * When we reach a certain length, we can split this population phase over
@@ -1308,7 +1388,7 @@ void init_table(struct cds_lfht *ht,
 
 /*
  * Holding RCU read lock to protect _cds_lfht_remove against memory
- * reclaim that could be performed by other call_rcu worker threads (ABA
+ * reclaim that could be performed by other worker threads (ABA
  * problem).
  * For a single level, we logically remove and garbage collect each node.
  *
@@ -1320,8 +1400,9 @@ void init_table(struct cds_lfht *ht,
  *
  * Concurrent removal and add operations are helping us perform garbage
  * collection of logically removed nodes. We guarantee that all logically
- * removed nodes have been garbage-collected (unlinked) before call_rcu is
- * invoked to free a hole level of bucket nodes (after a grace period).
+ * removed nodes have been garbage-collected (unlinked) before the work
+ * item that frees a whole level of bucket nodes is enqueued (after a
+ * grace period).
  *
  * Logical removal and garbage collection can therefore be done in batch
  * or on a node-per-node basis, as long as the guarantee above holds.
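
The "removed" flag mentioned above lives in the low-order bits of a node's next pointer, which are free because nodes are at least word-aligned. A minimal sketch of the tagging technique; REMOVED_FLAG and these helpers match the spirit, though not necessarily the letter, of this file's internal helpers:

    #define REMOVED_FLAG    (1UL << 0)

    static struct cds_lfht_node *flag_removed(struct cds_lfht_node *next)
    {
            return (struct cds_lfht_node *) ((unsigned long) next | REMOVED_FLAG);
    }

    static int is_removed(const struct cds_lfht_node *next)
    {
            return ((unsigned long) next & REMOVED_FLAG) != 0;
    }
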
@@ -1462,6 +1543,32 @@ void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size)
        }
 }
 
+#if (CAA_BITS_PER_LONG > 32)
+/*
+ * For 64-bit architectures, with max number of buckets small enough not to
+ * use the entire 64-bit memory mapping space (and allowing a fair number of
+ * hash table instances), use the mmap allocator, which is faster. Otherwise,
+ * fallback to the order allocator.
+ */
+static
+const struct cds_lfht_mm_type *get_mm_type(unsigned long max_nr_buckets)
+{
+       if (max_nr_buckets && max_nr_buckets <= (1ULL << 32))
+               return &cds_lfht_mm_mmap;
+       else
+               return &cds_lfht_mm_order;
+}
+#else
+/*
+ * For 32-bit architectures, use the order allocator.
+ */
+static
+const struct cds_lfht_mm_type *get_mm_type(unsigned long max_nr_buckets)
+{
+       return &cds_lfht_mm_order;
+}
+#endif
+
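This refactoring is what silences the warning named in the commit title: on a 32-bit target, unsigned long is 32 bits wide, so the old inline test against (1ULL << 32) presumably compiled to a comparison that is always true (the kind of thing flagged by, e.g., -Wtype-limits). Hoisting the test into a 64-bit-only get_mm_type() means 32-bit builds never compile the dead comparison. A sketch of the offending pattern:

    const struct cds_lfht_mm_type *mm = NULL;
    unsigned long max_nr_buckets = 1024;    /* 32 bits wide on a 32-bit target */

    /*
     * Always true when unsigned long is 32-bit, so compilers may emit a
     * "comparison is always true" style warning:
     */
    if (max_nr_buckets && max_nr_buckets <= (1ULL << 32))
            mm = &cds_lfht_mm_mmap;
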
 struct cds_lfht *_cds_lfht_new(unsigned long init_size,
                        unsigned long min_nr_alloc_buckets,
                        unsigned long max_nr_buckets,
@@ -1484,26 +1591,8 @@ struct cds_lfht *_cds_lfht_new(unsigned long init_size,
        /*
         * Memory management plugin default.
         */
-       if (!mm) {
-               if (CAA_BITS_PER_LONG > 32
-                               && max_nr_buckets
-                               && max_nr_buckets <= (1ULL << 32)) {
-                       /*
-                        * For 64-bit architectures, with max number of
-                        * buckets small enough not to use the entire
-                        * 64-bit memory mapping space (and allowing a
-                        * fair number of hash table instances), use the
-                        * mmap allocator, which is faster than the
-                        * order allocator.
-                        */
-                       mm = &cds_lfht_mm_mmap;
-               } else {
-                       /*
-                        * The fallback is to use the order allocator.
-                        */
-                       mm = &cds_lfht_mm_order;
-               }
-       }
+       if (!mm)
+               mm = get_mm_type(max_nr_buckets);
 
        /* max_nr_buckets == 0 for order based mm means infinite */
        if (mm == &cds_lfht_mm_order && !max_nr_buckets)
@@ -1513,6 +1602,9 @@ struct cds_lfht *_cds_lfht_new(unsigned long init_size,
        if (!max_nr_buckets || (max_nr_buckets & (max_nr_buckets - 1)))
                return NULL;
 
+       if (flags & CDS_LFHT_AUTO_RESIZE)
+               cds_lfht_init_worker(flavor);
+
        min_nr_alloc_buckets = max(min_nr_alloc_buckets, MIN_TABLE_SIZE);
        init_size = max(init_size, MIN_TABLE_SIZE);
        max_nr_buckets = max(max_nr_buckets, min_nr_alloc_buckets);
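
As an aside on the max_nr_buckets & (max_nr_buckets - 1) test above: subtracting 1 clears the lowest set bit and sets all bits below it, so the AND is zero exactly when at most one bit is set; together with the !max_nr_buckets check, this enforces a power of two. For example:

    x = 0b01000000:  x - 1 = 0b00111111,  x & (x - 1) = 0           (power of two)
    x = 0b01100000:  x - 1 = 0b01011111,  x & (x - 1) = 0b01000000  (rejected)
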
@@ -1543,6 +1635,8 @@ void cds_lfht_lookup(struct cds_lfht *ht, unsigned long hash,
        struct cds_lfht_node *node, *next, *bucket;
        unsigned long reverse_hash, size;
 
+       cds_lfht_iter_debug_set_ht(ht, iter);
+
        reverse_hash = bit_reverse_ulong(hash);
 
        size = rcu_dereference(ht->size);
@@ -1580,6 +1674,7 @@ void cds_lfht_next_duplicate(struct cds_lfht *ht, cds_lfht_match_fct match,
        struct cds_lfht_node *node, *next;
        unsigned long reverse_hash;
 
+       cds_lfht_iter_debug_assert(ht == iter->lfht);
        node = iter->node;
        reverse_hash = node->reverse_hash;
        next = iter->next;
@@ -1611,6 +1706,7 @@ void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter)
 {
        struct cds_lfht_node *node, *next;
 
+       cds_lfht_iter_debug_assert(ht == iter->lfht);
        node = clear_flag(iter->next);
        for (;;) {
                if (caa_unlikely(is_end(node))) {
@@ -1631,6 +1727,7 @@ void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter)
 
 void cds_lfht_first(struct cds_lfht *ht, struct cds_lfht_iter *iter)
 {
+       cds_lfht_iter_debug_set_ht(ht, iter);
        /*
         * Get next after first bucket node. The first bucket node is the
         * first node of the linked list.
@@ -1772,25 +1869,14 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht)
  */
 int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
 {
-       int ret, was_online;
-
-       /* Wait for in-flight resize operations to complete */
-       _CMM_STORE_SHARED(ht->in_progress_destroy, 1);
-       cmm_smp_mb();   /* Store destroy before load resize */
-       was_online = ht->flavor->read_ongoing();
-       if (was_online)
-               ht->flavor->thread_offline();
-       /* Calling with RCU read-side held is an error. */
-       if (ht->flavor->read_ongoing()) {
-               ret = -EINVAL;
-               if (was_online)
-                       ht->flavor->thread_online();
-               goto end;
+       int ret;
+
+       if (ht->flags & CDS_LFHT_AUTO_RESIZE) {
+               /* Cancel ongoing resize operations. */
+               _CMM_STORE_SHARED(ht->in_progress_destroy, 1);
+               /* Wait for in-flight resize operations to complete */
+               urcu_workqueue_flush_queued_work(cds_lfht_workqueue);
        }
-       while (uatomic_read(&ht->in_progress_resize))
-               poll(NULL, 0, 100);     /* wait for 100ms */
-       if (was_online)
-               ht->flavor->thread_online();
        ret = cds_lfht_delete_bucket(ht);
        if (ret)
                return ret;
@@ -1800,8 +1886,9 @@ int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
        ret = pthread_mutex_destroy(&ht->resize_mutex);
        if (ret)
                ret = -EBUSY;
+       if (ht->flags & CDS_LFHT_AUTO_RESIZE)
+               cds_lfht_fini_worker(ht->flavor);
        poison_free(ht);
-end:
        return ret;
 }
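
With the workqueue flush in place, destruction no longer requires the caller to put itself offline first. A hedged usage sketch of the create/destroy lifecycle, using the public cds_lfht_new()/cds_lfht_destroy() API (parameter values are illustrative; error handling elided):

    struct cds_lfht *ht;

    ht = cds_lfht_new(1, 1, 0,
            CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
    if (!ht)
            abort();
    /* ... concurrent add/lookup/del from reader threads ... */
    if (cds_lfht_destroy(ht, NULL))
            abort();        /* e.g. resize mutex still in use */
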
 
@@ -1897,7 +1984,6 @@ void _do_cds_lfht_resize(struct cds_lfht *ht)
         * Resize table, re-do if the target size has changed under us.
         */
        do {
-               assert(uatomic_read(&ht->in_progress_resize));
                if (CMM_LOAD_SHARED(ht->in_progress_destroy))
                        break;
                ht->resize_initiated = 1;
@@ -1930,71 +2016,47 @@ void resize_target_update_count(struct cds_lfht *ht,
 
 void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size)
 {
-       int was_online;
-
-       was_online = ht->flavor->read_ongoing();
-       if (was_online)
-               ht->flavor->thread_offline();
-       /* Calling with RCU read-side held is an error. */
-       if (ht->flavor->read_ongoing()) {
-               static int print_once;
-
-               if (!CMM_LOAD_SHARED(print_once))
-                       fprintf(stderr, "[error] rculfhash: cds_lfht_resize "
-                               "called with RCU read-side lock held.\n");
-               CMM_STORE_SHARED(print_once, 1);
-               assert(0);
-               goto end;
-       }
        resize_target_update_count(ht, new_size);
        CMM_STORE_SHARED(ht->resize_initiated, 1);
-       pthread_mutex_lock(&ht->resize_mutex);
+       mutex_lock(&ht->resize_mutex);
        _do_cds_lfht_resize(ht);
-       pthread_mutex_unlock(&ht->resize_mutex);
-end:
-       if (was_online)
-               ht->flavor->thread_online();
+       mutex_unlock(&ht->resize_mutex);
 }
 
 static
-void do_resize_cb(struct rcu_head *head)
+void do_resize_cb(struct urcu_work *work)
 {
-       struct rcu_resize_work *work =
-               caa_container_of(head, struct rcu_resize_work, head);
-       struct cds_lfht *ht = work->ht;
+       struct resize_work *resize_work =
+               caa_container_of(work, struct resize_work, work);
+       struct cds_lfht *ht = resize_work->ht;
 
-       ht->flavor->thread_offline();
-       pthread_mutex_lock(&ht->resize_mutex);
+       ht->flavor->register_thread();
+       mutex_lock(&ht->resize_mutex);
        _do_cds_lfht_resize(ht);
-       pthread_mutex_unlock(&ht->resize_mutex);
-       ht->flavor->thread_online();
+       mutex_unlock(&ht->resize_mutex);
+       ht->flavor->unregister_thread();
        poison_free(work);
-       cmm_smp_mb();   /* finish resize before decrement */
-       uatomic_dec(&ht->in_progress_resize);
 }
 
 static
 void __cds_lfht_resize_lazy_launch(struct cds_lfht *ht)
 {
-       struct rcu_resize_work *work;
+       struct resize_work *work;
 
        /* Store resize_target before read resize_initiated */
        cmm_smp_mb();
        if (!CMM_LOAD_SHARED(ht->resize_initiated)) {
-               uatomic_inc(&ht->in_progress_resize);
-               cmm_smp_mb();   /* increment resize count before load destroy */
                if (CMM_LOAD_SHARED(ht->in_progress_destroy)) {
-                       uatomic_dec(&ht->in_progress_resize);
                        return;
                }
                work = malloc(sizeof(*work));
                if (work == NULL) {
                        dbg_printf("error allocating resize work, bailing out\n");
-                       uatomic_dec(&ht->in_progress_resize);
                        return;
                }
                work->ht = ht;
-               ht->flavor->update_call_rcu(&work->head, do_resize_cb);
+               urcu_workqueue_queue_work(cds_lfht_workqueue,
+                       &work->work, do_resize_cb);
                CMM_STORE_SHARED(ht->resize_initiated, 1);
        }
 }
@@ -2045,3 +2107,83 @@ void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size,
        }
        __cds_lfht_resize_lazy_launch(ht);
 }
+
+static void cds_lfht_before_fork(void *priv)
+{
+       if (cds_lfht_workqueue_atfork_nesting++)
+               return;
+       mutex_lock(&cds_lfht_fork_mutex);
+       if (!cds_lfht_workqueue)
+               return;
+       urcu_workqueue_pause_worker(cds_lfht_workqueue);
+}
+
+static void cds_lfht_after_fork_parent(void *priv)
+{
+       if (--cds_lfht_workqueue_atfork_nesting)
+               return;
+       if (!cds_lfht_workqueue)
+               goto end;
+       urcu_workqueue_resume_worker(cds_lfht_workqueue);
+end:
+       mutex_unlock(&cds_lfht_fork_mutex);
+}
+
+static void cds_lfht_after_fork_child(void *priv)
+{
+       if (--cds_lfht_workqueue_atfork_nesting)
+               return;
+       if (!cds_lfht_workqueue)
+               goto end;
+       urcu_workqueue_create_worker(cds_lfht_workqueue);
+end:
+       mutex_unlock(&cds_lfht_fork_mutex);
+}
+
+static struct urcu_atfork cds_lfht_atfork = {
+       .before_fork = cds_lfht_before_fork,
+       .after_fork_parent = cds_lfht_after_fork_parent,
+       .after_fork_child = cds_lfht_after_fork_child,
+};
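
For intuition only: these three hooks mirror the prepare/parent/child triple of pthread_atfork(3). Real registration goes through flavor->register_rculfhash_atfork() (see cds_lfht_init_worker() below), which also supplies the priv argument, but a plain-pthread analogue would look like:

    static void prepare(void) { cds_lfht_before_fork(NULL); }
    static void parent(void)  { cds_lfht_after_fork_parent(NULL); }
    static void child(void)   { cds_lfht_after_fork_child(NULL); }

    static int install_fork_handlers(void)
    {
            return pthread_atfork(prepare, parent, child);
    }
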
+
+/* Block all signals to ensure we don't disturb the application. */
+static void cds_lfht_worker_init(struct urcu_workqueue *workqueue,
+               void *priv)
+{
+       int ret;
+       sigset_t mask;
+
+       /* Block all signals in this thread; application threads keep handling them. */
+       ret = sigfillset(&mask);
+       if (ret)
+               urcu_die(errno);
+       ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
+       if (ret)
+               urcu_die(ret);
+}
+
+static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor)
+{
+       flavor->register_rculfhash_atfork(&cds_lfht_atfork);
+
+       mutex_lock(&cds_lfht_fork_mutex);
+       if (cds_lfht_workqueue_user_count++)
+               goto end;
+       cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL,
+               NULL, cds_lfht_worker_init, NULL, NULL, NULL, NULL, NULL);
+end:
+       mutex_unlock(&cds_lfht_fork_mutex);
+}
+
+static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor)
+{
+       mutex_lock(&cds_lfht_fork_mutex);
+       if (--cds_lfht_workqueue_user_count)
+               goto end;
+       urcu_workqueue_destroy(cds_lfht_workqueue);
+       cds_lfht_workqueue = NULL;
+end:
+       mutex_unlock(&cds_lfht_fork_mutex);
+
+       flavor->unregister_rculfhash_atfork(&cds_lfht_atfork);
+}