* Upon node removal below min_child, if child pool is filled
* beyond capacity, we roll back to pigeon.
*/
- { .type_class = RCU_JA_PIGEON, .min_child = 101, .max_child = ja_type_7_max_child, .order = 11, },
+ { .type_class = RCU_JA_PIGEON, .min_child = 95, .max_child = ja_type_7_max_child, .order = 11, },
{ .type_class = RCU_JA_NULL, .min_child = 0, .max_child = ja_type_8_max_child, },
};
goto unlock_parent;
}
+ /*
+ * Perform a lookup query to handle the case where
+ * old_node_flag_ptr is NULL. We cannot use it to check if the
+ * node has been populated between RCU lookup and mutex
+ * acquisition.
+ */
+ if (!old_node_flag_ptr) {
+ uint8_t iter_key;
+ struct cds_ja_inode_flag *lookup_node_flag;
+ struct cds_ja_inode_flag **lookup_node_flag_ptr;
+
+ iter_key = (uint8_t) (key >> (JA_BITS_PER_BYTE * (ja->tree_depth - level)));
+ lookup_node_flag = ja_node_get_nth(attach_node_flag,
+ &lookup_node_flag_ptr,
+ iter_key);
+ if (lookup_node_flag) {
+ ret = -EEXIST;
+ goto unlock_parent;
+ }
+ }
+
if (attach_node_flag_ptr && ja_node_ptr(*attach_node_flag_ptr) !=
ja_node_ptr(attach_node_flag)) {
/*
iter_key,
iter_node_flag,
NULL, i);
- if (ret)
+ if (ret) {
+ dbg_printf("branch creation error %d\n", ret);
goto check_error;
+ }
created_nodes[nr_created_nodes++] = iter_dest_node_flag;
iter_node_flag = iter_dest_node_flag;
}
iter_key,
iter_node_flag,
shadow_node, level - 1);
- if (ret)
+ if (ret) {
+ dbg_printf("branch publish error %d\n", ret);
goto check_error;
+ }
/*
* Attach branch
*/
return ret;
}
-int cds_ja_add(struct cds_ja *ja, uint64_t key,
- struct cds_ja_node *new_node)
+static
+int _cds_ja_add(struct cds_ja *ja, uint64_t key,
+ struct cds_ja_node *new_node,
+ struct cds_ja_node **unique_node_ret)
{
unsigned int tree_depth, i;
struct cds_ja_inode_flag *attach_node_flag,
if (!ja_node_ptr(node_flag)) {
dbg_printf("cds_ja_add NULL parent2_node_flag %p parent_node_flag %p node_flag_ptr %p node_flag %p\n",
parent2_node_flag, parent_node_flag, node_flag_ptr, node_flag);
+
attach_node_flag = parent_node_flag;
attach_node_flag_ptr = parent_node_flag_ptr;
parent_attach_node_flag = parent2_node_flag;
node_flag,
key, i, new_node);
} else {
+ if (unique_node_ret) {
+ *unique_node_ret = (struct cds_ja_node *) ja_node_ptr(node_flag);
+ return -EEXIST;
+ }
+
dbg_printf("cds_ja_add duplicate parent2_node_flag %p parent_node_flag %p node_flag_ptr %p node_flag %p\n",
parent2_node_flag, parent_node_flag, node_flag_ptr, node_flag);
+
attach_node_flag = node_flag;
attach_node_flag_ptr = node_flag_ptr;
parent_attach_node_flag = parent_node_flag;
return ret;
}
+/*
+ * cds_ja_add: insert @new_node into @ja under @key, allowing duplicate
+ * keys (no uniqueness check). Thin wrapper around _cds_ja_add() with
+ * unique_node_ret == NULL, so the duplicate-detection path is disabled.
+ * Returns the status of _cds_ja_add() (0 on success, negative errno on
+ * error -- presumably; confirm against _cds_ja_add() contract).
+ */
+int cds_ja_add(struct cds_ja *ja, uint64_t key,
+		struct cds_ja_node *new_node)
+{
+	return _cds_ja_add(ja, key, new_node, NULL);
+}
+
+/*
+ * cds_ja_add_unique: insert @new_node into @ja under @key only if no
+ * node already exists for that key.
+ * Returns the pre-existing node when _cds_ja_add() reports -EEXIST
+ * (insertion not performed), or @new_node on successful insertion.
+ *
+ * NOTE(review): any _cds_ja_add() error other than -EEXIST also falls
+ * through to "return new_node", so allocation/internal failures are
+ * indistinguishable from success for the caller -- confirm this is
+ * intended.
+ */
+struct cds_ja_node *cds_ja_add_unique(struct cds_ja *ja, uint64_t key,
+		struct cds_ja_node *new_node)
+{
+	int ret;
+	struct cds_ja_node *ret_node;
+
+	ret = _cds_ja_add(ja, key, new_node, &ret_node);
+	if (ret == -EEXIST)
+		return ret_node;
+	else
+		return new_node;
+}
+
/*
* Note: there is no need to lookup the pointer address associated with
* each node's nth item after taking the lock: it's already been done by
struct cds_hlist_node *pos, *tmp;
uint8_t v;
- ja_linear_node_get_ith_pos(type, node, j, &v, &iter);
+ ja_linear_node_get_ith_pos(type, pool, j, &v, &iter);
if (!iter)
continue;
head.next = (struct cds_hlist_node *) iter;
ret = rcuja_delete_ht(ja->ht);
if (ret)
return ret;
+ fprintf(stderr, "Waiting arbitrary time for node free accounting...\n");
+ sleep(10); //wait for free TEST XXX
flavor->thread_online();
if (uatomic_read(&ja->nr_fallback))
fprintf(stderr,