rcuja fix: get the right node
[urcu.git] / rcuja / rcuja.c
index a651c7e2174c2829e7c725760a483446029077c7..67964b65da65bb4abee4db5b9379f270bda06f47 100644 (file)
@@ -61,32 +61,6 @@ struct cds_ja_type {
        uint16_t pool_size_order;       /* pool size */
 };
 
-/*
- * Number of least significant pointer bits reserved to represent the
- * child type.
- */
-#define JA_TYPE_BITS   3
-#define JA_TYPE_MAX_NR (1UL << JA_TYPE_BITS)
-#define JA_TYPE_MASK   (JA_TYPE_MAX_NR - 1)
-#define JA_PTR_MASK    (~JA_TYPE_MASK)
-
-#define JA_ENTRY_PER_NODE      256UL
-#define JA_BITS_PER_BYTE       3
-
-#define JA_MAX_DEPTH   9       /* Maximum depth, including leafs */
-
-/*
- * Entry for NULL node is at index 8 of the table. It is never encoded
- * in flags.
- */
-#define NODE_INDEX_NULL                8
-
-/*
- * Number of removals needed on a fallback node before we try to shrink
- * it.
- */
-#define JA_FALLBACK_REMOVAL_COUNT      8
-
 /*
  * Iteration on the array to find the right node size for the number of
  * children stops when it reaches .max_child == 256 (this is the largest
@@ -259,32 +233,11 @@ struct cds_ja_inode {
        } u;
 };
 
-static
-struct cds_ja_inode_flag *ja_node_flag(struct cds_ja_inode *node,
-               unsigned long type)
-{
-       assert(type < (1UL << JA_TYPE_BITS));
-       return (struct cds_ja_inode_flag *) (((unsigned long) node) | type);
-}
-
-static
-struct cds_ja_inode *ja_node_ptr(struct cds_ja_inode_flag *node)
-{
-       return (struct cds_ja_inode *) (((unsigned long) node) & JA_PTR_MASK);
-}
-
-static
-unsigned long ja_node_type(struct cds_ja_inode_flag *node)
-{
-       unsigned long type;
-
-       if (ja_node_ptr(node) == NULL) {
-               return NODE_INDEX_NULL;
-       }
-       type = (unsigned int) ((unsigned long) node & JA_TYPE_MASK);
-       assert(type < (1UL << JA_TYPE_BITS));
-       return type;
-}
+enum ja_recompact {
+       JA_RECOMPACT,
+       JA_RECOMPACT_ADD,
+       JA_RECOMPACT_DEL,
+};
 
 struct cds_ja_inode *alloc_cds_ja_node(const struct cds_ja_type *ja_type)
 {
@@ -312,7 +265,7 @@ uint8_t ja_linear_node_get_nr_child(const struct cds_ja_type *type,
                struct cds_ja_inode *node)
 {
        assert(type->type_class == RCU_JA_LINEAR || type->type_class == RCU_JA_POOL);
-       return CMM_LOAD_SHARED(node->u.data[0]);
+       return rcu_dereference(node->u.data[0]);
 }
 
 /*
@@ -324,6 +277,7 @@ static
 struct cds_ja_inode_flag *ja_linear_node_get_nth(const struct cds_ja_type *type,
                struct cds_ja_inode *node,
                struct cds_ja_inode_flag ***child_node_flag_ptr,
+               struct cds_ja_inode_flag ***node_flag_ptr,
                uint8_t n)
 {
        uint8_t nr_child;
@@ -344,13 +298,17 @@ struct cds_ja_inode_flag *ja_linear_node_get_nth(const struct cds_ja_type *type,
                if (CMM_LOAD_SHARED(values[i]) == n)
                        break;
        }
-       if (i >= nr_child)
+       if (i >= nr_child) {
+               if (caa_unlikely(node_flag_ptr))
+                       *node_flag_ptr = NULL;
                return NULL;
+       }
        pointers = (struct cds_ja_inode_flag **) align_ptr_size(&values[type->max_linear_child]);
-       if (caa_unlikely(child_node_flag_ptr))
-               *child_node_flag_ptr = &pointers[i];
        ptr = rcu_dereference(pointers[i]);
-       assert(ja_node_ptr(ptr) != NULL);
+       if (caa_unlikely(child_node_flag_ptr) && ptr)
+               *child_node_flag_ptr = &pointers[i];
+       if (caa_unlikely(node_flag_ptr))
+               *node_flag_ptr = &pointers[i];
        return ptr;
 }
 
@@ -377,6 +335,7 @@ static
 struct cds_ja_inode_flag *ja_pool_node_get_nth(const struct cds_ja_type *type,
                struct cds_ja_inode *node,
                struct cds_ja_inode_flag ***child_node_flag_ptr,
+               struct cds_ja_inode_flag ***node_flag_ptr,
                uint8_t n)
 {
        struct cds_ja_inode *linear;
@@ -388,7 +347,8 @@ struct cds_ja_inode_flag *ja_pool_node_get_nth(const struct cds_ja_type *type,
         */
        linear = (struct cds_ja_inode *)
                &node->u.data[((unsigned long) n >> (CHAR_BIT - type->nr_pool_order)) << type->pool_size_order];
-       return ja_linear_node_get_nth(type, linear, child_node_flag_ptr, n);
+       return ja_linear_node_get_nth(type, linear, child_node_flag_ptr,
+               node_flag_ptr, n);
 }
 
 static
@@ -405,17 +365,30 @@ static
 struct cds_ja_inode_flag *ja_pigeon_node_get_nth(const struct cds_ja_type *type,
                struct cds_ja_inode *node,
                struct cds_ja_inode_flag ***child_node_flag_ptr,
+               struct cds_ja_inode_flag ***node_flag_ptr,
                uint8_t n)
 {
        struct cds_ja_inode_flag **child_node_flag;
 
        assert(type->type_class == RCU_JA_PIGEON);
        child_node_flag = &((struct cds_ja_inode_flag **) node->u.data)[n];
-       if (caa_unlikely(child_node_flag_ptr))
+       dbg_printf("ja_pigeon_node_get_nth child_node_flag_ptr %p\n",
+               child_node_flag);
+       if (caa_unlikely(child_node_flag_ptr) && *child_node_flag)
                *child_node_flag_ptr = child_node_flag;
+       if (caa_unlikely(node_flag_ptr))
+               *node_flag_ptr = child_node_flag;
        return rcu_dereference(*child_node_flag);
 }
 
