X-Git-Url: https://git.lttng.org/?p=urcu.git;a=blobdiff_plain;f=src%2Frculfhash.c;h=8046f3a63a280397be3986231f1db8921ea4bfdb;hp=51972c8d1b318c618776a8d0b4b26e5373b9dbae;hb=HEAD;hpb=afa5940dbe80a259cf8bc4a99403554a3c2c9e32

diff --git a/src/rculfhash.c b/src/rculfhash.c
index 51972c8..10f5b8e 100644
--- a/src/rculfhash.c
+++ b/src/rculfhash.c
@@ -1,24 +1,10 @@
+// SPDX-FileCopyrightText: 2010-2011 Mathieu Desnoyers
+// SPDX-FileCopyrightText: 2011 Lai Jiangshan
+//
+// SPDX-License-Identifier: LGPL-2.1-or-later
+
 /*
- * rculfhash.c
- *
  * Userspace RCU library - Lock-Free Resizable RCU Hash Table
- *
- * Copyright 2010-2011 - Mathieu Desnoyers
- * Copyright 2011 - Lai Jiangshan
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  */
 
 /*
@@ -258,14 +244,15 @@
 #define _LGPL_SOURCE
 #include <stdlib.h>
 #include <errno.h>
-#include <assert.h>
 #include <stdio.h>
 #include <stdint.h>
 #include <string.h>
 #include <sched.h>
 #include <unistd.h>
+#include <stdbool.h>
 
 #include "compat-getcpu.h"
+#include <urcu/assert.h>
 #include <urcu/pointer.h>
 #include <urcu/call-rcu.h>
 #include <urcu/flavor.h>
@@ -273,14 +260,14 @@
 #include <urcu/arch.h>
 #include <urcu/uatomic.h>
 #include <urcu/compiler.h>
-#include <urcu/rculfhash.h>
-#include <rculfhash-internal.h>
 #include <stdio.h>
 #include <pthread.h>
 #include <signal.h>
+#include "rculfhash-internal.h"
 #include "workqueue.h"
 #include "urcu-die.h"
 #include "urcu-utils.h"
+#include "compat-smp.h"
 
 /*
  * Split-counters lazily update the global counter each 1024
@@ -361,8 +348,12 @@ struct partition_resize_work {
 			    unsigned long start, unsigned long len);
 };
 
+enum nr_cpus_mask_state {
+	NR_CPUS_MASK_INIT_FAILED = -2,
+	NR_CPUS_MASK_UNINITIALIZED = -1,
+};
+
 static struct urcu_workqueue *cds_lfht_workqueue;
-static unsigned long cds_lfht_workqueue_user_count;
 
 /*
  * Mutex ensuring mutual exclusion between workqueue initialization and
@@ -379,8 +370,8 @@ static struct urcu_atfork cds_lfht_atfork;
  */
 static int cds_lfht_workqueue_atfork_nesting;
 
+static void __attribute__((destructor)) cds_lfht_exit(void);
 static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor);
-static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor);
 
 #ifdef CONFIG_CDS_LFHT_ITER_DEBUG
 
@@ -390,12 +381,13 @@ void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht, struct cds_lfht_iter *iter)
 	iter->lfht = ht;
 }
 
-#define cds_lfht_iter_debug_assert(...) assert(__VA_ARGS__)
+#define cds_lfht_iter_debug_assert(...) urcu_posix_assert(__VA_ARGS__)
 
 #else
 
 static
-void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht, struct cds_lfht_iter *iter)
+void cds_lfht_iter_debug_set_ht(struct cds_lfht *ht __attribute__((unused)),
+		struct cds_lfht_iter *iter __attribute__((unused)))
 {
 }
 
@@ -467,7 +459,7 @@ unsigned long bit_reverse_ulong(unsigned long v)
  * Returns 0 if no bit is set, else returns the position of the most
  * significant bit (from 1 to 32 on 32-bit, from 1 to 64 on 64-bit).
  */
-#if defined(__i386) || defined(__x86_64)
+#if defined(URCU_ARCH_X86)
 static inline
 unsigned int fls_u32(uint32_t x)
 {
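The fls_u32()/fls_u64() variants selected above implement the contract stated in the comment. For illustration only, a minimal portable sketch of the same contract using the GCC/Clang __builtin_clzl builtin (the function name is hypothetical; the library ships its own asm and generic fallbacks):

```c
#include <limits.h>

/* fls contract: 0 for 0, else the 1-based position of the MSB. */
static inline unsigned int example_fls_ulong(unsigned long x)
{
	if (!x)
		return 0;	/* __builtin_clzl(0) is undefined */
	return (unsigned int) (sizeof(unsigned long) * CHAR_BIT)
		- (unsigned int) __builtin_clzl(x);
}
```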
@@ -483,7 +475,7 @@ unsigned int fls_u32(uint32_t x)
 #define HAS_FLS_U32
 #endif
 
-#if defined(__x86_64)
+#if defined(URCU_ARCH_AMD64)
 static inline
 unsigned int fls_u64(uint64_t x)
 {
@@ -577,10 +569,55 @@ unsigned int cds_lfht_fls_ulong(unsigned long x)
 #endif
 }
 
+static void *cds_lfht_malloc(void *state __attribute__((unused)),
+		size_t size)
+{
+	return malloc(size);
+}
+
+static void *cds_lfht_calloc(void *state __attribute__((unused)),
+		size_t nmemb, size_t size)
+{
+	return calloc(nmemb, size);
+}
+
+static void *cds_lfht_realloc(void *state __attribute__((unused)),
+		void *ptr, size_t size)
+{
+	return realloc(ptr, size);
+}
+
+static void *cds_lfht_aligned_alloc(void *state __attribute__((unused)),
+		size_t alignment, size_t size)
+{
+	void *ptr;
+
+	if (posix_memalign(&ptr, alignment, size))
+		return NULL;
+	return ptr;
+}
+
+static void cds_lfht_free(void *state __attribute__((unused)), void *ptr)
+{
+	free(ptr);
+}
+
+
+/* Default memory allocator */
+static struct cds_lfht_alloc cds_lfht_default_alloc = {
+	.malloc = cds_lfht_malloc,
+	.calloc = cds_lfht_calloc,
+	.realloc = cds_lfht_realloc,
+	.aligned_alloc = cds_lfht_aligned_alloc,
+	.free = cds_lfht_free,
+	.state = NULL,
+};
+
 /*
  * Return the minimum order for which x <= (1UL << order).
  * Return -1 if x is 0.
  */
 static
 int cds_lfht_get_count_order_u32(uint32_t x)
 {
 	if (!x)
@@ -621,9 +658,7 @@ static void mutex_lock(pthread_mutex_t *mutex)
 		if (ret != EBUSY && ret != EINTR)
 			urcu_die(ret);
 		if (CMM_LOAD_SHARED(URCU_TLS(rcu_reader).need_mb)) {
-			cmm_smp_mb();
-			_CMM_STORE_SHARED(URCU_TLS(rcu_reader).need_mb, 0);
-			cmm_smp_mb();
+			uatomic_store(&URCU_TLS(rcu_reader).need_mb, 0, CMM_SEQ_CST);
 		}
 		(void) poll(NULL, 0, 10);
 	}
@@ -639,18 +674,17 @@ static void mutex_unlock(pthread_mutex_t *mutex)
 		urcu_die(ret);
 }
 
-static long nr_cpus_mask = -1;
+static long nr_cpus_mask = NR_CPUS_MASK_UNINITIALIZED;
 static long split_count_mask = -1;
 static int split_count_order = -1;
 
-#if defined(HAVE_SYSCONF)
 static void ht_init_nr_cpus_mask(void)
 {
 	long maxcpus;
 
-	maxcpus = sysconf(_SC_NPROCESSORS_CONF);
+	maxcpus = get_possible_cpus_array_len();
 	if (maxcpus <= 0) {
-		nr_cpus_mask = -2;
+		nr_cpus_mask = NR_CPUS_MASK_INIT_FAILED;
 		return;
 	}
 	/*
@@ -660,17 +694,11 @@ static void ht_init_nr_cpus_mask(void)
 	maxcpus = 1UL << cds_lfht_get_count_order_ulong(maxcpus);
 	nr_cpus_mask = maxcpus - 1;
 }
-#else /* #if defined(HAVE_SYSCONF) */
-static void ht_init_nr_cpus_mask(void)
-{
-	nr_cpus_mask = -2;
-}
-#endif /* #else #if defined(HAVE_SYSCONF) */
 
 static
 void alloc_split_items_count(struct cds_lfht *ht)
 {
-	if (nr_cpus_mask == -1) {
+	if (nr_cpus_mask == NR_CPUS_MASK_UNINITIALIZED) {
 		ht_init_nr_cpus_mask();
 		if (nr_cpus_mask < 0)
 			split_count_mask = DEFAULT_SPLIT_COUNT_MASK;
@@ -680,12 +708,12 @@ void alloc_split_items_count(struct cds_lfht *ht)
 			cds_lfht_get_count_order_ulong(split_count_mask + 1);
 	}
 
-	assert(split_count_mask >= 0);
+	urcu_posix_assert(split_count_mask >= 0);
 
 	if (ht->flags & CDS_LFHT_ACCOUNTING) {
-		ht->split_count = calloc(split_count_mask + 1,
+		ht->split_count = ht->alloc->calloc(ht->alloc->state, split_count_mask + 1,
 				sizeof(struct ht_items_count));
-		assert(ht->split_count);
+		urcu_posix_assert(ht->split_count);
 	} else {
 		ht->split_count = NULL;
 	}
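The cds_lfht_default_alloc vtable above routes every internal allocation through function pointers that receive an opaque state argument. A sketch of a user-supplied allocator in the same shape, counting allocations through state (illustrative names, not thread-safe; it could be handed to the _cds_lfht_new_with_alloc() entry point added further down):

```c
#include <stdlib.h>

struct counting_state {
	unsigned long nr_allocs;	/* illustrative bookkeeping */
};

static void *counting_malloc(void *state, size_t size)
{
	((struct counting_state *) state)->nr_allocs++;
	return malloc(size);
}

static void *counting_calloc(void *state, size_t nmemb, size_t size)
{
	((struct counting_state *) state)->nr_allocs++;
	return calloc(nmemb, size);
}

static void *counting_realloc(void *state __attribute__((unused)),
		void *ptr, size_t size)
{
	return realloc(ptr, size);
}

static void *counting_aligned_alloc(void *state, size_t alignment, size_t size)
{
	void *ptr;

	((struct counting_state *) state)->nr_allocs++;
	if (posix_memalign(&ptr, alignment, size))
		return NULL;
	return ptr;
}

static void counting_free(void *state __attribute__((unused)), void *ptr)
{
	free(ptr);
}

static struct counting_state counting_state;

static struct cds_lfht_alloc counting_alloc = {
	.malloc		= counting_malloc,
	.calloc		= counting_calloc,
	.realloc	= counting_realloc,
	.aligned_alloc	= counting_aligned_alloc,
	.free		= counting_free,
	.state		= &counting_state,
};
```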
@@ -694,7 +722,7 @@ void alloc_split_items_count(struct cds_lfht *ht)
 static
 void free_split_items_count(struct cds_lfht *ht)
 {
-	poison_free(ht->split_count);
+	poison_free(ht->alloc, ht->split_count);
 }
 
 static
@@ -702,7 +730,7 @@ int ht_get_split_count_index(unsigned long hash)
 {
 	int cpu;
 
-	assert(split_count_mask >= 0);
+	urcu_posix_assert(split_count_mask >= 0);
 	cpu = urcu_sched_getcpu();
 	if (caa_unlikely(cpu < 0))
 		return hash & split_count_mask;
@@ -761,7 +789,7 @@ void ht_count_del(struct cds_lfht *ht, unsigned long size, unsigned long hash)
 	if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) >= size)
 		return;
-	dbg_printf("del set global %ld\n", count);
+	dbg_printf("del set global %lu\n", count);
 	/*
 	 * Don't shrink table if the number of nodes is below a
 	 * certain threshold.
@@ -849,6 +877,12 @@ int is_removal_owner(struct cds_lfht_node *node)
 	return ((unsigned long) node) & REMOVAL_OWNER_FLAG;
 }
 
+static
+struct cds_lfht_node *flag_removed(struct cds_lfht_node *node)
+{
+	return (struct cds_lfht_node *) (((unsigned long) node) | REMOVED_FLAG);
+}
+
 static
 struct cds_lfht_node *flag_removal_owner(struct cds_lfht_node *node)
 {
@@ -882,8 +916,10 @@ unsigned long _uatomic_xchg_monotonic_increase(unsigned long *ptr,
 	old1 = uatomic_read(ptr);
 	do {
 		old2 = old1;
-		if (old2 >= v)
+		if (old2 >= v) {
+			cmm_smp_mb();
 			return old2;
+		}
 	} while ((old1 = uatomic_cmpxchg(ptr, old2, v)) != old2);
 	return old2;
 }
@@ -915,7 +951,7 @@ static inline
 struct cds_lfht_node *lookup_bucket(struct cds_lfht *ht, unsigned long size,
 		unsigned long hash)
 {
-	assert(size > 0);
+	urcu_posix_assert(size > 0);
 	return bucket_at(ht, hash & (size - 1));
 }
 
@@ -927,26 +963,26 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *bucket, struct cds_lfht_node *nod
 {
 	struct cds_lfht_node *iter_prev, *iter, *next, *new_next;
 
-	assert(!is_bucket(bucket));
-	assert(!is_removed(bucket));
-	assert(!is_removal_owner(bucket));
-	assert(!is_bucket(node));
-	assert(!is_removed(node));
-	assert(!is_removal_owner(node));
+	urcu_posix_assert(!is_bucket(bucket));
+	urcu_posix_assert(!is_removed(bucket));
+	urcu_posix_assert(!is_removal_owner(bucket));
+	urcu_posix_assert(!is_bucket(node));
+	urcu_posix_assert(!is_removed(node));
+	urcu_posix_assert(!is_removal_owner(node));
 	for (;;) {
 		iter_prev = bucket;
 		/* We can always skip the bucket node initially */
 		iter = rcu_dereference(iter_prev->next);
-		assert(!is_removed(iter));
-		assert(!is_removal_owner(iter));
-		assert(iter_prev->reverse_hash <= node->reverse_hash);
+		urcu_posix_assert(!is_removed(iter));
+		urcu_posix_assert(!is_removal_owner(iter));
+		urcu_posix_assert(iter_prev->reverse_hash <= node->reverse_hash);
 		/*
 		 * We should never be called with bucket (start of chain)
 		 * and logically removed node (end of path compression
 		 * marker) being the actual same node. This would be a
 		 * bug in the algorithm implementation.
 		 */
-		assert(bucket != node);
+		urcu_posix_assert(bucket != node);
 		for (;;) {
 			if (caa_unlikely(is_end(iter)))
 				return;
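flag_removed() above (new in this change) and its siblings all exploit node alignment: the low bits of a ->next pointer are always zero, so they can carry per-link state. The idiom in isolation, with illustrative flag values (the real REMOVED_FLAG/REMOVAL_OWNER_FLAG definitions live elsewhere in this file):

```c
#include <stdint.h>

#define EX_REMOVED_FLAG	(1UL << 0)	/* illustrative values */
#define EX_FLAGS_MASK	((1UL << 3) - 1)

struct ex_node {
	struct ex_node *next;	/* alignment keeps the low bits free */
};

static inline struct ex_node *ex_flag_removed(struct ex_node *p)
{
	return (struct ex_node *) ((uintptr_t) p | EX_REMOVED_FLAG);
}

static inline int ex_is_removed(const struct ex_node *p)
{
	return !!((uintptr_t) p & EX_REMOVED_FLAG);
}

static inline struct ex_node *ex_clear_flag(struct ex_node *p)
{
	return (struct ex_node *) ((uintptr_t) p & ~EX_FLAGS_MASK);
}
```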
@@ -958,8 +994,8 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *bucket, struct cds_lfht_node *nod
 			iter_prev = clear_flag(iter);
 			iter = next;
 		}
-		assert(!is_removed(iter));
-		assert(!is_removal_owner(iter));
+		urcu_posix_assert(!is_removed(iter));
+		urcu_posix_assert(!is_removal_owner(iter));
 		if (is_bucket(iter))
 			new_next = flag_bucket(clear_flag(next));
 		else
@@ -979,13 +1015,13 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size,
 	if (!old_node)	/* Return -ENOENT if asked to replace NULL node */
 		return -ENOENT;
-	assert(!is_removed(old_node));
-	assert(!is_removal_owner(old_node));
-	assert(!is_bucket(old_node));
-	assert(!is_removed(new_node));
-	assert(!is_removal_owner(new_node));
-	assert(!is_bucket(new_node));
-	assert(new_node != old_node);
+	urcu_posix_assert(!is_removed(old_node));
+	urcu_posix_assert(!is_removal_owner(old_node));
+	urcu_posix_assert(!is_bucket(old_node));
+	urcu_posix_assert(!is_removed(new_node));
+	urcu_posix_assert(!is_removal_owner(new_node));
+	urcu_posix_assert(!is_bucket(new_node));
+	urcu_posix_assert(new_node != old_node);
 	for (;;) {
 		/* Insert after node to be replaced */
 		if (is_removed(old_next)) {
@@ -995,14 +1031,14 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size,
 			 */
 			return -ENOENT;
 		}
-		assert(old_next == clear_flag(old_next));
-		assert(new_node != old_next);
+		urcu_posix_assert(old_next == clear_flag(old_next));
+		urcu_posix_assert(new_node != old_next);
 		/*
 		 * REMOVAL_OWNER flag is _NEVER_ set before the REMOVED
 		 * flag. It is either set atomically at the same time
 		 * (replace) or after (del).
 		 */
-		assert(!is_removal_owner(old_next));
+		urcu_posix_assert(!is_removal_owner(old_next));
 		new_node->next = old_next;
 		/*
 		 * Here is the whole trick for lock-free replace: we add
@@ -1034,7 +1070,7 @@ int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size,
 	bucket = lookup_bucket(ht, size, bit_reverse_ulong(old_node->reverse_hash));
 	_cds_lfht_gc_bucket(bucket, new_node);
 
-	assert(is_removed(CMM_LOAD_SHARED(old_node->next)));
+	urcu_posix_assert(is_removed(CMM_LOAD_SHARED(old_node->next)));
 	return 0;
 }
 
@@ -1056,9 +1092,9 @@ void _cds_lfht_add(struct cds_lfht *ht,
 			*return_node;
 	struct cds_lfht_node *bucket;
 
-	assert(!is_bucket(node));
-	assert(!is_removed(node));
-	assert(!is_removal_owner(node));
+	urcu_posix_assert(!is_bucket(node));
+	urcu_posix_assert(!is_removed(node));
+	urcu_posix_assert(!is_removal_owner(node));
 	bucket = lookup_bucket(ht, size, hash);
 	for (;;) {
 		uint32_t chain_len = 0;
@@ -1070,7 +1106,7 @@ void _cds_lfht_add(struct cds_lfht *ht,
 		iter_prev = bucket;
 		/* We can always skip the bucket node initially */
 		iter = rcu_dereference(iter_prev->next);
-		assert(iter_prev->reverse_hash <= node->reverse_hash);
+		urcu_posix_assert(iter_prev->reverse_hash <= node->reverse_hash);
 		for (;;) {
 			if (caa_unlikely(is_end(iter)))
 				goto insert;
@@ -1123,12 +1159,12 @@ void _cds_lfht_add(struct cds_lfht *ht,
 		}
 
 	insert:
-		assert(node != clear_flag(iter));
-		assert(!is_removed(iter_prev));
-		assert(!is_removal_owner(iter_prev));
-		assert(!is_removed(iter));
-		assert(!is_removal_owner(iter));
-		assert(iter_prev != node);
+		urcu_posix_assert(node != clear_flag(iter));
+		urcu_posix_assert(!is_removed(iter_prev));
+		urcu_posix_assert(!is_removal_owner(iter_prev));
+		urcu_posix_assert(!is_removed(iter));
+		urcu_posix_assert(!is_removal_owner(iter));
+		urcu_posix_assert(iter_prev != node);
 		if (!bucket_flag)
 			node->next = clear_flag(iter);
 		else
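The urcu_posix_assert() calls above guard internal invariants of _cds_lfht_add() and _cds_lfht_replace(); the caller-visible contract is only that every operation runs inside an RCU read-side critical section. A usage sketch against the public API, with hypothetical mynode/match_mynode helpers (assuming the default urcu flavor via <urcu.h>):

```c
#include <urcu.h>
#include <urcu/rculfhash.h>

struct mynode {
	int key;
	struct cds_lfht_node node;	/* chaining within the table */
};

static int match_mynode(struct cds_lfht_node *ht_node, const void *key)
{
	struct mynode *n = caa_container_of(ht_node, struct mynode, node);

	return n->key == *(const int *) key;
}

/* Caller must hold rcu_read_lock(). */
static struct mynode *lookup_mynode(struct cds_lfht *ht, unsigned long hash,
		int key)
{
	struct cds_lfht_iter iter;
	struct cds_lfht_node *ht_node;

	cds_lfht_lookup(ht, hash, match_mynode, &key, &iter);
	ht_node = cds_lfht_iter_get_node(&iter);
	return ht_node ? caa_container_of(ht_node, struct mynode, node) : NULL;
}
```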
@@ -1146,8 +1182,8 @@ void _cds_lfht_add(struct cds_lfht *ht,
 		}
 
 	gc_node:
-		assert(!is_removed(iter));
-		assert(!is_removal_owner(iter));
+		urcu_posix_assert(!is_removed(iter));
+		urcu_posix_assert(!is_removal_owner(iter));
 		if (is_bucket(iter))
 			new_next = flag_bucket(clear_flag(next));
 		else
@@ -1167,14 +1203,15 @@ int _cds_lfht_del(struct cds_lfht *ht, unsigned long size,
 		struct cds_lfht_node *node)
 {
 	struct cds_lfht_node *bucket, *next;
+	uintptr_t *node_next;
 
 	if (!node)	/* Return -ENOENT if asked to delete NULL node */
 		return -ENOENT;
 
 	/* logically delete the node */
-	assert(!is_bucket(node));
-	assert(!is_removed(node));
-	assert(!is_removal_owner(node));
+	urcu_posix_assert(!is_bucket(node));
+	urcu_posix_assert(!is_removed(node));
+	urcu_posix_assert(!is_removal_owner(node));
 
 	/*
 	 * We are first checking if the node had previously been
@@ -1185,19 +1222,22 @@ int _cds_lfht_del(struct cds_lfht *ht, unsigned long size,
 	next = CMM_LOAD_SHARED(node->next);	/* next is not dereferenced */
 	if (caa_unlikely(is_removed(next)))
 		return -ENOENT;
-	assert(!is_bucket(next));
+	urcu_posix_assert(!is_bucket(next));
 	/*
 	 * The del operation semantic guarantees a full memory barrier
 	 * before the uatomic_or atomic commit of the deletion flag.
-	 */
-	cmm_smp_mb__before_uatomic_or();
-	/*
+	 *
 	 * We set the REMOVED_FLAG unconditionally. Note that there may
 	 * be more than one concurrent thread setting this flag.
 	 * Knowing which wins the race will be known after the garbage
 	 * collection phase, stay tuned!
+	 *
+	 * NOTE: The node_next variable is present to avoid breaking
	 * strict-aliasing rules.
 	 */
-	uatomic_or(&node->next, REMOVED_FLAG);
+	node_next = (uintptr_t*)&node->next;
+	uatomic_or_mo(node_next, REMOVED_FLAG, CMM_RELEASE);
+
 	/* We performed the (logical) deletion. */
 
 	/*
@@ -1208,7 +1248,7 @@ int _cds_lfht_del(struct cds_lfht *ht, unsigned long size,
 	bucket = lookup_bucket(ht, size, bit_reverse_ulong(node->reverse_hash));
 	_cds_lfht_gc_bucket(bucket, node);
 
-	assert(is_removed(CMM_LOAD_SHARED(node->next)));
+	urcu_posix_assert(is_removed(CMM_LOAD_SHARED(node->next)));
 	/*
 	 * Last phase: atomically exchange node->next with a version
 	 * having "REMOVAL_OWNER_FLAG" set. If the returned node->next
@@ -1222,7 +1262,7 @@ int _cds_lfht_del(struct cds_lfht *ht, unsigned long size,
 	 * was already set).
 	 */
 	if (!is_removal_owner(uatomic_xchg(&node->next,
-			flag_removal_owner(node->next))))
+			flag_removal_owner(uatomic_load(&node->next, CMM_RELAXED)))))
 		return 0;
 	else
 		return -ENOENT;
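_cds_lfht_del() above splits removal in two: any number of racing deleters may OR in REMOVED_FLAG, but the final uatomic_xchg() hands REMOVAL_OWNER_FLAG to exactly one of them, and only that owner may reclaim the node. The claim idiom in isolation (illustrative sketch; as in the original, the plain read feeding the xchg is tolerable because the flag is sticky and the other bits are stable by the time this runs):

```c
#include <urcu/uatomic.h>

#define EX_REMOVAL_OWNER_FLAG	(1UL << 2)	/* illustrative value */

/* Returns nonzero for exactly one concurrent caller per word. */
static int ex_claim_removal(unsigned long *next_word)
{
	unsigned long old;

	old = uatomic_xchg(next_word,
		*next_word | EX_REMOVAL_OWNER_FLAG);
	return !(old & EX_REMOVAL_OWNER_FLAG);
}
```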
@@ -1249,8 +1289,9 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
 	struct partition_resize_work *work;
 	int ret;
 	unsigned long thread, nr_threads;
+	sigset_t newmask, oldmask;
 
-	assert(nr_cpus_mask != -1);
+	urcu_posix_assert(nr_cpus_mask != NR_CPUS_MASK_UNINITIALIZED);
 	if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD)
 		goto fallback;
 
@@ -1266,18 +1307,25 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
 		nr_threads = 1;
 	}
 	partition_len = len >> cds_lfht_get_count_order_ulong(nr_threads);
-	work = calloc(nr_threads, sizeof(*work));
+	work = ht->alloc->calloc(ht->alloc->state, nr_threads, sizeof(*work));
 	if (!work) {
 		dbg_printf("error allocating for resize, single-threading\n");
 		goto fallback;
 	}
+
+	ret = sigfillset(&newmask);
+	urcu_posix_assert(!ret);
+	ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);
+	urcu_posix_assert(!ret);
+
 	for (thread = 0; thread < nr_threads; thread++) {
 		work[thread].ht = ht;
 		work[thread].i = i;
 		work[thread].len = partition_len;
 		work[thread].start = thread * partition_len;
 		work[thread].fct = fct;
-		ret = pthread_create(&(work[thread].thread_id), ht->resize_attr,
+		ret = pthread_create(&(work[thread].thread_id),
+			ht->caller_resize_attr ? &ht->resize_attr : NULL,
 			partition_resize_thread, &work[thread]);
 		if (ret == EAGAIN) {
 			/*
@@ -1290,13 +1338,17 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
 			nr_threads = thread;
 			break;
 		}
-		assert(!ret);
+		urcu_posix_assert(!ret);
 	}
+
+	ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
+	urcu_posix_assert(!ret);
+
 	for (thread = 0; thread < nr_threads; thread++) {
 		ret = pthread_join(work[thread].thread_id, NULL);
-		assert(!ret);
+		urcu_posix_assert(!ret);
 	}
-	free(work);
+	ht->alloc->free(ht->alloc->state, work);
 
 	/*
 	 * A pthread_create failure above will either lead in us having
@@ -1326,12 +1378,12 @@ void init_table_populate_partition(struct cds_lfht *ht, unsigned long i,
 {
 	unsigned long j, size = 1UL << (i - 1);
 
-	assert(i > MIN_TABLE_ORDER);
+	urcu_posix_assert(i > MIN_TABLE_ORDER);
 	ht->flavor->read_lock();
 	for (j = size + start; j < size + start + len; j++) {
 		struct cds_lfht_node *new_node = bucket_at(ht, j);
 
-		assert(j >= size && j < (size << 1));
+		urcu_posix_assert(j >= size && j < (size << 1));
 		dbg_printf("init populate: order %lu index %lu hash %lu\n",
 			   i, j, j);
 		new_node->reverse_hash = bit_reverse_ulong(j);
@@ -1355,7 +1407,7 @@ void init_table(struct cds_lfht *ht,
 
 	dbg_printf("init table: first_order %lu last_order %lu\n",
 		   first_order, last_order);
-	assert(first_order > MIN_TABLE_ORDER);
+	urcu_posix_assert(first_order > MIN_TABLE_ORDER);
 	for (i = first_order; i <= last_order; i++) {
 		unsigned long len;
 
@@ -1376,9 +1428,10 @@ void init_table(struct cds_lfht *ht,
 
 		/*
 		 * Update table size.
+		 *
+		 * Populate data before RCU size.
 		 */
-		cmm_smp_wmb();	/* populate data before RCU size */
-		CMM_STORE_SHARED(ht->size, 1UL << i);
+		uatomic_store(&ht->size, 1UL << i, CMM_RELEASE);
 
 		dbg_printf("init new size: %lu\n", 1UL << i);
 		if (CMM_LOAD_SHARED(ht->in_progress_destroy))
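The CMM_RELEASE store of ht->size above is one half of a publish pattern; cds_lfht_lookup() (further down) performs the matching CMM_ACQUIRE load. The pairing in isolation, with illustrative variables:

```c
#include <urcu/uatomic.h>
#include <urcu/assert.h>

static unsigned long ex_size;
static int ex_bucket_data;	/* stands in for freshly initialized buckets */

static void ex_publish(void)
{
	ex_bucket_data = 42;
	/* Release: the data above is visible before the new size. */
	uatomic_store(&ex_size, 1, CMM_RELEASE);
}

static void ex_read(void)
{
	/* Acquire pairs with the release store in ex_publish(). */
	if (uatomic_load(&ex_size, CMM_ACQUIRE) != 0)
		urcu_posix_assert(ex_bucket_data == 42);
}
```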
@@ -1418,17 +1471,23 @@ void remove_table_partition(struct cds_lfht *ht, unsigned long i,
 {
 	unsigned long j, size = 1UL << (i - 1);
 
-	assert(i > MIN_TABLE_ORDER);
+	urcu_posix_assert(i > MIN_TABLE_ORDER);
 	ht->flavor->read_lock();
 	for (j = size + start; j < size + start + len; j++) {
 		struct cds_lfht_node *fini_bucket = bucket_at(ht, j);
 		struct cds_lfht_node *parent_bucket = bucket_at(ht, j - size);
+		uintptr_t *fini_bucket_next;
 
-		assert(j >= size && j < (size << 1));
+		urcu_posix_assert(j >= size && j < (size << 1));
 		dbg_printf("remove entry: order %lu index %lu hash %lu\n",
 			   i, j, j);
-		/* Set the REMOVED_FLAG to freeze the ->next for gc */
-		uatomic_or(&fini_bucket->next, REMOVED_FLAG);
+		/* Set the REMOVED_FLAG to freeze the ->next for gc.
+		 *
+		 * NOTE: The fini_bucket_next variable is present to
+		 * avoid breaking strict-aliasing rules.
+		 */
+		fini_bucket_next = (uintptr_t*)&fini_bucket->next;
+		uatomic_or(fini_bucket_next, REMOVED_FLAG);
 		_cds_lfht_gc_bucket(parent_bucket, fini_bucket);
 	}
 	ht->flavor->read_unlock();
@@ -1453,7 +1512,7 @@ void fini_table(struct cds_lfht *ht,
 
 	dbg_printf("fini table: first_order %lu last_order %lu\n",
 		   first_order, last_order);
-	assert(first_order > MIN_TABLE_ORDER);
+	urcu_posix_assert(first_order > MIN_TABLE_ORDER);
 	for (i = last_order; i >= first_order; i--) {
 		unsigned long len;
 
@@ -1516,7 +1575,7 @@ void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size)
 	node->reverse_hash = 0;
 
 	bucket_order = cds_lfht_get_count_order_ulong(size);
-	assert(bucket_order >= 0);
+	urcu_posix_assert(bucket_order >= 0);
 
 	for (order = 1; order < (unsigned long) bucket_order + 1; order++) {
 		len = 1UL << (order - 1);
@@ -1542,7 +1601,7 @@ void cds_lfht_create_bucket(struct cds_lfht *ht, unsigned long size)
 			node->reverse_hash = bit_reverse_ulong(len + i);
 
 			/* insert after prev */
-			assert(is_bucket(prev->next));
+			urcu_posix_assert(is_bucket(prev->next));
 			node->next = prev->next;
 			prev->next = flag_bucket(node);
 		}
@@ -1569,18 +1628,26 @@ const struct cds_lfht_mm_type *get_mm_type(unsigned long max_nr_buckets)
  * For 32-bit architectures, use the order allocator.
  */
 static
-const struct cds_lfht_mm_type *get_mm_type(unsigned long max_nr_buckets)
+const struct cds_lfht_mm_type *get_mm_type(
+		unsigned long max_nr_buckets __attribute__((unused)))
 {
 	return &cds_lfht_mm_order;
 }
 #endif
 
-struct cds_lfht *_cds_lfht_new(unsigned long init_size,
+void cds_lfht_node_init_deleted(struct cds_lfht_node *node)
+{
+	cds_lfht_node_init(node);
+	node->next = flag_removed(NULL);
+}
+
+struct cds_lfht *_cds_lfht_new_with_alloc(unsigned long init_size,
 		unsigned long min_nr_alloc_buckets,
 		unsigned long max_nr_buckets,
 		int flags,
 		const struct cds_lfht_mm_type *mm,
 		const struct rcu_flavor_struct *flavor,
+		const struct cds_lfht_alloc *alloc,
 		pthread_attr_t *attr)
 {
 	struct cds_lfht *ht;
@@ -1616,14 +1683,17 @@ struct cds_lfht *_cds_lfht_new(unsigned long init_size,
 	max_nr_buckets = max(max_nr_buckets, min_nr_alloc_buckets);
 	init_size = min(init_size, max_nr_buckets);
 
-	ht = mm->alloc_cds_lfht(min_nr_alloc_buckets, max_nr_buckets);
-	assert(ht);
-	assert(ht->mm == mm);
-	assert(ht->bucket_at == mm->bucket_at);
+	ht = mm->alloc_cds_lfht(min_nr_alloc_buckets, max_nr_buckets, alloc ? : &cds_lfht_default_alloc);
+
+	urcu_posix_assert(ht);
+	urcu_posix_assert(ht->mm == mm);
+	urcu_posix_assert(ht->bucket_at == mm->bucket_at);
 
 	ht->flags = flags;
 	ht->flavor = flavor;
-	ht->resize_attr = attr;
+	ht->caller_resize_attr = attr;
+	if (attr)
+		ht->resize_attr = *attr;
 	alloc_split_items_count(ht);
 	/* this mutex should not nest in read-side C.S. */
 	pthread_mutex_init(&ht->resize_mutex, NULL);
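remove_table_partition() and init_table_populate_partition() both lean on the order-based bucket layout: for i >= 1, order i holds the 2^(i-1) buckets whose indexes fall in [2^(i-1), 2^i), hence size = 1UL << (i - 1) and the j >= size && j < (size << 1) assertion. The index math as a sketch (illustrative helper):

```c
#include <limits.h>

/* Decompose bucket index j into (order, offset within that order). */
static void ex_bucket_order(unsigned long j,
		unsigned long *order, unsigned long *offset)
{
	if (!j) {
		*order = 0;	/* index 0 is the lone order-0 bucket */
		*offset = 0;
		return;
	}
	/* 1-based position of the most significant bit of j. */
	*order = (unsigned long) (sizeof(unsigned long) * CHAR_BIT)
		- (unsigned long) __builtin_clzl(j);
	*offset = j - (1UL << (*order - 1));	/* e.g. j == 5 -> order 3, offset 1 */
}
```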
@@ -1634,6 +1704,19 @@ struct cds_lfht *_cds_lfht_new_with_alloc(unsigned long init_size,
 	return ht;
 }
 
+struct cds_lfht *_cds_lfht_new(unsigned long init_size,
+		unsigned long min_nr_alloc_buckets,
+		unsigned long max_nr_buckets,
+		int flags,
+		const struct cds_lfht_mm_type *mm,
+		const struct rcu_flavor_struct *flavor,
+		pthread_attr_t *attr)
+{
+	return _cds_lfht_new_with_alloc(init_size,
+			min_nr_alloc_buckets, max_nr_buckets,
+			flags, mm, flavor, NULL, attr);
+}
+
 void cds_lfht_lookup(struct cds_lfht *ht, unsigned long hash,
 		cds_lfht_match_fct match, const void *key,
 		struct cds_lfht_iter *iter)
@@ -1645,7 +1728,14 @@ void cds_lfht_lookup(struct cds_lfht *ht, unsigned long hash,
 
 	reverse_hash = bit_reverse_ulong(hash);
 
-	size = rcu_dereference(ht->size);
+	/*
+	 * Use load acquire instead of rcu_dereference because there is no
+	 * dependency between the table size and the dereference of the bucket
+	 * content.
+	 *
+	 * This acquire is paired with the store release in init_table().
+	 */
+	size = uatomic_load(&ht->size, CMM_ACQUIRE);
 	bucket = lookup_bucket(ht, size, hash);
 	/* We can always skip the bucket node initially */
 	node = rcu_dereference(bucket->next);
@@ -1660,7 +1750,7 @@ void cds_lfht_lookup(struct cds_lfht *ht, unsigned long hash,
 			break;
 		}
 		next = rcu_dereference(node->next);
-		assert(node == clear_flag(node));
+		urcu_posix_assert(node == clear_flag(node));
 		if (caa_likely(!is_removed(next))
 		    && !is_bucket(next)
 		    && node->reverse_hash == reverse_hash
@@ -1669,12 +1759,13 @@ void cds_lfht_lookup(struct cds_lfht *ht, unsigned long hash,
 		}
 		node = clear_flag(next);
 	}
-	assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next)));
+	urcu_posix_assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next)));
 	iter->node = node;
 	iter->next = next;
 }
 
-void cds_lfht_next_duplicate(struct cds_lfht *ht, cds_lfht_match_fct match,
+void cds_lfht_next_duplicate(struct cds_lfht *ht __attribute__((unused)),
+		cds_lfht_match_fct match,
 		const void *key, struct cds_lfht_iter *iter)
 {
 	struct cds_lfht_node *node, *next;
@@ -1703,12 +1794,13 @@ void cds_lfht_next_duplicate(struct cds_lfht *ht, cds_lfht_match_fct match,
 		}
 		node = clear_flag(next);
 	}
-	assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next)));
+	urcu_posix_assert(!node || !is_bucket(uatomic_load(&node->next, CMM_RELAXED)));
 	iter->node = node;
 	iter->next = next;
 }
 
-void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter)
+void cds_lfht_next(struct cds_lfht *ht __attribute__((unused)),
+		struct cds_lfht_iter *iter)
 {
 	struct cds_lfht_node *node, *next;
 
@@ -1726,7 +1818,7 @@ void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter)
 		}
 		node = clear_flag(next);
 	}
-	assert(!node || !is_bucket(CMM_LOAD_SHARED(node->next)));
+	urcu_posix_assert(!node || !is_bucket(uatomic_load(&node->next, CMM_RELAXED)));
 	iter->node = node;
 	iter->next = next;
 }
 
@@ -1738,7 +1830,7 @@ void cds_lfht_first(struct cds_lfht *ht, struct cds_lfht_iter *iter)
 	 * Get next after first bucket node. The first bucket node is the
 	 * first node of the linked list.
 	 */
-	iter->next = bucket_at(ht, 0)->next;
+	iter->next = uatomic_load(&bucket_at(ht, 0)->next, CMM_CONSUME);
 	cds_lfht_next(ht, iter);
 }
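cds_lfht_first()/cds_lfht_next() above skip bucket nodes and yield only user nodes; like lookups, a whole traversal must stay within one read-side critical section. A sketch (assuming the default flavor via <urcu.h>):

```c
/* Count all nodes currently in the table. */
static unsigned long count_nodes(struct cds_lfht *ht)
{
	struct cds_lfht_iter iter;
	unsigned long count = 0;

	rcu_read_lock();
	for (cds_lfht_first(ht, &iter);
			cds_lfht_iter_get_node(&iter) != NULL;
			cds_lfht_next(ht, &iter))
		count++;
	rcu_read_unlock();
	return count;
}
```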
@@ -1748,7 +1840,7 @@ void cds_lfht_add(struct cds_lfht *ht, unsigned long hash,
 	unsigned long size;
 
 	node->reverse_hash = bit_reverse_ulong(hash);
-	size = rcu_dereference(ht->size);
+	size = uatomic_load(&ht->size, CMM_ACQUIRE);
 	_cds_lfht_add(ht, hash, NULL, NULL, size, node, NULL, 0);
 	ht_count_add(ht, size, hash);
 }
@@ -1763,7 +1855,7 @@ struct cds_lfht_node *cds_lfht_add_unique(struct cds_lfht *ht,
 	struct cds_lfht_iter iter;
 
 	node->reverse_hash = bit_reverse_ulong(hash);
-	size = rcu_dereference(ht->size);
+	size = uatomic_load(&ht->size, CMM_ACQUIRE);
 	_cds_lfht_add(ht, hash, match, key, size, node, &iter, 0);
 	if (iter.node == node)
 		ht_count_add(ht, size, hash);
@@ -1780,7 +1872,7 @@ struct cds_lfht_node *cds_lfht_add_replace(struct cds_lfht *ht,
 	struct cds_lfht_iter iter;
 
 	node->reverse_hash = bit_reverse_ulong(hash);
-	size = rcu_dereference(ht->size);
+	size = uatomic_load(&ht->size, CMM_ACQUIRE);
 	for (;;) {
 		_cds_lfht_add(ht, hash, match, key, size, node, &iter, 0);
 		if (iter.node == node) {
@@ -1809,7 +1901,7 @@ int cds_lfht_replace(struct cds_lfht *ht,
 		return -EINVAL;
 	if (caa_unlikely(!match(old_iter->node, key)))
 		return -EINVAL;
-	size = rcu_dereference(ht->size);
+	size = uatomic_load(&ht->size, CMM_ACQUIRE);
 	return _cds_lfht_replace(ht, size, old_iter->node, old_iter->next,
 			new_node);
 }
@@ -1819,7 +1911,7 @@ int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_node *node)
 	unsigned long size;
 	int ret;
 
-	size = rcu_dereference(ht->size);
+	size = uatomic_load(&ht->size, CMM_ACQUIRE);
 	ret = _cds_lfht_del(ht, size, node);
 	if (!ret) {
 		unsigned long hash;
@@ -1835,6 +1927,35 @@ int cds_lfht_is_node_deleted(const struct cds_lfht_node *node)
 	return is_removed(CMM_LOAD_SHARED(node->next));
 }
 
+static
+bool cds_lfht_is_empty(struct cds_lfht *ht)
+{
+	struct cds_lfht_node *node, *next;
+	bool empty = true;
+	bool was_online;
+
+	was_online = ht->flavor->read_ongoing();
+	if (!was_online) {
+		ht->flavor->thread_online();
+		ht->flavor->read_lock();
+	}
+	/* Check that the table is empty */
+	node = bucket_at(ht, 0);
+	do {
+		next = rcu_dereference(node->next);
+		if (!is_bucket(next)) {
+			empty = false;
+			break;
+		}
+		node = clear_flag(next);
+	} while (!is_end(node));
+	if (!was_online) {
+		ht->flavor->read_unlock();
+		ht->flavor->thread_offline();
+	}
+	return empty;
+}
+
 static
 int cds_lfht_delete_bucket(struct cds_lfht *ht)
 {
@@ -1847,8 +1968,8 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht)
 		node = clear_flag(node)->next;
 		if (!is_bucket(node))
 			return -EPERM;
-		assert(!is_removed(node));
-		assert(!is_removal_owner(node));
+		urcu_posix_assert(!is_removed(node));
+		urcu_posix_assert(!is_removal_owner(node));
 	} while (!is_end(node));
 	/*
 	 * size accessed without rcu_dereference because hash table is
@@ -1860,7 +1981,7 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht)
 		node = bucket_at(ht, i);
 		dbg_printf("delete bucket: index %lu expected hash %lu hash %lu\n",
 			i, i, bit_reverse_ulong(node->reverse_hash));
-		assert(is_bucket(node->next));
+		urcu_posix_assert(is_bucket(node->next));
 	}
 
 	for (order = cds_lfht_get_count_order_ulong(size); (long)order >= 0; order--)
@@ -1869,6 +1990,24 @@ int cds_lfht_delete_bucket(struct cds_lfht *ht)
 	return 0;
 }
 
+static
+void do_auto_resize_destroy_cb(struct urcu_work *work)
+{
+	struct cds_lfht *ht = caa_container_of(work, struct cds_lfht, destroy_work);
+	int ret;
+
+	ht->flavor->register_thread();
+	ret = cds_lfht_delete_bucket(ht);
+	if (ret)
+		urcu_die(-ret);
+	free_split_items_count(ht);
+	ret = pthread_mutex_destroy(&ht->resize_mutex);
+	if (ret)
+		urcu_die(ret);
+	ht->flavor->unregister_thread();
+	poison_free(ht->alloc, ht);
+}
+
 /*
  * Should only be called when no more concurrent readers nor writers can
  * possibly access the table.
  */
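Note how cds_lfht_add_unique() above only updates the split-counters when iter.node == node, i.e. when the caller's node was actually inserted; otherwise the pre-existing matching node is returned. That return value is the idiomatic insert-if-absent test (mynode/match_mynode as sketched earlier):

```c
/* Caller must hold rcu_read_lock(). Returns 1 if inserted, 0 if key existed. */
static int insert_if_absent(struct cds_lfht *ht, unsigned long hash,
		struct mynode *n)
{
	struct cds_lfht_node *ret_node;

	ret_node = cds_lfht_add_unique(ht, hash, match_mynode,
			&n->key, &n->node);
	return ret_node == &n->node;
}
```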
@@ -1878,23 +2017,39 @@ int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
 	int ret;
 
 	if (ht->flags & CDS_LFHT_AUTO_RESIZE) {
+		/*
+		 * Perform error-checking for emptiness before queuing
+		 * work, so we can return error to the caller. This runs
+		 * concurrently with ongoing resize.
+		 */
+		if (!cds_lfht_is_empty(ht))
+			return -EPERM;
 		/* Cancel ongoing resize operations. */
-		_CMM_STORE_SHARED(ht->in_progress_destroy, 1);
-		/* Wait for in-flight resize operations to complete */
-		urcu_workqueue_flush_queued_work(cds_lfht_workqueue);
+		uatomic_store(&ht->in_progress_destroy, 1, CMM_RELAXED);
+		if (attr) {
+			*attr = ht->caller_resize_attr;
+			ht->caller_resize_attr = NULL;
+		}
+		/*
+		 * Queue destroy work after prior queued resize
+		 * operations. Given there are no concurrent writers
+		 * accessing the hash table at this point, no resize
+		 * operations can be queued after this destroy work.
+		 */
+		urcu_workqueue_queue_work(cds_lfht_workqueue,
+			&ht->destroy_work, do_auto_resize_destroy_cb);
+		return 0;
 	}
 	ret = cds_lfht_delete_bucket(ht);
 	if (ret)
 		return ret;
 	free_split_items_count(ht);
 	if (attr)
-		*attr = ht->resize_attr;
+		*attr = ht->caller_resize_attr;
 	ret = pthread_mutex_destroy(&ht->resize_mutex);
 	if (ret)
 		ret = -EBUSY;
-	if (ht->flags & CDS_LFHT_AUTO_RESIZE)
-		cds_lfht_fini_worker(ht->flavor);
-	poison_free(ht);
+	poison_free(ht->alloc, ht);
 	return ret;
 }
 
@@ -1957,7 +2112,7 @@ void _do_cds_lfht_grow(struct cds_lfht *ht,
 	new_order = cds_lfht_get_count_order_ulong(new_size);
 	dbg_printf("resize from %lu (order %lu) to %lu (order %lu) buckets\n",
 		   old_size, old_order, new_size, new_order);
-	assert(new_size > old_size);
+	urcu_posix_assert(new_size > old_size);
 	init_table(ht, old_order + 1, new_order);
 }
 
@@ -1973,7 +2128,7 @@ void _do_cds_lfht_shrink(struct cds_lfht *ht,
 	new_order = cds_lfht_get_count_order_ulong(new_size);
 	dbg_printf("resize from %lu (order %lu) to %lu (order %lu) buckets\n",
 		   old_size, old_order, new_size, new_order);
-	assert(new_size < old_size);
+	urcu_posix_assert(new_size < old_size);
 
 	/* Remove and unlink all bucket nodes to remove. */
 	fini_table(ht, new_order + 1, old_order);
@@ -1990,19 +2145,22 @@
 	 * Resize table, re-do if the target size has changed under us.
 	 */
 	do {
-		if (CMM_LOAD_SHARED(ht->in_progress_destroy))
+		if (uatomic_load(&ht->in_progress_destroy, CMM_RELAXED))
 			break;
-		ht->resize_initiated = 1;
+
+		uatomic_store(&ht->resize_initiated, 1, CMM_RELAXED);
+
 		old_size = ht->size;
-		new_size = CMM_LOAD_SHARED(ht->resize_target);
+		new_size = uatomic_load(&ht->resize_target, CMM_RELAXED);
 		if (old_size < new_size)
 			_do_cds_lfht_grow(ht, old_size, new_size);
 		else if (old_size > new_size)
 			_do_cds_lfht_shrink(ht, old_size, new_size);
-		ht->resize_initiated = 0;
+
+		uatomic_store(&ht->resize_initiated, 0, CMM_RELAXED);
 		/* write resize_initiated before read resize_target */
 		cmm_smp_mb();
-	} while (ht->size != CMM_LOAD_SHARED(ht->resize_target));
+	} while (ht->size != uatomic_load(&ht->resize_target, CMM_RELAXED));
 }
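With this change, destroying an auto-resizing table no longer flushes the workqueue synchronously: cds_lfht_destroy() now returns -EPERM unless the table is already empty, then defers teardown to do_auto_resize_destroy_cb(). A teardown sketch from the caller's side (hypothetical helper; node reclamation would normally go through call_rcu() rather than the synchronize_rcu() shown):

```c
/* Empty the table, then destroy it. */
static int teardown_table(struct cds_lfht *ht)
{
	struct cds_lfht_iter iter;
	struct cds_lfht_node *ht_node;

	rcu_read_lock();
	for (cds_lfht_first(ht, &iter);
			(ht_node = cds_lfht_iter_get_node(&iter)) != NULL;
			cds_lfht_next(ht, &iter)) {
		/* Only the thread whose del succeeds may reclaim the node. */
		if (!cds_lfht_del(ht, ht_node)) {
			/* schedule the containing object for freeing */
		}
	}
	rcu_read_unlock();
	synchronize_rcu();	/* wait for readers before reclaiming nodes */
	return cds_lfht_destroy(ht, NULL);
}
```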
@@ -2023,7 +2181,12 @@ void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size)
 {
 	resize_target_update_count(ht, new_size);
-	CMM_STORE_SHARED(ht->resize_initiated, 1);
+
+	/*
+	 * Set flags as early as possible even in the contention case.
+	 */
+	uatomic_store(&ht->resize_initiated, 1, CMM_RELAXED);
+
 	mutex_lock(&ht->resize_mutex);
 	_do_cds_lfht_resize(ht);
 	mutex_unlock(&ht->resize_mutex);
@@ -2041,7 +2204,7 @@ void do_resize_cb(struct urcu_work *work)
 	_do_cds_lfht_resize(ht);
 	mutex_unlock(&ht->resize_mutex);
 	ht->flavor->unregister_thread();
-	poison_free(work);
+	poison_free(ht->alloc, work);
 }
 
@@ -2049,13 +2212,15 @@ void __cds_lfht_resize_lazy_launch(struct cds_lfht *ht)
 {
 	struct resize_work *work;
 
-	/* Store resize_target before read resize_initiated */
-	cmm_smp_mb();
-	if (!CMM_LOAD_SHARED(ht->resize_initiated)) {
-		if (CMM_LOAD_SHARED(ht->in_progress_destroy)) {
+	/*
+	 * The store to resize_target is ordered before the read of
+	 * resize_initiated, as guaranteed by either cmpxchg or
+	 * _uatomic_xchg_monotonic_increase.
+	 */
+	if (!uatomic_load(&ht->resize_initiated, CMM_RELAXED)) {
+		if (uatomic_load(&ht->in_progress_destroy, CMM_RELAXED)) {
 			return;
 		}
-		work = malloc(sizeof(*work));
+		work = ht->alloc->malloc(ht->alloc->state, sizeof(*work));
 		if (work == NULL) {
 			dbg_printf("error allocating resize work, bailing out\n");
 			return;
@@ -2063,7 +2228,7 @@ void __cds_lfht_resize_lazy_launch(struct cds_lfht *ht)
 		work->ht = ht;
 		urcu_workqueue_queue_work(cds_lfht_workqueue,
 			&work->work, do_resize_cb);
-		CMM_STORE_SHARED(ht->resize_initiated, 1);
+		uatomic_store(&ht->resize_initiated, 1, CMM_RELAXED);
 	}
 }
 
@@ -2114,7 +2279,7 @@ void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size,
 	__cds_lfht_resize_lazy_launch(ht);
 }
 
-static void cds_lfht_before_fork(void *priv)
+static void cds_lfht_before_fork(void *priv __attribute__((unused)))
 {
 	if (cds_lfht_workqueue_atfork_nesting++)
 		return;
@@ -2124,7 +2289,7 @@ static void cds_lfht_before_fork(void *priv)
 	urcu_workqueue_pause_worker(cds_lfht_workqueue);
 }
 
-static void cds_lfht_after_fork_parent(void *priv)
+static void cds_lfht_after_fork_parent(void *priv __attribute__((unused)))
 {
 	if (--cds_lfht_workqueue_atfork_nesting)
 		return;
@@ -2135,7 +2300,7 @@ end:
 	mutex_unlock(&cds_lfht_fork_mutex);
 }
 
-static void cds_lfht_after_fork_child(void *priv)
+static void cds_lfht_after_fork_child(void *priv __attribute__((unused)))
 {
 	if (--cds_lfht_workqueue_atfork_nesting)
 		return;
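cds_lfht_resize() above is the blocking, caller-driven path; the lazy path queues do_resize_cb() on the shared workqueue, and tables created with CDS_LFHT_AUTO_RESIZE trigger it on their own from the add/del counters. A creation sketch using the documented public constructor (the flag combination is the commonly paired one; accounting gives auto-resize the node counts that shrinking decisions rely on):

```c
#include <urcu.h>
#include <urcu/rculfhash.h>

static struct cds_lfht *make_auto_table(void)
{
	/* 1 initial bucket, minimum 1, no maximum (0). */
	return cds_lfht_new(1, 1, 0,
			CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
}
```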
@@ -2152,50 +2317,24 @@ static struct urcu_atfork cds_lfht_atfork = {
 	.after_fork_child = cds_lfht_after_fork_child,
 };
 
-/*
- * Block all signals for the workqueue worker thread to ensure we don't
- * disturb the application. The SIGRCU signal needs to be unblocked for
- * the urcu-signal flavor.
- */
-static void cds_lfht_worker_init(struct urcu_workqueue *workqueue,
-		void *priv)
-{
-	int ret;
-	sigset_t mask;
-
-	ret = sigfillset(&mask);
-	if (ret)
-		urcu_die(errno);
-	ret = sigdelset(&mask, SIGRCU);
-	if (ret)
-		urcu_die(errno);
-	ret = pthread_sigmask(SIG_SETMASK, &mask, NULL);
-	if (ret)
-		urcu_die(ret);
-}
-
 static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor)
 {
 	flavor->register_rculfhash_atfork(&cds_lfht_atfork);
 
 	mutex_lock(&cds_lfht_fork_mutex);
-	if (cds_lfht_workqueue_user_count++)
-		goto end;
-	cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL,
-		NULL, cds_lfht_worker_init, NULL, NULL, NULL, NULL, NULL);
-end:
+	if (!cds_lfht_workqueue)
+		cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL,
+			NULL, NULL, NULL, NULL, NULL, NULL, NULL);
 	mutex_unlock(&cds_lfht_fork_mutex);
 }
 
-static void cds_lfht_fini_worker(const struct rcu_flavor_struct *flavor)
+static void cds_lfht_exit(void)
 {
 	mutex_lock(&cds_lfht_fork_mutex);
-	if (--cds_lfht_workqueue_user_count)
-		goto end;
-	urcu_workqueue_destroy(cds_lfht_workqueue);
-	cds_lfht_workqueue = NULL;
-end:
+	if (cds_lfht_workqueue) {
+		urcu_workqueue_flush_queued_work(cds_lfht_workqueue);
+		urcu_workqueue_destroy(cds_lfht_workqueue);
+		cds_lfht_workqueue = NULL;
+	}
 	mutex_unlock(&cds_lfht_fork_mutex);
-
-	flavor->unregister_rculfhash_atfork(&cds_lfht_atfork);
 }