X-Git-Url: http://git.lttng.org/?a=blobdiff_plain;f=src%2Frculfhash.c;h=93b0f2c66c9ae02b58a58bbbc741aa2bfaa9d7d3;hb=19ec56ca7d502f12dfb7f7d4299b506ba5abecce;hp=8942c80ba8c6e8eb5f155cd49e10c22d8455ee23;hpb=d7c76f85442125bcfef40f58b1c6fc1bd5ce4ffd;p=urcu.git diff --git a/src/rculfhash.c b/src/rculfhash.c index 8942c80..93b0f2c 100644 --- a/src/rculfhash.c +++ b/src/rculfhash.c @@ -273,12 +273,15 @@ #include #include #include -#include +#include #include #include #include +#include "rculfhash-internal.h" #include "workqueue.h" #include "urcu-die.h" +#include "urcu-utils.h" +#include "compat-smp.h" /* * Split-counters lazily update the global counter each 1024 @@ -360,7 +363,6 @@ struct partition_resize_work { }; static struct urcu_workqueue *cds_lfht_workqueue; -static unsigned long cds_lfht_workqueue_user_count; /* * Mutex ensuring mutual exclusion between workqueue initialization and @@ -377,8 +379,8 @@ static struct urcu_atfork cds_lfht_atfork; */ static int cds_lfht_workqueue_atfork_nesting; +static void __attribute__((destructor)) cds_lfht_exit(void); static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor); -static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor); #ifdef CONFIG_CDS_LFHT_ITER_DEBUG @@ -393,7 +395,8 @@ void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht, struct cds_lfht_iter *iter) #else static -void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht, struct cds_lfht_iter *iter) +void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht __attribute__((unused)), + struct cds_lfht_iter *iter __attribute__((unused))) { } @@ -579,6 +582,7 @@ unsigned int cds_lfht_fls_ulong(unsigned long x) * Return the minimum order for which x <= (1UL << order). * Return -1 if x is 0. */ +static int cds_lfht_get_count_order_u32(uint32_t x) { if (!x) @@ -641,12 +645,11 @@ static long nr_cpus_mask = -1; static long split_count_mask = -1; static int split_count_order = -1; -#if defined(HAVE_SYSCONF) static void ht_init_nr_cpus_mask(void) { long maxcpus; - maxcpus = sysconf(_SC_NPROCESSORS_CONF); + maxcpus = get_possible_cpus_array_len(); if (maxcpus <= 0) { nr_cpus_mask = -2; return; @@ -658,12 +661,6 @@ static void ht_init_nr_cpus_mask(void) maxcpus = 1UL << cds_lfht_get_count_order_ulong(maxcpus); nr_cpus_mask = maxcpus - 1; } -#else /* #if defined(HAVE_SYSCONF) */ -static void ht_init_nr_cpus_mask(void) -{ - nr_cpus_mask = -2; -} -#endif /* #else #if defined(HAVE_SYSCONF) */ static void alloc_split_items_count(struct cds_lfht *ht) @@ -711,9 +708,8 @@ int ht_get_split_count_index(unsigned long hash) static void ht_count_add(struct cds_lfht *ht, unsigned long size, unsigned long hash) { - unsigned long split_count; + unsigned long split_count, count; int index; - long count; if (caa_unlikely(!ht->split_count)) return; @@ -732,7 +728,7 @@ void ht_count_add(struct cds_lfht *ht, unsigned long size, unsigned long hash) if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) < size) return; - dbg_printf("add set global %ld\n", count); + dbg_printf("add set global %lu\n", count); cds_lfht_resize_lazy_count(ht, size, count >> (CHAIN_LEN_TARGET - 1)); } @@ -740,9 +736,8 @@ void ht_count_add(struct cds_lfht *ht, unsigned long size, unsigned long hash) static void ht_count_del(struct cds_lfht *ht, unsigned long size, unsigned long hash) { - unsigned long split_count; + unsigned long split_count, count; int index; - long count; if (caa_unlikely(!ht->split_count)) return; @@ -761,7 +756,7 @@ void ht_count_del(struct cds_lfht *ht, unsigned long size, unsigned long hash) if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) >= size) return; - dbg_printf("del set global %ld\n", count); + dbg_printf("del set global %lu\n", count); /* * Don't shrink table if the number of nodes is below a * certain threshold. @@ -826,7 +821,7 @@ struct cds_lfht_node *clear_flag(struct cds_lfht_node *node) } static -int is_removed(struct cds_lfht_node *node) +int is_removed(const struct cds_lfht_node *node) { return ((unsigned long) node) & REMOVED_FLAG; } @@ -1247,8 +1242,8 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, { unsigned long partition_len, start = 0; struct partition_resize_work *work; - int thread, ret; - unsigned long nr_threads; + int ret; + unsigned long thread, nr_threads; assert(nr_cpus_mask != -1); if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) @@ -1260,7 +1255,7 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, * partition size, up to the number of CPUs in the system. */ if (nr_cpus_mask > 0) { - nr_threads = min(nr_cpus_mask + 1, + nr_threads = min_t(unsigned long, nr_cpus_mask + 1, len >> MIN_PARTITION_PER_THREAD_ORDER); } else { nr_threads = 1; @@ -1277,7 +1272,8 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i, work[thread].len = partition_len; work[thread].start = thread * partition_len; work[thread].fct = fct; - ret = pthread_create(&(work[thread].thread_id), ht->resize_attr, + ret = pthread_create(&(work[thread].thread_id), + ht->caller_resize_attr ? &ht->resize_attr : NULL, partition_resize_thread, &work[thread]); if (ret == EAGAIN) { /* @@ -1449,8 +1445,7 @@ static void fini_table(struct cds_lfht *ht, unsigned long first_order, unsigned long last_order) { - long i; - unsigned long free_by_rcu_order = 0; + unsigned long free_by_rcu_order = 0, i; dbg_printf("fini table: first_order %lu last_order %lu\n", first_order, last_order); @@ -1499,11 +1494,15 @@ void fini_table(struct cds_lfht *ht, } } +/* + * Never called with size < 1. + */ static void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size) { struct cds_lfht_node *prev, *node; unsigned long order, len, i; + int bucket_order; cds_lfht_alloc_bucket_table(ht, 0); @@ -1512,7 +1511,10 @@ void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size) node->next = flag_bucket(get_end()); node->reverse_hash = 0; - for (order = 1; order < cds_lfht_get_count_order_ulong(size) + 1; order++) { + bucket_order = cds_lfht_get_count_order_ulong(size); + assert(bucket_order >= 0); + + for (order = 1; order < (unsigned long) bucket_order + 1; order++) { len = 1UL << (order - 1); cds_lfht_alloc_bucket_table(ht, order); @@ -1543,6 +1545,33 @@ void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size) } } +#if (CAA_BITS_PER_LONG > 32) +/* + * For 64-bit architectures, with max number of buckets small enough not to + * use the entire 64-bit memory mapping space (and allowing a fair number of + * hash table instances), use the mmap allocator, which is faster. Otherwise, + * fallback to the order allocator. + */ +static +const struct cds_lfht_mm_type *get_mm_type(unsigned long max_nr_buckets) +{ + if (max_nr_buckets && max_nr_buckets <= (1ULL << 32)) + return &cds_lfht_mm_mmap; + else + return &cds_lfht_mm_order; +} +#else +/* + * For 32-bit architectures, use the order allocator. + */ +static +const struct cds_lfht_mm_type *get_mm_type( + unsigned long max_nr_buckets __attribute__((unused))) +{ + return &cds_lfht_mm_order; +} +#endif + struct cds_lfht *_cds_lfht_new(unsigned long init_size, unsigned long min_nr_alloc_buckets, unsigned long max_nr_buckets, @@ -1565,26 +1594,8 @@ struct cds_lfht *_cds_lfht_new(unsigned long init_size, /* * Memory management plugin default. */ - if (!mm) { - if (CAA_BITS_PER_LONG > 32 - && max_nr_buckets - && max_nr_buckets <= (1ULL << 32)) { - /* - * For 64-bit architectures, with max number of - * buckets small enough not to use the entire - * 64-bit memory mapping space (and allowing a - * fair number of hash table instances), use the - * mmap allocator, which is faster than the - * order allocator. - */ - mm = &cds_lfht_mm_mmap; - } else { - /* - * The fallback is to use the order allocator. - */ - mm = &cds_lfht_mm_order; - } - } + if (!mm) + mm = get_mm_type(max_nr_buckets); /* max_nr_buckets == 0 for order based mm means infinite */ if (mm == &cds_lfht_mm_order && !max_nr_buckets) @@ -1609,7 +1620,9 @@ struct cds_lfht *_cds_lfht_new(unsigned long init_size, ht->flags = flags; ht->flavor = flavor; - ht->resize_attr = attr; + ht->caller_resize_attr = attr; + if (attr) + ht->resize_attr = *attr; alloc_split_items_count(ht); /* this mutex should not nest in read-side C.S. */ pthread_mutex_init(&ht->resize_mutex, NULL); @@ -1660,7 +1673,8 @@ void cds_lfht_lookup(struct cds_lfht *ht, unsigned long hash, iter->next = next; } -void cds_lfht_next_duplicate(struct cds_lfht *ht, cds_lfht_match_fct match, +void cds_lfht_next_duplicate(struct cds_lfht *ht __attribute__((unused)), + cds_lfht_match_fct match, const void *key, struct cds_lfht_iter *iter) { struct cds_lfht_node *node, *next; @@ -1694,7 +1708,8 @@ void cds_lfht_next_duplicate(struct cds_lfht *ht, cds_lfht_match_fct match, iter->next = next; } -void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter) +void cds_lfht_next(struct cds_lfht *ht __attribute__((unused)), + struct cds_lfht_iter *iter) { struct cds_lfht_node *node, *next; @@ -1816,11 +1831,40 @@ int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_node *node) return ret; } -int cds_lfht_is_node_deleted(struct cds_lfht_node *node) +int cds_lfht_is_node_deleted(const struct cds_lfht_node *node) { return is_removed(CMM_LOAD_SHARED(node->next)); } +static +bool cds_lfht_is_empty(struct cds_lfht *ht) +{ + struct cds_lfht_node *node, *next; + bool empty = true; + bool was_online; + + was_online = ht->flavor->read_ongoing(); + if (!was_online) { + ht->flavor->thread_online(); + ht->flavor->read_lock(); + } + /* Check that the table is empty */ + node = bucket_at(ht, 0); + do { + next = rcu_dereference(node->next); + if (!is_bucket(next)) { + empty = false; + break; + } + node = clear_flag(next); + } while (!is_end(node)); + if (!was_online) { + ht->flavor->read_unlock(); + ht->flavor->thread_offline(); + } + return empty; +} + static int cds_lfht_delete_bucket(struct cds_lfht *ht) { @@ -1855,6 +1899,24 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht) return 0; } +static +void do_auto_resize_destroy_cb(struct urcu_work *work) +{ + struct cds_lfht *ht = caa_container_of(work, struct cds_lfht, destroy_work); + int ret; + + ht->flavor->register_thread(); + ret = cds_lfht_delete_bucket(ht); + if (ret) + urcu_die(ret); + free_split_items_count(ht); + ret = pthread_mutex_destroy(&ht->resize_mutex); + if (ret) + urcu_die(ret); + ht->flavor->unregister_thread(); + poison_free(ht); +} + /* * Should only be called when no more concurrent readers nor writers can * possibly access the table. @@ -1864,22 +1926,38 @@ int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr) int ret; if (ht->flags & CDS_LFHT_AUTO_RESIZE) { + /* + * Perform error-checking for emptiness before queuing + * work, so we can return error to the caller. This runs + * concurrently with ongoing resize. + */ + if (!cds_lfht_is_empty(ht)) + return -EPERM; /* Cancel ongoing resize operations. */ _CMM_STORE_SHARED(ht->in_progress_destroy, 1); - /* Wait for in-flight resize operations to complete */ - urcu_workqueue_flush_queued_work(cds_lfht_workqueue); + if (attr) { + *attr = ht->caller_resize_attr; + ht->caller_resize_attr = NULL; + } + /* + * Queue destroy work after prior queued resize + * operations. Given there are no concurrent writers + * accessing the hash table at this point, no resize + * operations can be queued after this destroy work. + */ + urcu_workqueue_queue_work(cds_lfht_workqueue, + &ht->destroy_work, do_auto_resize_destroy_cb); + return 0; } ret = cds_lfht_delete_bucket(ht); if (ret) return ret; free_split_items_count(ht); if (attr) - *attr = ht->resize_attr; + *attr = ht->caller_resize_attr; ret = pthread_mutex_destroy(&ht->resize_mutex); if (ret) ret = -EBUSY; - if (ht->flags & CDS_LFHT_AUTO_RESIZE) - cds_lfht_fini_worker(ht->flavor); poison_free(ht); return ret; } @@ -2100,7 +2178,7 @@ void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size, __cds_lfht_resize_lazy_launch(ht); } -static void cds_lfht_before_fork(void *priv) +static void cds_lfht_before_fork(void *priv __attribute__((unused))) { if (cds_lfht_workqueue_atfork_nesting++) return; @@ -2110,7 +2188,7 @@ static void cds_lfht_before_fork(void *priv) urcu_workqueue_pause_worker(cds_lfht_workqueue); } -static void cds_lfht_after_fork_parent(void *priv) +static void cds_lfht_after_fork_parent(void *priv __attribute__((unused))) { if (--cds_lfht_workqueue_atfork_nesting) return; @@ -2121,7 +2199,7 @@ end: mutex_unlock(&cds_lfht_fork_mutex); } -static void cds_lfht_after_fork_child(void *priv) +static void cds_lfht_after_fork_child(void *priv __attribute__((unused))) { if (--cds_lfht_workqueue_atfork_nesting) return; @@ -2138,18 +2216,25 @@ static struct urcu_atfork cds_lfht_atfork = { .after_fork_child = cds_lfht_after_fork_child, }; -/* Block all signals to ensure we don't disturb the application. */ -static void cds_lfht_worker_init(struct urcu_workqueue *workqueue, - void *priv) +/* + * Block all signals for the workqueue worker thread to ensure we don't + * disturb the application. The SIGRCU signal needs to be unblocked for + * the urcu-signal flavor. + */ +static void cds_lfht_worker_init( + struct urcu_workqueue *workqueue __attribute__((unused)), + void *priv __attribute__((unused))) { int ret; sigset_t mask; - /* Block signal for entire process, so only our thread processes it. */ ret = sigfillset(&mask); if (ret) urcu_die(errno); - ret = pthread_sigmask(SIG_BLOCK, &mask, NULL); + ret = sigdelset(&mask, SIGRCU); + if (ret) + urcu_die(errno); + ret = pthread_sigmask(SIG_SETMASK, &mask, NULL); if (ret) urcu_die(ret); } @@ -2159,23 +2244,19 @@ static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor) flavor->register_rculfhash_atfork(&cds_lfht_atfork); mutex_lock(&cds_lfht_fork_mutex); - if (cds_lfht_workqueue_user_count++) - goto end; - cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL, - NULL, cds_lfht_worker_init, NULL, NULL, NULL, NULL, NULL); -end: + if (!cds_lfht_workqueue) + cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL, + NULL, cds_lfht_worker_init, NULL, NULL, NULL, NULL, NULL); mutex_unlock(&cds_lfht_fork_mutex); } -static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor) +static void cds_lfht_exit(void) { mutex_lock(&cds_lfht_fork_mutex); - if (--cds_lfht_workqueue_user_count) - goto end; - urcu_workqueue_destroy(cds_lfht_workqueue); - cds_lfht_workqueue = NULL; -end: + if (cds_lfht_workqueue) { + urcu_workqueue_flush_queued_work(cds_lfht_workqueue); + urcu_workqueue_destroy(cds_lfht_workqueue); + cds_lfht_workqueue = NULL; + } mutex_unlock(&cds_lfht_fork_mutex); - - flavor->unregister_rculfhash_atfork(&cds_lfht_atfork); }