Fix: auto-resize hash table destroy deadlock
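cds_lfht_destroy() on a CDS_LFHT_AUTO_RESIZE table used to flush the shared resize workqueue to wait for in-flight resize work, then drop the workqueue user count, destroying the workqueue on last use. That flush can deadlock when destroy is itself invoked from a context the workqueue's progress depends on (e.g. from a worker or RCU callback context).

Make destroy non-blocking for auto-resize tables instead: check up front that the table is empty, returning -EPERM to the caller otherwise, hand back the caller's resize attributes, and queue the actual teardown (do_auto_resize_destroy_cb) behind any previously queued resize operations. The workqueue is now created once on demand and flushed and destroyed by a library destructor, cds_lfht_exit(), which lets the user count go away. Also unblock SIGRCU in the worker thread so the urcu-signal flavor keeps working, use the compat-smp helper instead of sysconf(_SC_NPROCESSORS_CONF) to size the split counters, and clean up signedness and unused-parameter warnings.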
diff --git a/src/rculfhash.c b/src/rculfhash.c
index 8942c80ba8c6e8eb5f155cd49e10c22d8455ee23..93b0f2c66c9ae02b58a58bbbc741aa2bfaa9d7d3 100644
--- a/src/rculfhash.c
+++ b/src/rculfhash.c
 #include <urcu/uatomic.h>
 #include <urcu/compiler.h>
 #include <urcu/rculfhash.h>
-#include <rculfhash-internal.h>
+#include <urcu/static/urcu-signal-nr.h>
 #include <stdio.h>
 #include <pthread.h>
 #include <signal.h>
+#include "rculfhash-internal.h"
 #include "workqueue.h"
 #include "urcu-die.h"
+#include "urcu-utils.h"
+#include "compat-smp.h"
 
 /*
  * Split-counters lazily update the global counter each 1024
@@ -360,7 +363,6 @@ struct partition_resize_work {
 };
 
 static struct urcu_workqueue *cds_lfht_workqueue;
-static unsigned long cds_lfht_workqueue_user_count;
 
 /*
  * Mutex ensuring mutual exclusion between workqueue initialization and
@@ -377,8 +379,8 @@ static struct urcu_atfork cds_lfht_atfork;
  */
 static int cds_lfht_workqueue_atfork_nesting;
 
+static void __attribute__((destructor)) cds_lfht_exit(void);
 static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor);
-static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor);
 
 #ifdef CONFIG_CDS_LFHT_ITER_DEBUG
 
@@ -393,7 +395,8 @@ void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht, struct cds_lfht_iter *iter)
 #else
 
 static
-void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht, struct cds_lfht_iter *iter)
+void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht __attribute__((unused)),
+               struct cds_lfht_iter *iter __attribute__((unused)))
 {
 }
 
@@ -579,6 +582,7 @@ unsigned int cds_lfht_fls_ulong(unsigned long x)
  * Return the minimum order for which x <= (1UL << order).
  * Return -1 if x is 0.
  */
+static
 int cds_lfht_get_count_order_u32(uint32_t x)
 {
        if (!x)
@@ -641,12 +645,11 @@ static long nr_cpus_mask = -1;
 static long split_count_mask = -1;
 static int split_count_order = -1;
 
-#if defined(HAVE_SYSCONF)
 static void ht_init_nr_cpus_mask(void)
 {
        long maxcpus;
 
-       maxcpus = sysconf(_SC_NPROCESSORS_CONF);
+       maxcpus = get_possible_cpus_array_len();
        if (maxcpus <= 0) {
                nr_cpus_mask = -2;
                return;
@@ -658,12 +661,6 @@ static void ht_init_nr_cpus_mask(void)
        maxcpus = 1UL << cds_lfht_get_count_order_ulong(maxcpus);
        nr_cpus_mask = maxcpus - 1;
 }
-#else /* #if defined(HAVE_SYSCONF) */
-static void ht_init_nr_cpus_mask(void)
-{
-       nr_cpus_mask = -2;
-}
-#endif /* #else #if defined(HAVE_SYSCONF) */
 
 static
 void alloc_split_items_count(struct cds_lfht *ht)
@@ -711,9 +708,8 @@ int ht_get_split_count_index(unsigned long hash)
 static
 void ht_count_add(struct cds_lfht *ht, unsigned long size, unsigned long hash)
 {
-       unsigned long split_count;
+       unsigned long split_count, count;
        int index;
-       long count;
 
        if (caa_unlikely(!ht->split_count))
                return;
@@ -732,7 +728,7 @@ void ht_count_add(struct cds_lfht *ht, unsigned long size, unsigned long hash)
 
        if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) < size)
                return;
-       dbg_printf("add set global %ld\n", count);
+       dbg_printf("add set global %lu\n", count);
        cds_lfht_resize_lazy_count(ht, size,
                count >> (CHAIN_LEN_TARGET - 1));
 }
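[For context on this signedness fix: the split-counter scheme batches per-slot add/del updates into the global ht->count, and the committed total is a naturally unsigned quantity, hence the switch from %ld to %lu. A minimal standalone sketch of the batching idea — illustrative names, GCC atomics standing in for uatomic:]

#define COUNT_COMMIT_ORDER 10	/* commit to the global counter every 1024 ops */

struct counters {
	unsigned long add;	/* per-slot update counter */
	unsigned long global;	/* lazily committed approximation of the size */
};

/* Returns the freshly committed global count, or 0 if nothing was
 * committed this round. Illustrative only: the real code hashes the CPU
 * number to pick one of several slots and uses uatomic_add_return(). */
static unsigned long count_add(struct counters *c)
{
	unsigned long split = __atomic_add_fetch(&c->add, 1, __ATOMIC_RELAXED);

	if (!(split & ((1UL << COUNT_COMMIT_ORDER) - 1)))
		return __atomic_add_fetch(&c->global,
				1UL << COUNT_COMMIT_ORDER, __ATOMIC_RELAXED);
	return 0;
}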
@@ -740,9 +736,8 @@ void ht_count_add(struct cds_lfht *ht, unsigned long size, unsigned long hash)
 static
 void ht_count_del(struct cds_lfht *ht, unsigned long size, unsigned long hash)
 {
-       unsigned long split_count;
+       unsigned long split_count, count;
        int index;
-       long count;
 
        if (caa_unlikely(!ht->split_count))
                return;
@@ -761,7 +756,7 @@ void ht_count_del(struct cds_lfht *ht, unsigned long size, unsigned long hash)
 
        if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) >= size)
                return;
-       dbg_printf("del set global %ld\n", count);
+       dbg_printf("del set global %lu\n", count);
        /*
         * Don't shrink table if the number of nodes is below a
         * certain threshold.
@@ -826,7 +821,7 @@ struct cds_lfht_node *clear_flag(struct cds_lfht_node *node)
 }
 
 static
-int is_removed(struct cds_lfht_node *node)
+int is_removed(const struct cds_lfht_node *node)
 {
        return ((unsigned long) node) & REMOVED_FLAG;
 }
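[is_removed(), now const-correct, tests a flag kept in the low-order bits of a node's next pointer; alignment guarantees those bits are otherwise zero. The trick in isolation (names and BUCKET_FLAG's value here are illustrative, though they mirror the file's conventions):]

#include <stdint.h>

#define REMOVED_FLAG	(1UL << 0)	/* node has been logically deleted */
#define BUCKET_FLAG	(1UL << 1)	/* pointer designates a bucket node */

/* Nodes are at least 4-byte aligned, so the two low bits of a node
 * pointer are always zero and can carry per-link state. */
static inline int next_is_removed(const void *next)
{
	return (uintptr_t) next & REMOVED_FLAG;
}

/* Strip all flag bits to recover the real pointer before dereferencing. */
static inline void *next_clear_flags(void *next)
{
	return (void *) ((uintptr_t) next & ~(REMOVED_FLAG | BUCKET_FLAG));
}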
@@ -1247,8 +1242,8 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
 {
        unsigned long partition_len, start = 0;
        struct partition_resize_work *work;
-       int thread, ret;
-       unsigned long nr_threads;
+       int ret;
+       unsigned long thread, nr_threads;
 
        assert(nr_cpus_mask != -1);
        if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD)
@@ -1260,7 +1255,7 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
         * partition size, up to the number of CPUs in the system.
         */
        if (nr_cpus_mask > 0) {
-               nr_threads = min(nr_cpus_mask + 1,
+               nr_threads = min_t(unsigned long, nr_cpus_mask + 1,
                                 len >> MIN_PARTITION_PER_THREAD_ORDER);
        } else {
                nr_threads = 1;
@@ -1277,7 +1272,8 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
                work[thread].len = partition_len;
                work[thread].start = thread * partition_len;
                work[thread].fct = fct;
-               ret = pthread_create(&(work[thread].thread_id), ht->resize_attr,
+               ret = pthread_create(&(work[thread].thread_id),
+                       ht->caller_resize_attr ? &ht->resize_attr : NULL,
                        partition_resize_thread, &work[thread]);
                if (ret == EAGAIN) {
                        /*
@@ -1449,8 +1445,7 @@ static
 void fini_table(struct cds_lfht *ht,
                unsigned long first_order, unsigned long last_order)
 {
-       long i;
-       unsigned long free_by_rcu_order = 0;
+       unsigned long free_by_rcu_order = 0, i;
 
        dbg_printf("fini table: first_order %lu last_order %lu\n",
                   first_order, last_order);
@@ -1499,11 +1494,15 @@ void fini_table(struct cds_lfht *ht,
        }
 }
 
+/*
+ * Never called with size < 1.
+ */
 static
 void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size)
 {
        struct cds_lfht_node *prev, *node;
        unsigned long order, len, i;
+       int bucket_order;
 
        cds_lfht_alloc_bucket_table(ht, 0);
 
@@ -1512,7 +1511,10 @@ void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size)
        node->next = flag_bucket(get_end());
        node->reverse_hash = 0;
 
-       for (order = 1; order < cds_lfht_get_count_order_ulong(size) + 1; order++) {
+       bucket_order = cds_lfht_get_count_order_ulong(size);
+       assert(bucket_order >= 0);
+
+       for (order = 1; order < (unsigned long) bucket_order + 1; order++) {
                len = 1UL << (order - 1);
                cds_lfht_alloc_bucket_table(ht, order);
 
@@ -1543,6 +1545,33 @@ void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size)
        }
 }
 
+#if (CAA_BITS_PER_LONG > 32)
+/*
+ * For 64-bit architectures, with max number of buckets small enough not to
+ * use the entire 64-bit memory mapping space (and allowing a fair number of
+ * hash table instances), use the mmap allocator, which is faster. Otherwise,
+ * fall back to the order allocator.
+ */
+static
+const struct cds_lfht_mm_type *get_mm_type(unsigned long max_nr_buckets)
+{
+       if (max_nr_buckets && max_nr_buckets <= (1ULL << 32))
+               return &cds_lfht_mm_mmap;
+       else
+               return &cds_lfht_mm_order;
+}
+#else
+/*
+ * For 32-bit architectures, use the order allocator.
+ */
+static
+const struct cds_lfht_mm_type *get_mm_type(
+               unsigned long max_nr_buckets __attribute__((unused)))
+{
+       return &cds_lfht_mm_order;
+}
+#endif
+
 struct cds_lfht *_cds_lfht_new(unsigned long init_size,
                        unsigned long min_nr_alloc_buckets,
                        unsigned long max_nr_buckets,
@@ -1565,26 +1594,8 @@ struct cds_lfht *_cds_lfht_new(unsigned long init_size,
        /*
         * Memory management plugin default.
         */
-       if (!mm) {
-               if (CAA_BITS_PER_LONG > 32
-                               && max_nr_buckets
-                               && max_nr_buckets <= (1ULL << 32)) {
-                       /*
-                        * For 64-bit architectures, with max number of
-                        * buckets small enough not to use the entire
-                        * 64-bit memory mapping space (and allowing a
-                        * fair number of hash table instances), use the
-                        * mmap allocator, which is faster than the
-                        * order allocator.
-                        */
-                       mm = &cds_lfht_mm_mmap;
-               } else {
-                       /*
-                        * The fallback is to use the order allocator.
-                        */
-                       mm = &cds_lfht_mm_order;
-               }
-       }
+       if (!mm)
+               mm = get_mm_type(max_nr_buckets);
 
        /* max_nr_buckets == 0 for order based mm means infinite */
        if (mm == &cds_lfht_mm_order && !max_nr_buckets)
@@ -1609,7 +1620,9 @@ struct cds_lfht *_cds_lfht_new(unsigned long init_size,
 
        ht->flags = flags;
        ht->flavor = flavor;
-       ht->resize_attr = attr;
+       ht->caller_resize_attr = attr;
+       if (attr)
+               ht->resize_attr = *attr;
        alloc_split_items_count(ht);
        /* this mutex should not nest in read-side C.S. */
        pthread_mutex_init(&ht->resize_mutex, NULL);
@@ -1660,7 +1673,8 @@ void cds_lfht_lookup(struct cds_lfht *ht, unsigned long hash,
        iter->next = next;
 }
 
-void cds_lfht_next_duplicate(struct cds_lfht *ht, cds_lfht_match_fct match,
+void cds_lfht_next_duplicate(struct cds_lfht *ht __attribute__((unused)),
+               cds_lfht_match_fct match,
                const void *key, struct cds_lfht_iter *iter)
 {
        struct cds_lfht_node *node, *next;
@@ -1694,7 +1708,8 @@ void cds_lfht_next_duplicate(struct cds_lfht *ht, cds_lfht_match_fct match,
        iter->next = next;
 }
 
-void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter)
+void cds_lfht_next(struct cds_lfht *ht __attribute__((unused)),
+               struct cds_lfht_iter *iter)
 {
        struct cds_lfht_node *node, *next;
 
@@ -1816,11 +1831,40 @@ int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_node *node)
        return ret;
 }
 
-int cds_lfht_is_node_deleted(struct cds_lfht_node *node)
+int cds_lfht_is_node_deleted(const struct cds_lfht_node *node)
 {
        return is_removed(CMM_LOAD_SHARED(node->next));
 }
 
+static
+bool cds_lfht_is_empty(struct cds_lfht *ht)
+{
+       struct cds_lfht_node *node, *next;
+       bool empty = true;
+       bool was_online;
+
+       was_online = ht->flavor->read_ongoing();
+       if (!was_online) {
+               ht->flavor->thread_online();
+               ht->flavor->read_lock();
+       }
+       /* Check that the table is empty */
+       node = bucket_at(ht, 0);
+       do {
+               next = rcu_dereference(node->next);
+               if (!is_bucket(next)) {
+                       empty = false;
+                       break;
+               }
+               node = clear_flag(next);
+       } while (!is_end(node));
+       if (!was_online) {
+               ht->flavor->read_unlock();
+               ht->flavor->thread_offline();
+       }
+       return empty;
+}
+
 static
 int cds_lfht_delete_bucket(struct cds_lfht *ht)
 {
@@ -1855,6 +1899,24 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht)
        return 0;
 }
 
+static
+void do_auto_resize_destroy_cb(struct urcu_work *work)
+{
+       struct cds_lfht *ht = caa_container_of(work, struct cds_lfht, destroy_work);
+       int ret;
+
+       ht->flavor->register_thread();
+       ret = cds_lfht_delete_bucket(ht);
+       if (ret)
+               urcu_die(ret);
+       free_split_items_count(ht);
+       ret = pthread_mutex_destroy(&ht->resize_mutex);
+       if (ret)
+               urcu_die(ret);
+       ht->flavor->unregister_thread();
+       poison_free(ht);
+}
+
 /*
  * Should only be called when no more concurrent readers nor writers can
  * possibly access the table.
@@ -1864,22 +1926,38 @@ int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
        int ret;
 
        if (ht->flags & CDS_LFHT_AUTO_RESIZE) {
+               /*
+                * Perform error-checking for emptiness before queuing
+                * work, so we can return an error to the caller. This runs
+                * concurrently with ongoing resize operations.
+                */
+               if (!cds_lfht_is_empty(ht))
+                       return -EPERM;
                /* Cancel ongoing resize operations. */
                _CMM_STORE_SHARED(ht->in_progress_destroy, 1);
-               /* Wait for in-flight resize operations to complete */
-               urcu_workqueue_flush_queued_work(cds_lfht_workqueue);
+               if (attr) {
+                       *attr = ht->caller_resize_attr;
+                       ht->caller_resize_attr = NULL;
+               }
+               /*
+                * Queue destroy work after prior queued resize
+                * operations. Given there are no concurrent writers
+                * accessing the hash table at this point, no resize
+                * operations can be queued after this destroy work.
+                */
+               urcu_workqueue_queue_work(cds_lfht_workqueue,
+                       &ht->destroy_work, do_auto_resize_destroy_cb);
+               return 0;
        }
        ret = cds_lfht_delete_bucket(ht);
        if (ret)
                return ret;
        free_split_items_count(ht);
        if (attr)
-               *attr = ht->resize_attr;
+               *attr = ht->caller_resize_attr;
        ret = pthread_mutex_destroy(&ht->resize_mutex);
        if (ret)
                ret = -EBUSY;
-       if (ht->flags & CDS_LFHT_AUTO_RESIZE)
-               cds_lfht_fini_worker(ht->flavor);
        poison_free(ht);
        return ret;
 }
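[Net effect for callers: destroying an auto-resize table no longer blocks on the workqueue (the deadlock source) but requires an empty table, failing with -EPERM otherwise, and frees the structure asynchronously. A minimal caller-side sketch under the default flavor; table_teardown() is a hypothetical helper, not library API:]

#include <urcu.h>		/* default flavor; must come before rculfhash.h */
#include <urcu/rculfhash.h>
#include <errno.h>
#include <stdio.h>

static void table_teardown(struct cds_lfht *ht)
{
	int ret;

	/* Every node must already be cds_lfht_del()'d (and its grace
	 * period waited for) before destroy is legal on this table. */
	ret = cds_lfht_destroy(ht, NULL);
	if (ret == -EPERM)
		fprintf(stderr, "hash table still contains nodes\n");
	/* On success, memory is reclaimed later by the resize workqueue. */
}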
@@ -2100,7 +2178,7 @@ void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size,
        __cds_lfht_resize_lazy_launch(ht);
 }
 
-static void cds_lfht_before_fork(void *priv)
+static void cds_lfht_before_fork(void *priv __attribute__((unused)))
 {
        if (cds_lfht_workqueue_atfork_nesting++)
                return;
@@ -2110,7 +2188,7 @@ static void cds_lfht_before_fork(void *priv)
        urcu_workqueue_pause_worker(cds_lfht_workqueue);
 }
 
-static void cds_lfht_after_fork_parent(void *priv)
+static void cds_lfht_after_fork_parent(void *priv __attribute__((unused)))
 {
        if (--cds_lfht_workqueue_atfork_nesting)
                return;
@@ -2121,7 +2199,7 @@ end:
        mutex_unlock(&cds_lfht_fork_mutex);
 }
 
-static void cds_lfht_after_fork_child(void *priv)
+static void cds_lfht_after_fork_child(void *priv __attribute__((unused)))
 {
        if (--cds_lfht_workqueue_atfork_nesting)
                return;
@@ -2138,18 +2216,25 @@ static struct urcu_atfork cds_lfht_atfork = {
        .after_fork_child = cds_lfht_after_fork_child,
 };
 
-/* Block all signals to ensure we don't disturb the application. */
-static void cds_lfht_worker_init(struct urcu_workqueue *workqueue,
-               void *priv)
+/*
+ * Block all signals for the workqueue worker thread to ensure we don't
+ * disturb the application. The SIGRCU signal needs to be unblocked for
+ * the urcu-signal flavor.
+ */
+static void cds_lfht_worker_init(
+               struct urcu_workqueue *workqueue __attribute__((unused)),
+               void *priv __attribute__((unused)))
 {
        int ret;
        sigset_t mask;
 
-       /* Block signal for entire process, so only our thread processes it. */
        ret = sigfillset(&mask);
        if (ret)
                urcu_die(errno);
-       ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
+       ret = sigdelset(&mask, SIGRCU);
+       if (ret)
+               urcu_die(errno);
+       ret = pthread_sigmask(SIG_SETMASK, &mask, NULL);
        if (ret)
                urcu_die(ret);
 }
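[The new mask setup — fill, carve out the one signal the worker must still service, install with SIG_SETMASK — is a generally useful shape for helper threads. A standalone sketch with SIGUSR1 standing in for SIGRCU:]

#include <errno.h>
#include <pthread.h>
#include <signal.h>

/* Block every signal in the calling thread except SIGUSR1.
 * Returns 0 on success or an errno value. */
static int mask_all_but_usr1(void)
{
	sigset_t mask;

	if (sigfillset(&mask))
		return errno;
	if (sigdelset(&mask, SIGUSR1))
		return errno;
	/* SIG_SETMASK installs the mask outright, so SIGUSR1 ends up
	 * unblocked even if the creating thread had it blocked;
	 * SIG_BLOCK could only add blocked signals, never unblock one. */
	return pthread_sigmask(SIG_SETMASK, &mask, NULL);
}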
@@ -2159,23 +2244,19 @@ static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor)
        flavor->register_rculfhash_atfork(&cds_lfht_atfork);
 
        mutex_lock(&cds_lfht_fork_mutex);
-       if (cds_lfht_workqueue_user_count++)
-               goto end;
-       cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL,
-               NULL, cds_lfht_worker_init, NULL, NULL, NULL, NULL, NULL);
-end:
+       if (!cds_lfht_workqueue)
+               cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL,
+                       NULL, cds_lfht_worker_init, NULL, NULL, NULL, NULL, NULL);
        mutex_unlock(&cds_lfht_fork_mutex);
 }
 
-static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor)
+static void cds_lfht_exit(void)
 {
        mutex_lock(&cds_lfht_fork_mutex);
-       if (--cds_lfht_workqueue_user_count)
-               goto end;
-       urcu_workqueue_destroy(cds_lfht_workqueue);
-       cds_lfht_workqueue = NULL;
-end:
+       if (cds_lfht_workqueue) {
+               urcu_workqueue_flush_queued_work(cds_lfht_workqueue);
+               urcu_workqueue_destroy(cds_lfht_workqueue);
+               cds_lfht_workqueue = NULL;
+       }
        mutex_unlock(&cds_lfht_fork_mutex);
-
-       flavor->unregister_rculfhash_atfork(&cds_lfht_atfork);
 }
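[The workqueue's lifetime thus moves from reference counting to lazy creation under a mutex plus one-shot teardown in a destructor. The same pattern in miniature, with a stand-in resource:]

#include <pthread.h>
#include <stdlib.h>

static pthread_mutex_t res_mutex = PTHREAD_MUTEX_INITIALIZER;
static void *resource;			/* stand-in for the workqueue */

/* Any user may call this; only the first call actually creates. */
static void resource_init(void)
{
	pthread_mutex_lock(&res_mutex);
	if (!resource)
		resource = calloc(1, 64);
	pthread_mutex_unlock(&res_mutex);
}

/* Runs once at process exit; free(NULL) is a no-op, so this is safe
 * even if resource_init() was never called. */
static void __attribute__((destructor)) resource_exit(void)
{
	pthread_mutex_lock(&res_mutex);
	free(resource);			/* stand-in for flush + destroy */
	resource = NULL;
	pthread_mutex_unlock(&res_mutex);
}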