#include "config.h"
#include <urcu.h>
#include <urcu-call-rcu.h>
+#include <urcu-flavor.h>
#include <urcu/arch.h>
#include <urcu/uatomic.h>
#include <urcu/compiler.h>
#include <urcu/rculfhash.h>
+#include <rculfhash-internal.h>
#include <stdio.h>
#include <pthread.h>
-#ifdef DEBUG
-#define dbg_printf(fmt, args...) printf("[debug rculfhash] " fmt, ## args)
-#else
-#define dbg_printf(fmt, args...)
-#endif
-
/*
* Split-counters lazily update the global counter each 1024
* addition/removal. It automatically keeps track of resize required.
#define MIN_TABLE_ORDER 0
#define MIN_TABLE_SIZE (1UL << MIN_TABLE_ORDER)
-#if (CAA_BITS_PER_LONG == 32)
-#define MAX_TABLE_ORDER 32
-#else
-#define MAX_TABLE_ORDER 64
-#endif
-
/*
* Minimum number of bucket nodes to touch per thread to parallelize grow/shrink.
*/
#define MIN_PARTITION_PER_THREAD_ORDER 12
#define MIN_PARTITION_PER_THREAD (1UL << MIN_PARTITION_PER_THREAD_ORDER)
-#ifndef min
-#define min(a, b) ((a) < (b) ? (a) : (b))
-#endif
-
-#ifndef max
-#define max(a, b) ((a) > (b) ? (a) : (b))
-#endif
-
/*
* The removed flag needs to be updated atomically with the pointer.
* It indicates that no node must attach to the node scheduled for
unsigned long add, del;
} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
-/*
- * rcu_table: Contains the size and desired new size if a resize
- * operation is in progress, as well as the statically-sized array of
- * bucket table pointers.
- */
-struct rcu_table {
- unsigned long size; /* always a power of 2, shared (RCU) */
- unsigned long resize_target;
- int resize_initiated;
-
- /*
- * Contains the per order-index-level bucket node table. The size
- * of each bucket node table is half the number of hashes contained
- * in this order (except for order 0). The minimum allocation size
- * parameter allows combining the bucket node arrays of the lowermost
- * levels to improve cache locality for small index orders.
- */
- struct cds_lfht_node *tbl[MAX_TABLE_ORDER];
-};
-
-/*
- * cds_lfht: Top-level data structure representing a lock-free hash
- * table. Defined in the implementation file to make it be an opaque
- * cookie to users.
- */
-struct cds_lfht {
- struct rcu_table t;
- unsigned long min_alloc_buckets_order;
- unsigned long min_nr_alloc_buckets;
- int flags;
- /*
- * We need to put the work threads offline (QSBR) when taking this
- * mutex, because we use synchronize_rcu within this mutex critical
- * section, which waits on read-side critical sections, and could
- * therefore cause grace-period deadlock if we hold off RCU G.P.
- * completion.
- */
- pthread_mutex_t resize_mutex; /* resize mutex: add/del mutex */
- unsigned int in_progress_resize, in_progress_destroy;
- void (*cds_lfht_call_rcu)(struct rcu_head *head,
- void (*func)(struct rcu_head *head));
- void (*cds_lfht_synchronize_rcu)(void);
- void (*cds_lfht_rcu_read_lock)(void);
- void (*cds_lfht_rcu_read_unlock)(void);
- void (*cds_lfht_rcu_thread_offline)(void);
- void (*cds_lfht_rcu_thread_online)(void);
- void (*cds_lfht_rcu_register_thread)(void);
- void (*cds_lfht_rcu_unregister_thread)(void);
- pthread_attr_t *resize_attr; /* Resize threads attributes */
- long count; /* global approximate item count */
- struct ht_items_count *split_count; /* split item count */
-};
-
/*
* rcu_resize_work: Contains arguments passed to RCU worker thread
* responsible for performing lazy resize.
unsigned long start, unsigned long len);
};
-static
-void _cds_lfht_add(struct cds_lfht *ht,
- cds_lfht_match_fct match,
- const void *key,
- unsigned long size,
- struct cds_lfht_node *node,
- struct cds_lfht_iter *unique_ret,
- int bucket);
-
/*
* Algorithm to reverse bits in a word by lookup table, extended to
* 64-bit words.
return fls_ulong(x - 1);
}
-#ifdef POISON_FREE
-#define poison_free(ptr) \
- do { \
- if (ptr) { \
- memset(ptr, 0x42, sizeof(*(ptr))); \
- free(ptr); \
- } \
- } while (0)
-#else
-#define poison_free(ptr) free(ptr)
-#endif
-
static
void cds_lfht_resize_lazy_grow(struct cds_lfht *ht, unsigned long size, int growth);
{
unsigned long split_count;
int index;
+ long count;
if (caa_unlikely(!ht->split_count))
return;
index = ht_get_split_count_index(hash);
split_count = uatomic_add_return(&ht->split_count[index].add, 1);
- if (caa_unlikely(!(split_count & ((1UL << COUNT_COMMIT_ORDER) - 1)))) {
- long count;
-
- dbg_printf("add split count %lu\n", split_count);
- count = uatomic_add_return(&ht->count,
- 1UL << COUNT_COMMIT_ORDER);
- /* If power of 2 */
- if (!(count & (count - 1))) {
- if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) < size)
- return;
- dbg_printf("add set global %ld\n", count);
- cds_lfht_resize_lazy_count(ht, size,
- count >> (CHAIN_LEN_TARGET - 1));
- }
- }
+ if (caa_likely(split_count & ((1UL << COUNT_COMMIT_ORDER) - 1)))
+ return;
+ /* Only if number of add multiple of 1UL << COUNT_COMMIT_ORDER */
+
+ dbg_printf("add split count %lu\n", split_count);
+ count = uatomic_add_return(&ht->count,
+ 1UL << COUNT_COMMIT_ORDER);
+ if (caa_likely(count & (count - 1)))
+ return;
+ /* Only if global count is power of 2 */
+
+ if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) < size)
+ return;
+ dbg_printf("add set global %ld\n", count);
+ cds_lfht_resize_lazy_count(ht, size,
+ count >> (CHAIN_LEN_TARGET - 1));
}
static
{
unsigned long split_count;
int index;
+ long count;
if (caa_unlikely(!ht->split_count))
return;
index = ht_get_split_count_index(hash);
split_count = uatomic_add_return(&ht->split_count[index].del, 1);
- if (caa_unlikely(!(split_count & ((1UL << COUNT_COMMIT_ORDER) - 1)))) {
- long count;
-
- dbg_printf("del split count %lu\n", split_count);
- count = uatomic_add_return(&ht->count,
- -(1UL << COUNT_COMMIT_ORDER));
- /* If power of 2 */
- if (!(count & (count - 1))) {
- if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) >= size)
- return;
- dbg_printf("del set global %ld\n", count);
- /*
- * Don't shrink table if the number of nodes is below a
- * certain threshold.
- */
- if (count < (1UL << COUNT_COMMIT_ORDER) * (split_count_mask + 1))
- return;
- cds_lfht_resize_lazy_count(ht, size,
- count >> (CHAIN_LEN_TARGET - 1));
- }
- }
+ if (caa_likely(split_count & ((1UL << COUNT_COMMIT_ORDER) - 1)))
+ return;
+ /* Only if number of deletes multiple of 1UL << COUNT_COMMIT_ORDER */
+
+ dbg_printf("del split count %lu\n", split_count);
+ count = uatomic_add_return(&ht->count,
+ -(1UL << COUNT_COMMIT_ORDER));
+ if (caa_likely(count & (count - 1)))
+ return;
+ /* Only if global count is power of 2 */
+
+ if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) >= size)
+ return;
+ dbg_printf("del set global %ld\n", count);
+ /*
+ * Don't shrink table if the number of nodes is below a
+ * certain threshold.
+ */
+ if (count < (1UL << COUNT_COMMIT_ORDER) * (split_count_mask + 1))
+ return;
+ cds_lfht_resize_lazy_count(ht, size,
+ count >> (CHAIN_LEN_TARGET - 1));
}
static
static
void cds_lfht_alloc_bucket_table(struct cds_lfht *ht, unsigned long order)
{
- if (order == 0) {
- ht->t.tbl[0] = calloc(ht->min_nr_alloc_buckets,
- sizeof(struct cds_lfht_node));
- assert(ht->t.tbl[0]);
- } else if (order > ht->min_alloc_buckets_order) {
- ht->t.tbl[order] = calloc(1UL << (order -1),
- sizeof(struct cds_lfht_node));
- assert(ht->t.tbl[order]);
- }
- /* Nothing to do for 0 < order && order <= ht->min_alloc_buckets_order */
+ /*
+ * Delegate allocation of the bucket table for this order to the
+ * memory-management plugin selected at table-creation time (ht->mm).
+ * NOTE(review): "return" of a void expression from a void function is
+ * a GNU extension / C++ism; plain "ht->mm->alloc_bucket_table(ht,
+ * order);" would be strictly conforming C — confirm intended dialect.
+ */
+ return ht->mm->alloc_bucket_table(ht, order);
}
/*
static
void cds_lfht_free_bucket_table(struct cds_lfht *ht, unsigned long order)
{
- if (order == 0)
- poison_free(ht->t.tbl[0]);
- else if (order > ht->min_alloc_buckets_order)
- poison_free(ht->t.tbl[order]);
- /* Nothing to do for 0 < order && order <= ht->min_alloc_buckets_order */
+ /*
+ * Delegate freeing of the bucket table for this order to the same
+ * memory-management plugin that allocated it (ht->mm).
+ * NOTE(review): same void-"return" dialect concern as the alloc path.
+ */
+ return ht->mm->free_bucket_table(ht, order);
}
static inline
struct cds_lfht_node *bucket_at(struct cds_lfht *ht, unsigned long index)
{
- unsigned long order;
-
- if ((__builtin_constant_p(index) && index == 0)
- || index < ht->min_nr_alloc_buckets) {
- dbg_printf("bucket index %lu order 0 aridx 0\n", index);
- return &ht->t.tbl[0][index];
- }
- /*
- * equivalent to get_count_order_ulong(index + 1), but optimizes
- * away the non-existing 0 special-case for
- * get_count_order_ulong.
- */
- order = fls_ulong(index);
- dbg_printf("bucket index %lu order %lu aridx %lu\n",
- index, order, index & ((1UL << (order - 1)) - 1));
- return &ht->t.tbl[order][index & ((1UL << (order - 1)) - 1)];
+ /*
+ * Bucket lookup is now an indirect call through the accessor the
+ * memory-management plugin installed in the cds_lfht at creation
+ * time, replacing the hard-coded order-based table walk above.
+ */
+ return ht->bucket_at(ht, index);
}
static inline
{
struct partition_resize_work *work = arg;
- work->ht->cds_lfht_rcu_register_thread();
+ work->ht->flavor->register_thread();
work->fct(work->ht, work->i, work->start, work->len);
- work->ht->cds_lfht_rcu_unregister_thread();
+ work->ht->flavor->unregister_thread();
return NULL;
}
unsigned long j, size = 1UL << (i - 1);
assert(i > MIN_TABLE_ORDER);
- ht->cds_lfht_rcu_read_lock();
+ ht->flavor->read_lock();
for (j = size + start; j < size + start + len; j++) {
struct cds_lfht_node *new_node = bucket_at(ht, j);
new_node->reverse_hash = bit_reverse_ulong(j);
_cds_lfht_add(ht, NULL, NULL, size, new_node, NULL, 1);
}
- ht->cds_lfht_rcu_read_unlock();
+ ht->flavor->read_unlock();
}
static
{
assert(nr_cpus_mask != -1);
if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) {
- ht->cds_lfht_rcu_thread_online();
+ ht->flavor->thread_online();
init_table_populate_partition(ht, i, 0, len);
- ht->cds_lfht_rcu_thread_offline();
+ ht->flavor->thread_offline();
return;
}
partition_resize_helper(ht, i, len, init_table_populate_partition);
dbg_printf("init order %lu len: %lu\n", i, len);
/* Stop expand if the resize target changes under us */
- if (CMM_LOAD_SHARED(ht->t.resize_target) < (1UL << i))
+ if (CMM_LOAD_SHARED(ht->resize_target) < (1UL << i))
break;
cds_lfht_alloc_bucket_table(ht, i);
* Update table size.
*/
cmm_smp_wmb(); /* populate data before RCU size */
- CMM_STORE_SHARED(ht->t.size, 1UL << i);
+ CMM_STORE_SHARED(ht->size, 1UL << i);
dbg_printf("init new size: %lu\n", 1UL << i);
if (CMM_LOAD_SHARED(ht->in_progress_destroy))
unsigned long j, size = 1UL << (i - 1);
assert(i > MIN_TABLE_ORDER);
- ht->cds_lfht_rcu_read_lock();
+ ht->flavor->read_lock();
for (j = size + start; j < size + start + len; j++) {
struct cds_lfht_node *fini_node = bucket_at(ht, j);
fini_node->reverse_hash = bit_reverse_ulong(j);
(void) _cds_lfht_del(ht, size, fini_node, 1);
}
- ht->cds_lfht_rcu_read_unlock();
+ ht->flavor->read_unlock();
}
static
assert(nr_cpus_mask != -1);
if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) {
- ht->cds_lfht_rcu_thread_online();
+ ht->flavor->thread_online();
remove_table_partition(ht, i, 0, len);
- ht->cds_lfht_rcu_thread_offline();
+ ht->flavor->thread_offline();
return;
}
partition_resize_helper(ht, i, len, remove_table_partition);
}
+/*
+ * fini_table() is never called for first_order == 0, which is why
+ * free_by_rcu_order == 0 can be used as criterion to know if free must
+ * be called.
+ */
static
void fini_table(struct cds_lfht *ht,
unsigned long first_order, unsigned long last_order)
dbg_printf("fini order %lu len: %lu\n", i, len);
/* Stop shrink if the resize target changes under us */
- if (CMM_LOAD_SHARED(ht->t.resize_target) > (1UL << (i - 1)))
+ if (CMM_LOAD_SHARED(ht->resize_target) > (1UL << (i - 1)))
break;
cmm_smp_wmb(); /* populate data before RCU size */
- CMM_STORE_SHARED(ht->t.size, 1UL << (i - 1));
+ CMM_STORE_SHARED(ht->size, 1UL << (i - 1));
/*
* We need to wait for all add operations to reach Q.S. (and
* releasing the old bucket nodes. Otherwise their lookup will
* return a logically removed node as insert position.
*/
- ht->cds_lfht_synchronize_rcu();
+ ht->flavor->update_synchronize_rcu();
if (free_by_rcu_order)
cds_lfht_free_bucket_table(ht, free_by_rcu_order);
}
if (free_by_rcu_order) {
- ht->cds_lfht_synchronize_rcu();
+ ht->flavor->update_synchronize_rcu();
cds_lfht_free_bucket_table(ht, free_by_rcu_order);
}
}
struct cds_lfht *_cds_lfht_new(unsigned long init_size,
unsigned long min_nr_alloc_buckets,
+ unsigned long max_nr_buckets,
int flags,
- void (*cds_lfht_call_rcu)(struct rcu_head *head,
- void (*func)(struct rcu_head *head)),
- void (*cds_lfht_synchronize_rcu)(void),
- void (*cds_lfht_rcu_read_lock)(void),
- void (*cds_lfht_rcu_read_unlock)(void),
- void (*cds_lfht_rcu_thread_offline)(void),
- void (*cds_lfht_rcu_thread_online)(void),
- void (*cds_lfht_rcu_register_thread)(void),
- void (*cds_lfht_rcu_unregister_thread)(void),
+ const struct cds_lfht_mm_type *mm,
+ const struct rcu_flavor_struct *flavor,
pthread_attr_t *attr)
{
struct cds_lfht *ht;
/* min_nr_alloc_buckets must be power of two */
if (!min_nr_alloc_buckets || (min_nr_alloc_buckets & (min_nr_alloc_buckets - 1)))
return NULL;
+
/* init_size must be power of two */
if (!init_size || (init_size & (init_size - 1)))
return NULL;
+
+ /*
+ * Memory management plugin default.
+ */
+ if (!mm) {
+ if (!max_nr_buckets) {
+ /*
+ * If the maximum number of buckets is not
+ * specified, we cannot use the mmap allocator,
+ * so fallback on order allocator.
+ */
+ mm = &cds_lfht_mm_order;
+ } else if (CAA_BITS_PER_LONG > 32
+ && max_nr_buckets <= (1ULL << 32)) {
+ /*
+ * For 64-bit architectures, with max number of
+ * buckets small enough not to use the entire
+ * 64-bit memory mapping space (and allowing a
+ * fair number of hash table instances), use the
+ * mmap allocator, which is faster than the
+ * order allocator.
+ */
+ mm = &cds_lfht_mm_mmap;
+ } else {
+ /*
+ * The fallback is to use the order allocator.
+ */
+ mm = &cds_lfht_mm_order;
+ }
+ }
+
+ /* max_nr_buckets == 0 for order based mm means infinite */
+ if (mm == &cds_lfht_mm_order && !max_nr_buckets)
+ max_nr_buckets = 1UL << (MAX_TABLE_ORDER - 1);
+
+ /* max_nr_buckets must be power of two */
+ if (!max_nr_buckets || (max_nr_buckets & (max_nr_buckets - 1)))
+ return NULL;
+
min_nr_alloc_buckets = max(min_nr_alloc_buckets, MIN_TABLE_SIZE);
init_size = max(init_size, MIN_TABLE_SIZE);
- ht = calloc(1, sizeof(struct cds_lfht));
+ max_nr_buckets = max(max_nr_buckets, min_nr_alloc_buckets);
+ init_size = min(init_size, max_nr_buckets);
+
+ ht = mm->alloc_cds_lfht(min_nr_alloc_buckets, max_nr_buckets);
assert(ht);
+ assert(ht->mm == mm);
+ assert(ht->bucket_at == mm->bucket_at);
+
ht->flags = flags;
- ht->cds_lfht_call_rcu = cds_lfht_call_rcu;
- ht->cds_lfht_synchronize_rcu = cds_lfht_synchronize_rcu;
- ht->cds_lfht_rcu_read_lock = cds_lfht_rcu_read_lock;
- ht->cds_lfht_rcu_read_unlock = cds_lfht_rcu_read_unlock;
- ht->cds_lfht_rcu_thread_offline = cds_lfht_rcu_thread_offline;
- ht->cds_lfht_rcu_thread_online = cds_lfht_rcu_thread_online;
- ht->cds_lfht_rcu_register_thread = cds_lfht_rcu_register_thread;
- ht->cds_lfht_rcu_unregister_thread = cds_lfht_rcu_unregister_thread;
+ ht->flavor = flavor;
ht->resize_attr = attr;
alloc_split_items_count(ht);
/* this mutex should not nest in read-side C.S. */
pthread_mutex_init(&ht->resize_mutex, NULL);
order = get_count_order_ulong(init_size);
- ht->t.resize_target = 1UL << order;
- ht->min_nr_alloc_buckets = min_nr_alloc_buckets;
- ht->min_alloc_buckets_order = get_count_order_ulong(min_nr_alloc_buckets);
+ ht->resize_target = 1UL << order;
cds_lfht_create_bucket(ht, 1UL << order);
- ht->t.size = 1UL << order;
+ ht->size = 1UL << order;
return ht;
}
reverse_hash = bit_reverse_ulong(hash);
- size = rcu_dereference(ht->t.size);
+ size = rcu_dereference(ht->size);
bucket = lookup_bucket(ht, size, hash);
/* We can always skip the bucket node initially */
node = rcu_dereference(bucket->next);
unsigned long size;
node->reverse_hash = bit_reverse_ulong((unsigned long) hash);
- size = rcu_dereference(ht->t.size);
+ size = rcu_dereference(ht->size);
_cds_lfht_add(ht, NULL, NULL, size, node, NULL, 0);
ht_count_add(ht, size, hash);
}
struct cds_lfht_iter iter;
node->reverse_hash = bit_reverse_ulong((unsigned long) hash);
- size = rcu_dereference(ht->t.size);
+ size = rcu_dereference(ht->size);
_cds_lfht_add(ht, match, key, size, node, &iter, 0);
if (iter.node == node)
ht_count_add(ht, size, hash);
struct cds_lfht_iter iter;
node->reverse_hash = bit_reverse_ulong((unsigned long) hash);
- size = rcu_dereference(ht->t.size);
+ size = rcu_dereference(ht->size);
for (;;) {
_cds_lfht_add(ht, match, key, size, node, &iter, 0);
if (iter.node == node) {
{
unsigned long size;
- size = rcu_dereference(ht->t.size);
+ size = rcu_dereference(ht->size);
return _cds_lfht_replace(ht, size, old_iter->node, old_iter->next,
new_node);
}
unsigned long size, hash;
int ret;
- size = rcu_dereference(ht->t.size);
+ size = rcu_dereference(ht->size);
ret = _cds_lfht_del(ht, size, iter->node, 0);
if (!ret) {
hash = bit_reverse_ulong(iter->node->reverse_hash);
* size accessed without rcu_dereference because hash table is
* being destroyed.
*/
- size = ht->t.size;
+ size = ht->size;
/* Internal sanity check: all nodes left should be bucket */
for (i = 0; i < size; i++) {
node = bucket_at(ht, i);
assert(uatomic_read(&ht->in_progress_resize));
if (CMM_LOAD_SHARED(ht->in_progress_destroy))
break;
- ht->t.resize_initiated = 1;
- old_size = ht->t.size;
- new_size = CMM_LOAD_SHARED(ht->t.resize_target);
+ ht->resize_initiated = 1;
+ old_size = ht->size;
+ new_size = CMM_LOAD_SHARED(ht->resize_target);
if (old_size < new_size)
_do_cds_lfht_grow(ht, old_size, new_size);
else if (old_size > new_size)
_do_cds_lfht_shrink(ht, old_size, new_size);
- ht->t.resize_initiated = 0;
+ ht->resize_initiated = 0;
/* write resize_initiated before read resize_target */
cmm_smp_mb();
- } while (ht->t.size != CMM_LOAD_SHARED(ht->t.resize_target));
+ } while (ht->size != CMM_LOAD_SHARED(ht->resize_target));
}
static
unsigned long resize_target_grow(struct cds_lfht *ht, unsigned long new_size)
{
- return _uatomic_xchg_monotonic_increase(&ht->t.resize_target, new_size);
+ return _uatomic_xchg_monotonic_increase(&ht->resize_target, new_size);
}
static
unsigned long count)
{
count = max(count, MIN_TABLE_SIZE);
- uatomic_set(&ht->t.resize_target, count);
+ count = min(count, ht->max_nr_buckets);
+ uatomic_set(&ht->resize_target, count);
}
void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size)
{
resize_target_update_count(ht, new_size);
- CMM_STORE_SHARED(ht->t.resize_initiated, 1);
- ht->cds_lfht_rcu_thread_offline();
+ CMM_STORE_SHARED(ht->resize_initiated, 1);
+ ht->flavor->thread_offline();
pthread_mutex_lock(&ht->resize_mutex);
_do_cds_lfht_resize(ht);
pthread_mutex_unlock(&ht->resize_mutex);
- ht->cds_lfht_rcu_thread_online();
+ ht->flavor->thread_online();
}
static
caa_container_of(head, struct rcu_resize_work, head);
struct cds_lfht *ht = work->ht;
- ht->cds_lfht_rcu_thread_offline();
+ ht->flavor->thread_offline();
pthread_mutex_lock(&ht->resize_mutex);
_do_cds_lfht_resize(ht);
pthread_mutex_unlock(&ht->resize_mutex);
- ht->cds_lfht_rcu_thread_online();
+ ht->flavor->thread_online();
poison_free(work);
cmm_smp_mb(); /* finish resize before decrement */
uatomic_dec(&ht->in_progress_resize);
/* Store resize_target before read resize_initiated */
cmm_smp_mb();
- if (!CMM_LOAD_SHARED(ht->t.resize_initiated)) {
+ if (!CMM_LOAD_SHARED(ht->resize_initiated)) {
uatomic_inc(&ht->in_progress_resize);
cmm_smp_mb(); /* increment resize count before load destroy */
if (CMM_LOAD_SHARED(ht->in_progress_destroy)) {
}
work = malloc(sizeof(*work));
work->ht = ht;
- ht->cds_lfht_call_rcu(&work->head, do_resize_cb);
- CMM_STORE_SHARED(ht->t.resize_initiated, 1);
+ ht->flavor->update_call_rcu(&work->head, do_resize_cb);
+ CMM_STORE_SHARED(ht->resize_initiated, 1);
}
}
{
unsigned long target_size = size << growth;
+ target_size = min(target_size, ht->max_nr_buckets);
if (resize_target_grow(ht, target_size) >= target_size)
return;
if (!(ht->flags & CDS_LFHT_AUTO_RESIZE))
return;
count = max(count, MIN_TABLE_SIZE);
+ count = min(count, ht->max_nr_buckets);
if (count == size)
return; /* Already the right size, no resize needed */
if (count > size) { /* lazy grow */
for (;;) {
unsigned long s;
- s = uatomic_cmpxchg(&ht->t.resize_target, size, count);
+ s = uatomic_cmpxchg(&ht->resize_target, size, count);
if (s == size)
break; /* no resize needed */
if (s > size)