+static
+struct cds_ja_inode_flag *ja_pigeon_node_get_ith_pos(const struct cds_ja_type *type,
+               struct cds_ja_inode *node,
+               uint8_t i)
+{
+       return ja_pigeon_node_get_nth(type, node, NULL, NULL, i);
+}
+
 /*
  * ja_node_get_nth: get nth item from a node.
  * node_flag is already rcu_dereference'd.
@@ -423,6 +396,7 @@ struct cds_ja_inode_flag *ja_pigeon_node_get_nth(const struct cds_ja_type *type,
 static
 struct cds_ja_inode_flag * ja_node_get_nth(struct cds_ja_inode_flag *node_flag,
                struct cds_ja_inode_flag ***child_node_flag_ptr,
+               struct cds_ja_inode_flag ***node_flag_ptr,
                uint8_t n)
 {
        unsigned int type_index;
@@ -437,31 +411,19 @@ struct cds_ja_inode_flag * ja_node_get_nth(struct cds_ja_inode_flag *node_flag,
        switch (type->type_class) {
        case RCU_JA_LINEAR:
                return ja_linear_node_get_nth(type, node,
-                               child_node_flag_ptr, n);
+                               child_node_flag_ptr, node_flag_ptr, n);
        case RCU_JA_POOL:
                return ja_pool_node_get_nth(type, node,
-                               child_node_flag_ptr, n);
+                               child_node_flag_ptr, node_flag_ptr, n);
        case RCU_JA_PIGEON:
                return ja_pigeon_node_get_nth(type, node,
-                               child_node_flag_ptr, n);
+                               child_node_flag_ptr, node_flag_ptr, n);
        default:
                assert(0);
                return (void *) -1UL;
        }
 }
 
-/*
- * TODO: use ja_get_nr_child to monitor limits triggering shrink
- * recompaction.
- * Also use ja_get_nr_child to make the difference between resize and
- * pool change of compaction bit(s).
- */
-static
-unsigned int ja_get_nr_child(struct cds_ja_shadow_node *shadow_node)
-{
-       return shadow_node->nr_child;
-}
-
 static
 int ja_linear_node_set_nth(const struct cds_ja_type *type,
                struct cds_ja_inode *node,
@@ -472,7 +434,7 @@ int ja_linear_node_set_nth(const struct cds_ja_type *type,
        uint8_t nr_child;
        uint8_t *values, *nr_child_ptr;
        struct cds_ja_inode_flag **pointers;
-       unsigned int i;
+       unsigned int i, unused = 0;
 
        assert(type->type_class == RCU_JA_LINEAR || type->type_class == RCU_JA_POOL);
 
@@ -482,20 +444,35 @@ int ja_linear_node_set_nth(const struct cds_ja_type *type,
        assert(nr_child <= type->max_linear_child);
 
        values = &node->u.data[1];
+       pointers = (struct cds_ja_inode_flag **) align_ptr_size(&values[type->max_linear_child]);
+       /* Check if node value is already populated */
        for (i = 0; i < nr_child; i++) {
-               if (values[i] == n)
-                       return -EEXIST;
+               if (values[i] == n) {
+                       if (pointers[i])
+                               return -EEXIST;
+                       else
+                               break;
+               } else {
+                       if (!pointers[i])
+                               unused++;
+               }
        }
-       if (nr_child >= type->max_linear_child) {
-               /* No space left in this node type */
-               return -ENOSPC;
+       if (i == nr_child && nr_child >= type->max_linear_child) {
+               if (unused)
+                       return -ERANGE; /* recompact node */
+               else
+                       return -ENOSPC; /* No space left in this node type */
+       }
+
+       assert(pointers[i] == NULL);
+       rcu_assign_pointer(pointers[i], child_node_flag);
+       /* If we expanded the nr_child, increment it */
+       if (i == nr_child) {
+               CMM_STORE_SHARED(values[nr_child], n);
+               /* write pointer and value before nr_child */
+               cmm_smp_wmb();
+               CMM_STORE_SHARED(*nr_child_ptr, nr_child + 1);
        }
-       pointers = (struct cds_ja_inode_flag **) align_ptr_size(&values[type->max_linear_child]);
-       assert(pointers[nr_child] == NULL);
-       rcu_assign_pointer(pointers[nr_child], child_node_flag);
-       CMM_STORE_SHARED(values[nr_child], n);
-       cmm_smp_wmb();  /* write value and pointer before nr_child */
-       CMM_STORE_SHARED(*nr_child_ptr, nr_child + 1);
        shadow_node->nr_child++;
        dbg_printf("linear set nth: %u child, shadow: %u child, for node %p shadow %p\n",
                (unsigned int) CMM_LOAD_SHARED(*nr_child_ptr),
@@ -542,7 +519,6 @@ int ja_pigeon_node_set_nth(const struct cds_ja_type *type,
 /*
  * _ja_node_set_nth: set nth item within a node. Return an error
  * (negative error value) if it is already there.
- * TODO: exclusive access on node.
  */
 static
 int _ja_node_set_nth(const struct cds_ja_type *type,
@@ -571,54 +547,181 @@ int _ja_node_set_nth(const struct cds_ja_type *type,
        return 0;
 }
 
+static
+int ja_linear_node_clear_ptr(const struct cds_ja_type *type,
+               struct cds_ja_inode *node,
+               struct cds_ja_shadow_node *shadow_node,
+               struct cds_ja_inode_flag **node_flag_ptr)
+{
+       uint8_t nr_child;
+       uint8_t *nr_child_ptr;
+
+       assert(type->type_class == RCU_JA_LINEAR || type->type_class == RCU_JA_POOL);
+
+       nr_child_ptr = &node->u.data[0];
+       dbg_printf("linear clear ptr: nr_child_ptr %p\n", nr_child_ptr);
+       nr_child = *nr_child_ptr;
+       assert(nr_child <= type->max_linear_child);
+
+       if (shadow_node->fallback_removal_count) {
+               shadow_node->fallback_removal_count--;
+       } else {
+               if (shadow_node->nr_child <= type->min_child) {
+                       /* We need to try recompacting the node */
+                       return -EFBIG;
+               }
+       }
+       assert(*node_flag_ptr != NULL);
+       rcu_assign_pointer(*node_flag_ptr, NULL);
+       /*
+        * Value and nr_child are never changed (would cause ABA issue).
+        * Instead, we leave the pointer to NULL and recompact the node
+        * once in a while. It is allowed to set a NULL pointer to a new
+        * value without recompaction though.
+        * Only update the shadow node accounting.
+        */
+       shadow_node->nr_child--;
+       dbg_printf("linear clear ptr: %u child, shadow: %u child, for node %p shadow %p\n",
+               (unsigned int) CMM_LOAD_SHARED(*nr_child_ptr),
+               (unsigned int) shadow_node->nr_child,
+               node, shadow_node);
+
+       return 0;
+}
+
+static
+int ja_pool_node_clear_ptr(const struct cds_ja_type *type,
+               struct cds_ja_inode *node,
+               struct cds_ja_shadow_node *shadow_node,
+               struct cds_ja_inode_flag **node_flag_ptr,
+               uint8_t n)
+{
+       struct cds_ja_inode *linear;
+
+       assert(type->type_class == RCU_JA_POOL);
+       linear = (struct cds_ja_inode *)
+               &node->u.data[((unsigned long) n >> (CHAR_BIT - type->nr_pool_order)) << type->pool_size_order];
+       return ja_linear_node_clear_ptr(type, linear, shadow_node, node_flag_ptr);
+}
+
+static
+int ja_pigeon_node_clear_ptr(const struct cds_ja_type *type,
+               struct cds_ja_inode *node,
+               struct cds_ja_shadow_node *shadow_node,
+               struct cds_ja_inode_flag **node_flag_ptr)
+{
+       assert(type->type_class == RCU_JA_PIGEON);
+       dbg_printf("ja_pigeon_node_clear_ptr: clearing ptr: %p\n", *node_flag_ptr);
+       rcu_assign_pointer(*node_flag_ptr, NULL);
+       shadow_node->nr_child--;
+       return 0;
+}
+
+/*
+ * _ja_node_clear_ptr: clear ptr item within a node. Return an error
+ * (negative error value) if it is not found (-ENOENT).
+ */
+static
+int _ja_node_clear_ptr(const struct cds_ja_type *type,
+               struct cds_ja_inode *node,
+               struct cds_ja_shadow_node *shadow_node,
+               struct cds_ja_inode_flag **node_flag_ptr,
+               uint8_t n)
+{
+       switch (type->type_class) {
+       case RCU_JA_LINEAR:
+               return ja_linear_node_clear_ptr(type, node, shadow_node, node_flag_ptr);
+       case RCU_JA_POOL:
+               return ja_pool_node_clear_ptr(type, node, shadow_node, node_flag_ptr, n);
+       case RCU_JA_PIGEON:
+               return ja_pigeon_node_clear_ptr(type, node, shadow_node, node_flag_ptr);
+       case RCU_JA_NULL:
+               return -ENOENT;
+       default:
+               assert(0);
+               return -EINVAL;
+       }
+
+       return 0;
+}
+
 /*
  * ja_node_recompact_add: recompact a node, adding a new child.
  * TODO: for pool type, take selection bit(s) into account.
- * Return 0 on success, -ENOENT if need to retry, or other negative
+ * Return 0 on success, -EAGAIN if need to retry, or other negative
  * error value otherwise.
  */
 static
-int ja_node_recompact_add(struct cds_ja *ja,
+int ja_node_recompact(enum ja_recompact mode,
+               struct cds_ja *ja,
                unsigned int old_type_index,
                const struct cds_ja_type *old_type,
                struct cds_ja_inode *old_node,
                struct cds_ja_shadow_node *shadow_node,
-               struct cds_ja_inode_flag **old_node_flag, uint8_t n,
-               struct cds_ja_inode_flag *child_node_flag)
+               struct cds_ja_inode_flag **old_node_flag_ptr, uint8_t n,
+               struct cds_ja_inode_flag *child_node_flag,
+               struct cds_ja_inode_flag **nullify_node_flag_ptr)
 {
        unsigned int new_type_index;
        struct cds_ja_inode *new_node;
-       struct cds_ja_shadow_node *new_shadow_node;
+       struct cds_ja_shadow_node *new_shadow_node = NULL;
        const struct cds_ja_type *new_type;
-       struct cds_ja_inode_flag *new_node_flag;
+       struct cds_ja_inode_flag *new_node_flag, *old_node_flag;
        int ret;
        int fallback = 0;
 
-       if (!shadow_node || old_type_index == NODE_INDEX_NULL) {
-               new_type_index = 0;
-       } else {
-               new_type_index = old_type_index + 1;
+       old_node_flag = *old_node_flag_ptr;
+
+       switch (mode) {
+       case JA_RECOMPACT:
+               new_type_index = old_type_index;
+               break;
+       case JA_RECOMPACT_ADD:
+               if (!shadow_node || old_type_index == NODE_INDEX_NULL) {
+                       new_type_index = 0;
+               } else {
+                       new_type_index = old_type_index + 1;
+               }
+               break;
+       case JA_RECOMPACT_DEL:
+               if (old_type_index == 0) {
+                       new_type_index = NODE_INDEX_NULL;
+               } else {
+                       new_type_index = old_type_index - 1;
+               }
+               break;
+       default:
+               assert(0);
        }
 
 retry:         /* for fallback */
-       dbg_printf("Recompact to type %d\n", new_type_index);
+       dbg_printf("Recompact from type %d to type %d\n",
+                       old_type_index, new_type_index);
        new_type = &ja_types[new_type_index];
-       new_node = alloc_cds_ja_node(new_type);
-       if (!new_node)
-               return -ENOMEM;
-       new_node_flag = ja_node_flag(new_node, new_type_index);
-
-       dbg_printf("Recompact inherit lock from %p\n", shadow_node);
-       new_shadow_node = rcuja_shadow_set(ja->ht, new_node, shadow_node);
-       if (!new_shadow_node) {
-               free(new_node);
-               return -ENOMEM;
+       if (new_type_index != NODE_INDEX_NULL) {
+               new_node = alloc_cds_ja_node(new_type);
+               if (!new_node)
+                       return -ENOMEM;
+               new_node_flag = ja_node_flag(new_node, new_type_index);
+               dbg_printf("Recompact inherit lock from %p\n", shadow_node);
+               new_shadow_node = rcuja_shadow_set(ja->ht, new_node_flag, shadow_node, ja);
+               if (!new_shadow_node) {
+                       free(new_node);
+                       return -ENOMEM;
+               }
+               if (fallback)
+                       new_shadow_node->fallback_removal_count =
+                                               JA_FALLBACK_REMOVAL_COUNT;
+       } else {
+               new_node = NULL;
+               new_node_flag = NULL;
        }
-       if (fallback)
-               new_shadow_node->fallback_removal_count =
-                                       JA_FALLBACK_REMOVAL_COUNT;
 
-       assert(old_type->type_class != RCU_JA_PIGEON);
+       assert(mode != JA_RECOMPACT_ADD || old_type->type_class != RCU_JA_PIGEON);
+
+       if (new_type_index == NODE_INDEX_NULL)
+               goto skip_copy;
+
        switch (old_type->type_class) {
        case RCU_JA_LINEAR:
        {
@@ -633,6 +736,8 @@ retry:              /* for fallback */
                        ja_linear_node_get_ith_pos(old_type, old_node, i, &v, &iter);
                        if (!iter)
                                continue;
+                       if (mode == JA_RECOMPACT_DEL && *nullify_node_flag_ptr == iter)
+                               continue;
                        ret = _ja_node_set_nth(new_type, new_node,
                                        new_shadow_node,
                                        v, iter);
@@ -663,6 +768,8 @@ retry:              /* for fallback */
                                                j, &v, &iter);
                                if (!iter)
                                        continue;
+                               if (mode == JA_RECOMPACT_DEL && *nullify_node_flag_ptr == iter)
+                                       continue;
                                ret = _ja_node_set_nth(new_type, new_node,
                                                new_shadow_node,
                                                v, iter);
@@ -676,25 +783,62 @@ retry:            /* for fallback */
                break;
        }
        case RCU_JA_NULL:
-               /* Nothing to copy */
+               assert(mode == JA_RECOMPACT_ADD);
                break;
        case RCU_JA_PIGEON:
+       {
+               uint8_t nr_child;
+               unsigned int i;
+
+               assert(mode == JA_RECOMPACT_DEL);
+               nr_child = shadow_node->nr_child;
+               for (i = 0; i < nr_child; i++) {
+                       struct cds_ja_inode_flag *iter;
+
+                       iter = ja_pigeon_node_get_ith_pos(old_type, old_node, i);
+                       if (!iter)
+                               continue;
+                       if (mode == JA_RECOMPACT_DEL && *nullify_node_flag_ptr == iter)
+                               continue;
+                       ret = _ja_node_set_nth(new_type, new_node,
+                                       new_shadow_node,
+                                       i, iter);
+                       if (new_type->type_class == RCU_JA_POOL && ret) {
+                               goto fallback_toosmall;
+                       }
+                       assert(!ret);
+               }
+               break;
+       }
        default:
                assert(0);
                ret = -EINVAL;
                goto end;
        }
+skip_copy:
 
-       /* add node */
-       ret = _ja_node_set_nth(new_type, new_node,
-                       new_shadow_node,
-                       n, child_node_flag);
-       assert(!ret);
-       /* Return pointer to new recompacted new through old_node_flag */
-       *old_node_flag = new_node_flag;
+       if (mode == JA_RECOMPACT_ADD) {
+               /* add node */
+               ret = _ja_node_set_nth(new_type, new_node,
+                               new_shadow_node,
+                               n, child_node_flag);
+               assert(!ret);
+       }
+       /* Return pointer to new recompacted node through old_node_flag_ptr */
+       *old_node_flag_ptr = new_node_flag;
        if (old_node) {
-               ret = rcuja_shadow_clear(ja->ht, old_node, shadow_node,
-                               RCUJA_SHADOW_CLEAR_FREE_NODE);
+               int flags;
+
+               flags = RCUJA_SHADOW_CLEAR_FREE_NODE;
+               /*
+                * It is OK to free the lock associated with a node
+                * going to NULL, since we are holding the parent lock.
+                * This synchronizes removal with re-add of that node.
+                */
+               if (new_type_index == NODE_INDEX_NULL)
+                       flags = RCUJA_SHADOW_CLEAR_FREE_LOCK;
+               ret = rcuja_shadow_clear(ja->ht, old_node_flag, shadow_node,
+                               flags);
                assert(!ret);
        }
 
@@ -704,11 +848,12 @@ end:
 
 fallback_toosmall:
        /* fallback if next pool is too small */
-       ret = rcuja_shadow_clear(ja->ht, new_node, new_shadow_node,
+       assert(new_shadow_node);
+       ret = rcuja_shadow_clear(ja->ht, new_node_flag, new_shadow_node,
                        RCUJA_SHADOW_CLEAR_FREE_NODE);
        assert(!ret);
 
-       /* Last type: pigeon */
+       /* Choose fallback type: pigeon */
        new_type_index = (1UL << JA_TYPE_BITS) - 1;
        dbg_printf("Fallback to type %d\n", new_type_index);
        uatomic_inc(&ja->nr_fallback);
@@ -717,7 +862,7 @@ fallback_toosmall:
 }
 
 /*
- * Return 0 on success, -ENOENT if need to retry, or other negative
+ * Return 0 on success, -EAGAIN if need to retry, or other negative
  * error value otherwise.
  */
 static
@@ -739,40 +884,83 @@ int ja_node_set_nth(struct cds_ja *ja,
        type = &ja_types[type_index];
        ret = _ja_node_set_nth(type, node, shadow_node,
                        n, child_node_flag);
-       if (ret == -ENOSPC) {
+       switch (ret) {
+       case -ENOSPC:
                /* Not enough space in node, need to recompact. */
-               ret = ja_node_recompact_add(ja, type_index, type, node,
-                               shadow_node, node_flag, n, child_node_flag);
+               ret = ja_node_recompact(JA_RECOMPACT_ADD, ja, type_index, type, node,
+                               shadow_node, node_flag, n, child_node_flag, NULL);
+               break;
+       case -ERANGE:
+               /* Node needs to be recompacted. */
+               ret = ja_node_recompact(JA_RECOMPACT, ja, type_index, type, node,
+                               shadow_node, node_flag, n, child_node_flag, NULL);
+               break;
        }
        return ret;
 }
 
-struct cds_hlist_head *cds_ja_lookup(struct cds_ja *ja, uint64_t key)
+/*
+ * Return 0 on success, -EAGAIN if need to retry, or other negative
+ * error value otherwise.
+ */
+static
+int ja_node_clear_ptr(struct cds_ja *ja,
+               struct cds_ja_inode_flag **node_flag_ptr,       /* Pointer to location to nullify */
+               struct cds_ja_inode_flag **parent_node_flag_ptr,        /* Address of parent ptr in its parent */
+               struct cds_ja_shadow_node *shadow_node,         /* of parent */
+               uint8_t n)
+{
+       int ret;
+       unsigned int type_index;
+       const struct cds_ja_type *type;
+       struct cds_ja_inode *node;
+
+       dbg_printf("ja_node_clear_ptr for node %p, shadow %p, target ptr %p\n",
+               ja_node_ptr(*parent_node_flag_ptr), shadow_node, node_flag_ptr);
+
+       node = ja_node_ptr(*parent_node_flag_ptr);
+       type_index = ja_node_type(*parent_node_flag_ptr);
+       type = &ja_types[type_index];
+       ret = _ja_node_clear_ptr(type, node, shadow_node, node_flag_ptr, n);
+       if (ret == -EFBIG) {
+               /* Should to try recompaction. */
+               ret = ja_node_recompact(JA_RECOMPACT_DEL, ja, type_index, type, node,
+                               shadow_node, parent_node_flag_ptr, n, NULL,
+                               node_flag_ptr);
+       }
+       return ret;
+}
+
+struct cds_hlist_head cds_ja_lookup(struct cds_ja *ja, uint64_t key)
 {
        unsigned int tree_depth, i;
        struct cds_ja_inode_flag *node_flag;
+       struct cds_hlist_head head = { NULL };
 
        if (caa_unlikely(key > ja->key_max))
-               return NULL;
+               return head;
        tree_depth = ja->tree_depth;
        node_flag = rcu_dereference(ja->root);
 
        /* level 0: root node */
        if (!ja_node_ptr(node_flag))
-               return NULL;
+               return head;
 
        for (i = 1; i < tree_depth; i++) {
                uint8_t iter_key;
 
                iter_key = (uint8_t) (key >> (JA_BITS_PER_BYTE * (tree_depth - i - 1)));
-               node_flag = ja_node_get_nth(node_flag, NULL,
+               node_flag = ja_node_get_nth(node_flag, NULL, NULL,
                        iter_key);
+               dbg_printf("cds_ja_lookup iter key lookup %u finds node_flag %p\n",
+                               (unsigned int) iter_key, node_flag);
                if (!ja_node_ptr(node_flag))
-                       return NULL;
+                       return head;
        }
 
        /* Last level lookup succeded. We got an actual match. */
-       return (struct cds_hlist_head *) node_flag;
+       head.next = (struct cds_hlist_node *) node_flag;
+       return head;
 }
 
 /*
@@ -789,6 +977,7 @@ struct cds_hlist_head *cds_ja_lookup(struct cds_ja *ja, uint64_t key)
  */
 static
 int ja_attach_node(struct cds_ja *ja,
+               struct cds_ja_inode_flag **attach_node_flag_ptr,
                struct cds_ja_inode_flag **node_flag_ptr,
                struct cds_ja_inode_flag *node_flag,
                struct cds_ja_inode_flag *parent_node_flag,
@@ -797,8 +986,7 @@ int ja_attach_node(struct cds_ja *ja,
                struct cds_ja_node *child_node)
 {
        struct cds_ja_shadow_node *shadow_node = NULL,
-                       *parent_shadow_node = NULL,
-                       *iter_shadow_node;
+                       *parent_shadow_node = NULL;
        struct cds_ja_inode *node = ja_node_ptr(node_flag);
        struct cds_ja_inode *parent_node = ja_node_ptr(parent_node_flag);
        struct cds_hlist_head head;
@@ -807,38 +995,39 @@ int ja_attach_node(struct cds_ja *ja,
        struct cds_ja_inode_flag *created_nodes[JA_MAX_DEPTH];
        int nr_created_nodes = 0;
 
-       dbg_printf("Attach node at level %u\n", level);
+       dbg_printf("Attach node at level %u (node %p, node_flag %p)\n",
+               level, node, node_flag);
 
        assert(node);
-       shadow_node = rcuja_shadow_lookup_lock(ja->ht, node);
+       shadow_node = rcuja_shadow_lookup_lock(ja->ht, node_flag);
        if (!shadow_node) {
-               ret = -ENOENT;
+               ret = -EAGAIN;
                goto end;
        }
        if (parent_node) {
                parent_shadow_node = rcuja_shadow_lookup_lock(ja->ht,
-                                               parent_node);
+                                               parent_node_flag);
                if (!parent_shadow_node) {
-                       ret = -ENOENT;
+                       ret = -EAGAIN;
                        goto unlock_shadow;
                }
        }
 
+       if (node_flag_ptr && ja_node_ptr(*node_flag_ptr) != NULL) {
+               /*
+                * Attach point is non-NULL: it has been updated between
+                * RCU lookup and lock acquisition. We need to re-try
+                * lookup and attach.
+                */
+               ret = -EAGAIN;
+               goto unlock_parent;
+       }
+
        /* Create new branch, starting from bottom */
        CDS_INIT_HLIST_HEAD(&head);
        cds_hlist_add_head_rcu(&child_node->list, &head);
        iter_node_flag = (struct cds_ja_inode_flag *) head.next;
 
-       /* Create shadow node for the leaf node */
-       dbg_printf("leaf shadow node creation\n");
-       iter_shadow_node = rcuja_shadow_set(ja->ht,
-                       ja_node_ptr(iter_node_flag), NULL);
-       if (!iter_shadow_node) {
-               ret = -ENOMEM;
-               goto check_error;
-       }
-       created_nodes[nr_created_nodes++] = iter_node_flag;
-
        for (i = ja->tree_depth; i > (int) level; i--) {
                uint8_t iter_key;
 
@@ -874,8 +1063,8 @@ int ja_attach_node(struct cds_ja *ja,
 
        /* Publish new branch */
        dbg_printf("Publish branch %p, replacing %p\n",
-               iter_node_flag, *node_flag_ptr);
-       rcu_assign_pointer(*node_flag_ptr, iter_node_flag);
+               iter_node_flag, *attach_node_flag_ptr);
+       rcu_assign_pointer(*attach_node_flag_ptr, iter_node_flag);
 
        /* Success */
        ret = 0;
@@ -890,12 +1079,13 @@ check_error:
                        if (i)
                                flags |= RCUJA_SHADOW_CLEAR_FREE_NODE;
                        tmpret = rcuja_shadow_clear(ja->ht,
-                                       ja_node_ptr(created_nodes[i]),
+                                       created_nodes[i],
                                        NULL,
                                        flags);
                        assert(!tmpret);
                }
        }
+unlock_parent:
        if (parent_shadow_node)
                rcuja_shadow_unlock(parent_shadow_node);
 unlock_shadow:
@@ -906,22 +1096,23 @@ end:
 }
 
 /*
- * Lock the hlist head shadow node mutex, and add node to list of
- * duplicates. Failure can happen if concurrent removal removes the last
- * node with same key before we get the lock.
+ * Lock the parent containing the hlist head pointer, and add node to list of
+ * duplicates. Failure can happen if concurrent update changes the
+ * parent before we get the lock. We return -EAGAIN in that case.
  * Return 0 on success, negative error value on failure.
  */
 static
 int ja_chain_node(struct cds_ja *ja,
+               struct cds_ja_inode_flag *parent_node_flag,
                struct cds_hlist_head *head,
                struct cds_ja_node *node)
 {
        struct cds_ja_shadow_node *shadow_node;
 
-       shadow_node = rcuja_shadow_lookup_lock(ja->ht,
-               (struct cds_ja_inode *) head);
-       if (!shadow_node)
-               return -ENOENT;
+       shadow_node = rcuja_shadow_lookup_lock(ja->ht, parent_node_flag);
+       if (!shadow_node) {
+               return -EAGAIN;
+       }
        cds_hlist_add_head_rcu(&node->list, head);
        rcuja_shadow_unlock(shadow_node);
        return 0;
@@ -931,14 +1122,16 @@ int cds_ja_add(struct cds_ja *ja, uint64_t key,
                struct cds_ja_node *new_node)
 {
        unsigned int tree_depth, i;
-       struct cds_ja_inode_flag **node_flag_ptr;       /* in parent */
+       struct cds_ja_inode_flag **attach_node_flag_ptr,
+               **node_flag_ptr;
        struct cds_ja_inode_flag *node_flag,
                *parent_node_flag,
                *parent2_node_flag;
        int ret;
 
-       if (caa_unlikely(key > ja->key_max))
+       if (caa_unlikely(key > ja->key_max)) {
                return -EINVAL;
+       }
        tree_depth = ja->tree_depth;
 
 retry:
@@ -947,18 +1140,22 @@ retry:
        parent2_node_flag = NULL;
        parent_node_flag =
                (struct cds_ja_inode_flag *) &ja->root; /* Use root ptr address as key for mutex */
+       attach_node_flag_ptr = &ja->root;
        node_flag_ptr = &ja->root;
-       node_flag = rcu_dereference(*node_flag_ptr);
+       node_flag = rcu_dereference(ja->root);
 
        /* Iterate on all internal levels */
        for (i = 1; i < tree_depth; i++) {
                uint8_t iter_key;
 
+               dbg_printf("cds_ja_add iter attach_node_flag_ptr %p node_flag_ptr %p node_flag %p\n",
+                               *attach_node_flag_ptr, *node_flag_ptr, node_flag);
                if (!ja_node_ptr(node_flag)) {
-                       ret = ja_attach_node(ja, node_flag_ptr,
+                       ret = ja_attach_node(ja, attach_node_flag_ptr,
+                                       node_flag_ptr,
                                        parent_node_flag, parent2_node_flag,
                                        key, i, new_node);
-                       if (ret == -ENOENT || ret == -EEXIST)
+                       if (ret == -EAGAIN || ret == -EEXIST)
                                goto retry;
                        else
                                goto end;
@@ -967,8 +1164,13 @@ retry:
                parent2_node_flag = parent_node_flag;
                parent_node_flag = node_flag;
                node_flag = ja_node_get_nth(node_flag,
+                       &attach_node_flag_ptr,
                        &node_flag_ptr,
                        iter_key);
+               dbg_printf("cds_ja_add iter key lookup %u finds node_flag %p attach_node_flag_ptr %p node_flag_ptr %p\n",
+                               (unsigned int) iter_key, node_flag,
+                               *attach_node_flag_ptr,
+                               *node_flag_ptr);
        }
 
        /*
@@ -976,19 +1178,291 @@ retry:
         * level, or chain it if key is already present.
         */
        if (!ja_node_ptr(node_flag)) {
-               ret = ja_attach_node(ja, node_flag_ptr, parent_node_flag,
+               dbg_printf("cds_ja_add last attach_node_flag_ptr %p node_flag_ptr %p node_flag %p\n",
+                               *attach_node_flag_ptr, *node_flag_ptr, node_flag);
+               ret = ja_attach_node(ja, attach_node_flag_ptr,
+                               node_flag_ptr, parent_node_flag,
                                parent2_node_flag, key, i, new_node);
        } else {
                ret = ja_chain_node(ja,
-                       (struct cds_hlist_head *) ja_node_ptr(node_flag),
+                       parent_node_flag,
+                       (struct cds_hlist_head *) attach_node_flag_ptr,
                        new_node);
        }
-       if (ret == -ENOENT)
+       if (ret == -EAGAIN || ret == -EEXIST)
                goto retry;
 end:
        return ret;
 }
 
+/*
+ * Note: there is no need to lookup the pointer address associated with
+ * each node's nth item after taking the lock: it's already been done by
+ * cds_ja_del while holding the rcu read-side lock, and our node rules
+ * ensure that when a match value -> pointer is found in a node, it is
+ * _NEVER_ changed for that node without recompaction, and recompaction
+ * reallocates the node.
+ * However, when a child is removed from "linear" nodes, its pointer
+ * is set to NULL. We therefore check, while holding the locks, if this
+ * pointer is NULL, and return -ENOENT to the caller if it is the case.
+ */
+static
+int ja_detach_node(struct cds_ja *ja,
+               struct cds_ja_inode_flag **snapshot,
+               struct cds_ja_inode_flag ***snapshot_ptr,
+               uint8_t *snapshot_n,
+               int nr_snapshot,
+               uint64_t key,
+               struct cds_ja_node *node)
+{
+       struct cds_ja_shadow_node *shadow_nodes[JA_MAX_DEPTH];
+       struct cds_ja_inode_flag **node_flag_ptr = NULL,
+                       *parent_node_flag = NULL,
+                       **parent_node_flag_ptr = NULL;
+       struct cds_ja_inode_flag *iter_node_flag;
+       int ret, i, nr_shadow = 0, nr_clear = 0, nr_branch = 0;
+       uint8_t n = 0;
+
+       assert(nr_snapshot == ja->tree_depth + 1);
+
+       /*
+        * From the last internal level node going up, get the node
+        * lock, check if the node has only one child left. If it is the
+        * case, we continue iterating upward. When we reach a node
+        * which has more that one child left, we lock the parent, and
+        * proceed to the node deletion (removing its children too).
+        */
+       for (i = nr_snapshot - 2; i >= 1; i--) {
+               struct cds_ja_shadow_node *shadow_node;
+
+               shadow_node = rcuja_shadow_lookup_lock(ja->ht,
+                                       snapshot[i]);
+               if (!shadow_node) {
+                       ret = -EAGAIN;
+                       goto end;
+               }
+               assert(shadow_node->nr_child > 0);
+               shadow_nodes[nr_shadow++] = shadow_node;
+               if (shadow_node->nr_child == 1)
+                       nr_clear++;
+               nr_branch++;
+               if (shadow_node->nr_child > 1 || i == 1) {
+                       /* Lock parent and break */
+                       shadow_node = rcuja_shadow_lookup_lock(ja->ht,
+                                       snapshot[i - 1]);
+                       if (!shadow_node) {
+                               ret = -EAGAIN;
+                               goto end;
+                       }
+                       shadow_nodes[nr_shadow++] = shadow_node;
+                       node_flag_ptr = snapshot_ptr[i + 1];
+                       n = snapshot_n[i + 1];
+                       parent_node_flag_ptr = snapshot_ptr[i];
+                       parent_node_flag = snapshot[i];
+                       if (i > 1) {
+                               /*
+                                * Lock parent's parent, in case we need
+                                * to recompact parent.
+                                */
+                               shadow_node = rcuja_shadow_lookup_lock(ja->ht,
+                                               snapshot[i - 2]);
+                               if (!shadow_node) {
+                                       ret = -EAGAIN;
+                                       goto end;
+                               }
+                               shadow_nodes[nr_shadow++] = shadow_node;
+                       }
+                       break;
+               }
+       }
+
+       /*
+        * Check if node has been removed between RCU lookup and lock
+        * acquisition.
+        */
+       assert(node_flag_ptr);
+       if (ja_node_ptr(*node_flag_ptr) == NULL) {
+               ret = -ENOENT;
+               goto end;
+       }
+
+       /*
+        * At this point, we want to delete all nodes that are about to
+        * be removed from shadow_nodes (except the last one, which is
+        * either the root or the parent of the upmost node with 1
+        * child). OK to as to free lock here, because RCU read lock is
+        * held, and free only performed in call_rcu.
+        */
+
+       for (i = 0; i < nr_clear; i++) {
+               ret = rcuja_shadow_clear(ja->ht,
+                               shadow_nodes[i]->node_flag,
+                               shadow_nodes[i],
+                               RCUJA_SHADOW_CLEAR_FREE_NODE
+                               | RCUJA_SHADOW_CLEAR_FREE_LOCK);
+               assert(!ret);
+       }
+
+       iter_node_flag = parent_node_flag;
+       /* Remove from parent */
+       ret = ja_node_clear_ptr(ja,
+               node_flag_ptr,          /* Pointer to location to nullify */
+               &iter_node_flag,        /* Old new parent ptr in its parent */
+               shadow_nodes[nr_branch - 1],    /* of parent */
+               n);
+       if (ret)
+               goto end;
+
+       dbg_printf("ja_detach_node: publish %p instead of %p\n",
+               iter_node_flag, *parent_node_flag_ptr);
+       /* Update address of parent ptr in its parent */
+       rcu_assign_pointer(*parent_node_flag_ptr, iter_node_flag);
+
+end:
+       for (i = 0; i < nr_shadow; i++)
+               rcuja_shadow_unlock(shadow_nodes[i]);
+       return ret;
+}
+
+static
+int ja_unchain_node(struct cds_ja *ja,
+               struct cds_ja_inode_flag *parent_node_flag,
+               struct cds_hlist_head *head,
+               struct cds_ja_node *node)
+{
+       struct cds_ja_shadow_node *shadow_node;
+       struct cds_hlist_node *hlist_node;
+       int ret = 0, count = 0;
+
+       shadow_node = rcuja_shadow_lookup_lock(ja->ht, parent_node_flag);
+       if (!shadow_node)
+               return -EAGAIN;
+       /*
+        * Retry if another thread removed all but one of duplicates
+        * since check (that was performed without lock).
+        */
+       cds_hlist_for_each_rcu(hlist_node, head, list) {
+               count++;
+       }
+
+       if (count == 1) {
+               ret = -EAGAIN;
+               goto end;
+       }
+       cds_hlist_del_rcu(&node->list);
+end:
+       rcuja_shadow_unlock(shadow_node);
+       return ret;
+}
+
+/*
+ * Called with RCU read lock held.
+ */
+int cds_ja_del(struct cds_ja *ja, uint64_t key,
+               struct cds_ja_node *node)
+{
+       unsigned int tree_depth, i;
+       struct cds_ja_inode_flag *snapshot[JA_MAX_DEPTH];
+       struct cds_ja_inode_flag **snapshot_ptr[JA_MAX_DEPTH];
+       uint8_t snapshot_n[JA_MAX_DEPTH];
+       struct cds_ja_inode_flag *node_flag;
+       struct cds_ja_inode_flag **prev_node_flag_ptr;
+       int nr_snapshot;
+       int ret;
+
+       if (caa_unlikely(key > ja->key_max))
+               return -EINVAL;
+       tree_depth = ja->tree_depth;
+
+retry:
+       nr_snapshot = 0;
+       dbg_printf("cds_ja_del attempt: key %" PRIu64 ", node %p\n",
+               key, node);
+
+       /* snapshot for level 0 is only for shadow node lookup */
+       snapshot_n[0] = 0;
+       snapshot_n[1] = 0;
+       snapshot_ptr[nr_snapshot] = NULL;
+       snapshot[nr_snapshot++] = (struct cds_ja_inode_flag *) &ja->root;
+       node_flag = rcu_dereference(ja->root);
+       prev_node_flag_ptr = &ja->root;
+
+       /* Iterate on all internal levels */
+       for (i = 1; i < tree_depth; i++) {
+               uint8_t iter_key;
+
+               dbg_printf("cds_ja_del iter node_flag %p\n",
+                               node_flag);
+               if (!ja_node_ptr(node_flag)) {
+                       return -ENOENT;
+               }
+               iter_key = (uint8_t) (key >> (JA_BITS_PER_BYTE * (tree_depth - i - 1)));
+               snapshot_n[nr_snapshot + 1] = iter_key;
+               snapshot_ptr[nr_snapshot] = prev_node_flag_ptr;
+               snapshot[nr_snapshot++] = node_flag;
+               node_flag = ja_node_get_nth(node_flag,
+                       &prev_node_flag_ptr,
+                       NULL,
+                       iter_key);
+               dbg_printf("cds_ja_del iter key lookup %u finds node_flag %p, prev_node_flag_ptr %p\n",
+                               (unsigned int) iter_key, node_flag,
+                               prev_node_flag_ptr);
+       }
+
+       /*
+        * We reached bottom of tree, try to find the node we are trying
+        * to remove. Fail if we cannot find it.
+        */
+       if (!ja_node_ptr(node_flag)) {
+               dbg_printf("cds_ja_del: no node found for key %" PRIu64 "\n",
+                               key);
+               return -ENOENT;
+       } else {
+               struct cds_hlist_head hlist_head;
+               struct cds_hlist_node *hlist_node;
+               struct cds_ja_node *entry, *match = NULL;
+               int count = 0;
+
+               hlist_head.next =
+                       (struct cds_hlist_node *) ja_node_ptr(node_flag);
+               cds_hlist_for_each_entry_rcu(entry,
+                               hlist_node,
+                               &hlist_head,
+                               list) {
+                       dbg_printf("cds_ja_del: compare %p with entry %p\n", node, entry);
+                       if (entry == node)
+                               match = entry;
+                       count++;
+               }
+               if (!match) {
+                       dbg_printf("cds_ja_del: no node match for node %p key %" PRIu64 "\n", node, key);
+                       return -ENOENT;
+               }
+               assert(count > 0);
+               if (count == 1) {
+                       /*
+                        * Removing last of duplicates. Last snapshot
+                        * does not have a shadow node (external leafs).
+                        */
+                       snapshot_ptr[nr_snapshot] = prev_node_flag_ptr;
+                       snapshot[nr_snapshot++] = node_flag;
+                       ret = ja_detach_node(ja, snapshot, snapshot_ptr,
+                                       snapshot_n, nr_snapshot, key, node);
+               } else {
+                       ret = ja_unchain_node(ja, snapshot[nr_snapshot - 1],
+                               &hlist_head, match);
+               }
+       }
+       /*
+        * Explanation of -ENOENT handling: caused by concurrent delete
+        * between RCU lookup and actual removal. Need to re-do the
+        * lookup and removal attempt.
+        */
+       if (ret == -EAGAIN || ret == -ENOENT)
+               goto retry;
+       return ret;
+}
+
 struct cds_ja *_cds_ja_new(unsigned int key_bits,
                const struct rcu_flavor_struct *flavor)
 {
@@ -1002,13 +1476,13 @@ struct cds_ja *_cds_ja_new(unsigned int key_bits,
 
        switch (key_bits) {
        case 8:
-               ja->key_max = UINT8_MAX;
-               break;
        case 16:
-               ja->key_max = UINT16_MAX;
-               break;
+       case 24:
        case 32:
-               ja->key_max = UINT32_MAX;
+       case 40:
+       case 48:
+       case 56:
+               ja->key_max = (1ULL << key_bits) - 1;
                break;
        case 64:
                ja->key_max = UINT64_MAX;
@@ -1019,7 +1493,7 @@ struct cds_ja *_cds_ja_new(unsigned int key_bits,
 
        /* ja->root is NULL */
        /* tree_depth 0 is for pointer to root node */
-       ja->tree_depth = (key_bits >> JA_BITS_PER_BYTE) + 1;
+       ja->tree_depth = (key_bits >> JA_LOG2_BITS_PER_BYTE) + 1;
        assert(ja->tree_depth <= JA_MAX_DEPTH);
        ja->ht = rcuja_create_ht(flavor);
        if (!ja->ht)
@@ -1029,12 +1503,13 @@ struct cds_ja *_cds_ja_new(unsigned int key_bits,
         * Note: we should not free this node until judy array destroy.
         */
        root_shadow_node = rcuja_shadow_set(ja->ht,
-                       ja_node_ptr((struct cds_ja_inode_flag *) &ja->root),
-                       NULL);
+                       (struct cds_ja_inode_flag *) &ja->root,
+                       NULL, ja);
        if (!root_shadow_node) {
                ret = -ENOMEM;
                goto ht_node_error;
        }
+       root_shadow_node->level = 0;
 
        return ja;
 
@@ -1048,16 +1523,119 @@ ja_error:
        return NULL;
 }
 
+/*
+ * Called from RCU read-side CS.
+ */
+__attribute__((visibility("protected")))
+void rcuja_free_all_children(struct cds_ja_shadow_node *shadow_node,
+               struct cds_ja_inode_flag *node_flag,
+               void (*free_node_cb)(struct rcu_head *head))
+{
+       const struct rcu_flavor_struct *flavor;
+       unsigned int type_index;
+       struct cds_ja_inode *node;
+       const struct cds_ja_type *type;
+
+       flavor = cds_lfht_rcu_flavor(shadow_node->ja->ht);
+       node = ja_node_ptr(node_flag);
+       assert(node != NULL);
+       type_index = ja_node_type(node_flag);
+       type = &ja_types[type_index];
+
+       switch (type->type_class) {
+       case RCU_JA_LINEAR:
+       {
+               uint8_t nr_child =
+                       ja_linear_node_get_nr_child(type, node);
+               unsigned int i;
+
+               for (i = 0; i < nr_child; i++) {
+                       struct cds_ja_inode_flag *iter;
+                       struct cds_hlist_head head;
+                       struct cds_ja_node *entry;
+                       struct cds_hlist_node *pos;
+                       uint8_t v;
+
+                       ja_linear_node_get_ith_pos(type, node, i, &v, &iter);
+                       if (!iter)
+                               continue;
+                       head.next = (struct cds_hlist_node *) iter;
+                       cds_hlist_for_each_entry_rcu(entry, pos, &head, list) {
+                               flavor->update_call_rcu(&entry->head, free_node_cb);
+                       }
+               }
+               break;
+       }
+       case RCU_JA_POOL:
+       {
+               unsigned int pool_nr;
+
+               for (pool_nr = 0; pool_nr < (1U << type->nr_pool_order); pool_nr++) {
+                       struct cds_ja_inode *pool =
+                               ja_pool_node_get_ith_pool(type, node, pool_nr);
+                       uint8_t nr_child =
+                               ja_linear_node_get_nr_child(type, pool);
+                       unsigned int j;
+
+                       for (j = 0; j < nr_child; j++) {
+                               struct cds_ja_inode_flag *iter;
+                               struct cds_hlist_head head;
+                               struct cds_ja_node *entry;
+                               struct cds_hlist_node *pos;
+                               uint8_t v;
+
+                               ja_linear_node_get_ith_pos(type, node, j, &v, &iter);
+                               if (!iter)
+                                       continue;
+                               head.next = (struct cds_hlist_node *) iter;
+                               cds_hlist_for_each_entry_rcu(entry, pos, &head, list) {
+                                       flavor->update_call_rcu(&entry->head, free_node_cb);
+                               }
+                       }
+               }
+               break;
+       }
+       case RCU_JA_NULL:
+               break;
+       case RCU_JA_PIGEON:
+       {
+               uint8_t nr_child;
+               unsigned int i;
+
+               nr_child = shadow_node->nr_child;
+               for (i = 0; i < nr_child; i++) {
+                       struct cds_ja_inode_flag *iter;
+                       struct cds_hlist_head head;
+                       struct cds_ja_node *entry;
+                       struct cds_hlist_node *pos;
+
+                       iter = ja_pigeon_node_get_ith_pos(type, node, i);
+                       if (!iter)
+                               continue;
+                       head.next = (struct cds_hlist_node *) iter;
+                       cds_hlist_for_each_entry_rcu(entry, pos, &head, list) {
+                               flavor->update_call_rcu(&entry->head, free_node_cb);
+                       }
+               }
+               break;
+       }
+       default:
+               assert(0);
+       }
+}
+
 /*
  * There should be no more concurrent add to the judy array while it is
  * being destroyed (ensured by the caller).
  */
-int cds_ja_destroy(struct cds_ja *ja)
+int cds_ja_destroy(struct cds_ja *ja,
+               void (*free_node_cb)(struct rcu_head *head))
 {
        int ret;
 
        rcuja_shadow_prune(ja->ht,
-               RCUJA_SHADOW_CLEAR_FREE_NODE | RCUJA_SHADOW_CLEAR_FREE_LOCK);
+               RCUJA_SHADOW_CLEAR_FREE_NODE | RCUJA_SHADOW_CLEAR_FREE_LOCK,
+               free_node_cb);
        ret = rcuja_delete_ht(ja->ht);
        if (ret)
                return ret;
This page took 0.039612 seconds and 4 git commands to generate.