// BPlusTree.h
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2008

@import "cxPersistStore.h"
@import "IPersistable.h"
@import "IPersistStore.h"
@import "pref.h"
@import "IPrefVisitor.h"
#include "Ceda/cxUtils/CedaAssert.h"
#include "Ceda/cxUtils/Tracer.h"
#include "Ceda/cxUtils/MakeIterator.h"
#include <utility>
#include <iterator>
#include <string.h>

/*
B+Tree
------

A B+Tree is a persistent map from Key to Payload that can grow very large, and minimises the number
of disk reads during a search by employing a very large fan-out.

                      B+Tree
                    /   |     \          
                   /    |      \
                  /    Node     \
                 /    / | \      \
                /          ...    \
               /            Node   \
              /            / | \    \
             /              ...      \ 
            /                         \
          Page  -->-- Page  ...  -->-- Page
                --<--           --<--


There are two types of node: leaf nodes and internal nodes.  A node never needs to change its type.
Unlike many implementations we don't assume that leaf and internal nodes are "structurally compatible".
This allows leaf nodes to directly store the payload irrespective of its size.  It also allows
internal nodes and leaf nodes to have different fan-outs.  A downside is that the binary code
is larger.

We choose to not store parent pointers in the nodes.  Note that recursive algorithms can pass the 
parent pointer if required.

Typically the recursive functions pass the height of the current node.  This decrements in each
recursive call from parent to child.  When the height reaches zero it can be assumed that a leaf
node has been reached.  Therefore there is no need for leaf and internal nodes to store some common
inherited state in order to distinguish between them.
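
As an illustration of the height convention, here is a minimal sketch (it is not the code generated
by the macro).  ToyLeaf, ToyInternal and ToyFind are hypothetical names, and the fan-out is fixed at
two keys for brevity.  Child pointers are untyped, and only the height passed down the recursion says
how to interpret them.

```cpp
#include <cassert>

// Hypothetical toy node types: they share no base class and no type tag.
struct ToyLeaf
{
    int numKeys;
    int keys[2];
    int payloads[2];
};

struct ToyInternal
{
    void* first;        // subtree with keys < keys[0]
    int numKeys;
    int keys[2];
    void* children[2];  // children[i] is the subtree with keys >= keys[i]
};

// height == 0 means 'node' is a ToyLeaf, otherwise it is a ToyInternal.
// Returns the address of the payload, or nullptr if the key is absent.
inline int* ToyFind(void* node, int height, int key)
{
    if (height == 0)
    {
        ToyLeaf* leaf = static_cast<ToyLeaf*>(node);
        for (int i = 0; i < leaf->numKeys; ++i)
        {
            if (leaf->keys[i] == key) return &leaf->payloads[i];
        }
        return nullptr;
    }
    ToyInternal* in = static_cast<ToyInternal*>(node);
    void* child = in->first;
    for (int i = 0; i < in->numKeys && in->keys[i] <= key; ++i)
    {
        child = in->children[i];
    }
    return ToyFind(child, height - 1, key);
}
```

The caller supplies the height of the root, so a tree consisting of a single leaf is searched with
height 0.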

Apart from the root, the utilisation of a node varies between 50% and 100%.  We choose to always 
allocate full sized arrays in memory to avoid the CPU overheads of repeated memory allocations as 
the number of keys in a node changes.

Weak pointers
-------------

Let's assume that the tree structure (i.e. strong pointers) is dictated by the internal nodes.  Then
the weak pointers are

    1.  ptrs to the first and last page in the B+Tree.  These are "downwards" weak pointers
    2.  ptrs to the prev and next page in each page.  These are "sideways" weak pointers.

Persistence
-----------

Internal nodes and leaf nodes require a unique class name in order to support dynamic creation when 
they are loaded from disk.  We use the macro preprocessor for building unique names.

Relationship to PSpaces
-----------------------

A B+Tree is assumed to reside in at most one PSpace, and therefore it is protected by a single 
shared read / exclusive write mutex.

Therefore (for example) the search down through the B+Tree need not worry about gaining locks
during the downwards recurse.

Concurrency
-----------

Ceda assumes that high concurrency can be obtained by using small, fine-grained mutative 
transactions that only need to briefly hold an exclusive write lock on the entire PSpace.
The time to gain an exclusive write lock is only about 1us.  Non-algorithmic mutative 
transactions can be performed in only a few microseconds if there is no need to block on
I/O.  Write caching solves part of the problem - an object can be marked as dirty but not
written to disk as part of the transaction, because we relax the durability requirement 
(cf ACID requirements for a DBMS).

Consider a BTree with a billion records, and we have a number of "writer" threads that 
independently want to add thousands of entries to the BTree as quickly as possible.  A naive
approach won't achieve a good transaction throughput:  the problem is that in practice, when a 
writer thread has gained an exclusive lock it will often block on I/O in order to read the leaf
node from disk (because it is not currently resident in memory).  Note that we can't depend on keys
being clustered.

Our solution is for the BTree to employ delta-insertion (dI) and delta-deletion (dD) maps.
Each is implemented with an STL map (and hence employs a red-black tree).  In practice these
delta maps will accumulate a few thousand entries.  The idea is to allow an insert or delete
on a B-Tree to be postponed, by merely adding an entry to either dI or dD.  Note that dI
and dD persist on disk and tend to remain resident in memory.   The look up function on a
BTree needs to check the dI and dD sets before attempting a recursive descent through the
actual BTree itself.

It is assumed that a separate worker thread is responsible for processing the dI and dD sets
so they don't become too big.  This is done so as to avoid blocking on I/O when an exclusive
write lock is held.
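
The following sketch shows one way the delta maps could interact with look up.  It is an
illustration under stated assumptions rather than the implementation: DeltaMapSketch and its members
are hypothetical names, a std::map stands in for the on-disk BTree, dD is modelled as a set of keys,
and the rule that a later insert cancels a pending delete (and vice versa) is an assumption.

```cpp
#include <cassert>
#include <map>
#include <set>

// Hypothetical stand-in: 'base' plays the role of the on-disk BTree.
struct DeltaMapSketch
{
    std::map<int,int> base;   // slow structure (may block on I/O in reality)
    std::map<int,int> dI;     // pending insertions
    std::set<int> dD;         // pending deletions

    void Insert(int key, int payload)
    {
        dD.erase(key);        // assumed rule: an insert cancels a pending delete
        dI[key] = payload;
    }

    void Delete(int key)
    {
        dI.erase(key);        // assumed rule: a delete cancels a pending insert
        dD.insert(key);
    }

    // Consult the deltas before falling back to the slow structure.
    const int* Find(int key) const
    {
        if (dD.count(key)) return nullptr;
        std::map<int,int>::const_iterator i = dI.find(key);
        if (i != dI.end()) return &i->second;
        i = base.find(key);
        return i != base.end() ? &i->second : nullptr;
    }

    // What the worker thread would do: apply the deltas in key order.
    void Flush()
    {
        for (std::set<int>::const_iterator d = dD.begin(); d != dD.end(); ++d)
        {
            base.erase(*d);
        }
        for (std::map<int,int>::const_iterator i = dI.begin(); i != dI.end(); ++i)
        {
            base[i->first] = i->second;
        }
        dD.clear();
        dI.clear();
    }
};
```

Insert and Delete never touch base, so under these assumptions neither can block on I/O.  Only
Flush does, and it visits keys in sorted order.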

Let's consider the asynchronous processing of dI.  Note firstly that an STL map implicitly sorts
the entries by keys.  This is rather convenient because it means that as we iterate through the map
and cause BTree nodes to be loaded from disk we will tend to find the required nodes have already
been loaded.  We may even be fortunate enough to process a batch of keys that all need to be
inserted into the same leaf node.

Consider that the worker thread selects a "batch" of keys from dI that it intends to process.
This corresponds to a range [k1,k2].  Then it opens a shared read lock and loads the relevant BTree
nodes into memory.  This is achieved by recursing through the BTree as if there is an intention
to read all the leaf pages that overlap [k1,k2].  During this recurse, the nodes are touched
and therefore shouldn't be evicted in the near future.

Next the worker thread gains an exclusive write lock and actually inserts all the entries in the
batch.  Assuming that the nodes previously touched during the shared read lock are still resident
in memory, it will be possible to process the batch without blocking on I/O.

Assumptions on the keys
-----------------------

The type 'Key' must support all the following

*   A default constructor
*   A copy constructor
*   An assignment operator
*   A global function that can compare two keys and returns -1,0,1 in a similar manner
    to strcmp().
*   operator<< to an Archive
*   operator>> from an Archive
*   operator<< to an xostream

Also, it is assumed that keys don't contain any pointers to IObjects that need to be
visited, and in particular there are no prefs that need to be visited.
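
For illustration, a key type and comparison function meeting the first four requirements might look
like the sketch below.  PersonKey and ComparePersonKeys are hypothetical names.  The Archive and
xostream operators are omitted because their interfaces are not shown in this file.

```cpp
#include <cassert>
#include <string>

// Hypothetical key type: a (surname, id) pair ordered lexicographically.
// The copy constructor and assignment operator are compiler generated.
struct PersonKey
{
    PersonKey() : id(0) {}
    PersonKey(const std::string& s, int i) : surname(s), id(i) {}

    std::string surname;
    int id;
};

// Three-way comparison in the manner of strcmp(), normalised so that the
// result is exactly -1, 0 or 1.
inline int ComparePersonKeys(const PersonKey& a, const PersonKey& b)
{
    int c = a.surname.compare(b.surname);
    if (c != 0) return c < 0 ? -1 : 1;
    if (a.id != b.id) return a.id < b.id ? -1 : 1;
    return 0;
}
```

The name of such a function is what would be passed as the _CompareKeysFn argument of the macro.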

Assumptions on the payloads
---------------------------

*   A default constructor
*   A copy constructor
*   An assignment operator
*   operator<< to an Archive
*   operator>> from an Archive
*   operator<< to an xostream

If payloads may contain prefs then _bPayLoadMayHavePrefs should be true.  In that case
operator<< to IObjectVisitor and operator<< to IPrefVisitor must be defined.

Search that inserts entry if not found
--------------------------------------

We require a function that combines insertion and search.

    _Payload* Find(const _Key& key)
    
This creates a new entry if one doesn't exist already.

Multimaps and multisets
-----------------------

The macro takes a boolean flag for whether multiple entries with the same key are permitted.
This affects both leaf nodes and internal nodes.

    Search
    ------

    The search for a given key now finds a range of payload entries.  This leads to the concept of
    lower and upper bounds for the given key.

    For performance we want to return both bounds in the same call (rather than traverse
    the entire tree twice).  In fact, it seems best to directly model the concept of a range.

    Currently rFind(key) returns a pointer to the _Payload or nullptr if not found.  We instead want
    to return a pair of iterators.

    Alternative approach:  Consider instead that we only return the iterator corresponding to the
    lower bound, and we assume we subsequently iterate until we discover that the key has changed
    (which may be immediately!)

    A client writes code like this:

        BTree::iterator i = btree.rFind(k);
        while(i != btree.end() && i->key == k)
        {
            < process i >
            ++i;
        }
    
    Insertion
    ---------
    
    The splitting of nodes when they overflow is based on the actual number of elements in the 
    node, and we don't want that to change.  So a tree remains balanced even though there are many 
    repeated keys - even in internal nodes.
    
    We favour insertion on the right of elements with the same key, so that we iterate through them
    in the order they were inserted.
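
The "insert on the right" rule can be sketched on a sorted array, which is roughly what a leaf node
holds.  This is an illustration only: Entry, KeyLess and InsertOnRight are hypothetical names, and a
std::vector stands in for the fixed-size array of a node.  The insertion point is the upper bound of
the key, i.e. just past any entries that already carry it.

```cpp
#include <algorithm>
#include <cassert>
#include <utility>
#include <vector>

typedef std::pair<int,char> Entry;   // (key, payload)

// Comparison used by std::upper_bound: is 'key' ordered before entry 'e'?
inline bool KeyLess(int key, const Entry& e) { return key < e.first; }

// Insert after any existing entries with the same key, so that equal keys
// are later visited in the order they were inserted.
inline void InsertOnRight(std::vector<Entry>& sorted, int key, char payload)
{
    std::vector<Entry>::iterator pos =
        std::upper_bound(sorted.begin(), sorted.end(), key, KeyLess);
    sorted.insert(pos, Entry(key, payload));
}
```

Using the lower bound instead would place each new entry to the left of its duplicates, and
iteration would then see equal keys in reverse insertion order.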
    
    Deletion
    --------
    
    In the case of a multimap<string,OID> we want to delete the entry with given OID!  This means 
    we need to scan through the range of elements with the given key to find the entry with the
    given OID!
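
The scan described above can be sketched with a std::multimap, which has the same shape of problem.
NameToOid and EraseEntry are hypothetical names and an int stands in for an OID.  The B+Tree would
need an equivalent of equal_range() over its leaf pages.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

typedef std::multimap<std::string,int> NameToOid;  // int stands in for an OID

// Erase the single entry (key, oid) if present.  Returns true if erased.
inline bool EraseEntry(NameToOid& m, const std::string& key, int oid)
{
    std::pair<NameToOid::iterator, NameToOid::iterator> r = m.equal_range(key);
    for (NameToOid::iterator i = r.first; i != r.second; ++i)
    {
        if (i->second == oid)
        {
            m.erase(i);
            return true;
        }
    }
    return false;
}
```

The cost is linear in the number of entries sharing the key, which is acceptable only if keys are
not heavily duplicated.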

Memory leak issue
-----------------

The following code leaks 24 bytes on x86, 48 bytes on x64:

        typedef std::map<int,int> MAP;
        MAP m;
        new (&m) MAP();

It seems that the std::map ctor performs a memory allocation, so constructing a second time over
an object that has not been destructed loses the first allocation.

As a result, construction and destruction must be accounted for consistently under our move
semantics, and this includes, for example, the array of (key,payload) pairs in each leaf node.
We must assume that only the first nk entries have actually been constructed, so only these
shall be destructed.
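
A minimal sketch of that bookkeeping follows.  RawArraySketch and Counted are hypothetical names,
and the alignas specifier is an addition of this sketch.  The array lives in raw storage, elements
are constructed with placement new as the count grows, and exactly the constructed prefix is
destructed.

```cpp
#include <cassert>
#include <cstddef>
#include <new>

// Hypothetical element type that counts live instances.
struct Counted
{
    Counted() { ++live; }
    ~Counted() { --live; }
    static int live;
};
int Counted::live = 0;

// Fixed-capacity array in raw storage.  Only the first m_num slots hold
// constructed objects, so only those are ever destructed.
template <typename T, std::size_t N>
class RawArraySketch
{
public:
    RawArraySketch() : m_num(0) {}
    ~RawArraySketch() { Resize(0); }

    T& operator[](std::size_t i) { return *Slot(i); }
    std::size_t size() const { return m_num; }

    // Default construct extra elements, or destruct surplus ones.
    void Resize(std::size_t n)
    {
        assert(n <= N);
        for (; m_num < n; ++m_num) new (Slot(m_num)) T();
        while (m_num > n) Slot(--m_num)->~T();
    }

private:
    RawArraySketch(const RawArraySketch&);             // not copyable
    RawArraySketch& operator=(const RawArraySketch&);

    T* Slot(std::size_t i) { return reinterpret_cast<T*>(m_storage) + i; }

    std::size_t m_num;
    alignas(T) unsigned char m_storage[N * sizeof(T)];
};
```

With T = std::map<int,int> this avoids both the double construction shown above and the
destruction of slots that were never constructed.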

Todo
----

*   Allow for deletion given an iterator
*   Allow for deletion of a range using a pair of iterators.
*   Allow for batched insertion and deletion
*   Allow for caching at each node until a batch is there to be done
*   Allow for inserts/deletes while the B+Tree is still transient (i.e. making it persistent
    can be delayed).
    
*   Allow for structured keys that reuse common prefixes.
*/

namespace ceda
{

template <typename T>
void DestructArrayElements(T* p, ssize_t n)
{
    for (ceda::ssize_t i=0 ; i < n ; ++i)
    {
        p->~T();
        ++p;
    }
}

template <typename T>
void ConstructArrayElements(T* p, ssize_t n)
{
    for (ceda::ssize_t i=0 ; i < n ; ++i)
    {
        new(p) T();
        ++p;
    }
}

template <typename T>
void ResizeArrayElements(T* p, ssize_t oldNum, ssize_t newNum)
{
    if (newNum > oldNum)
    {
        // Default construct the extra entries required
        ConstructArrayElements(p + oldNum, newNum - oldNum);
    }
    else
    {
        // Destruct the surplus entries no longer required
        DestructArrayElements(p + newNum, oldNum - newNum);
    }
}

/*
A macro used to generate a complete B+Tree implementation suitable for persistence

Arguments

    _bDefine,              Generate class definitions?
    
    _bImplement,           Generate class implementations?
    
    _bExport,              Use __declspec(dllexport) to export the class definition from the DLL
    
    _BPlusTree,            Name of the B+Tree class to be defined. Must be a valid C++ identifier
    
    _Key,                  Type for the keys
    
    _Payload,              Type for the payload
                           Use 'void' to indicate there is no payload
    
    _CompareKeysFn,        The name of a global function that compares two keys 
                           and returns -1,0,1 in the manner of strcmp()
                           
    _minInternalKeys,      Number of keys in internal nodes will be between 
                           _minInternalKeys and 2*_minInternalKeys 
                           Must be at least 2 (the delete code cannot handle 1)
                           
    _minLeafKeys,          Number of keys in leaf nodes will be between 
                           _minLeafKeys and 2*_minLeafKeys 
                           Must be at least 2 (the delete code cannot handle 1)
                           
    _bPersist,             Support persistence
    
    _bKeyMayHavePrefs,      Can _Key instances contain pref members?
    
    _bDeclareKeysReachable, Should prefs within a _Key be declared reachable when the key is 
                           inserted?
                           
    _bPayLoadMayHavePrefs,  Can _Payload instances contain pref members?
    
    _bDeclarePayloadsReachable, Should prefs within a _Payload be declared reachable when the 
                           _Payload is inserted?
    
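    _bPayloadsOrdered,      Generate operator<, <=, > and >= for the B+Tree?
                           These are always generated when there is no payload
    
    _bWriteToxostream       Support writing to an xostream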
*/

@def mImplementBplusTree(
    bool _bDefine,bool _bImplement, bool _bExport,
    _BPlusTree,_Key,_Payload,_CompareKeysFn,
    int _minInternalKeys,int _minLeafKeys,
    bool _bPersist,
    bool _bKeyMayHavePrefs,bool _bDeclareKeysReachable,
    bool _bPayLoadMayHavePrefs,bool _bDeclarePayloadsReachable,
    bool _bPayloadsOrdered,
    bool _bWriteToxostream) =
{
    // Is tracing enabled (for debugging purposes)?
    @def bool bTraceDelete = false
    
    @def bool bHavePayload = (@str(_Payload) != "void")
    
    @def Node = void
    @def InternalKeyEntry = _BPlusTree@@@quote(InternalKeyEntry)
    @def InternalNode = _BPlusTree@@@quote(InternalNode)
    @def LeafKeyEntry = _BPlusTree@@@quote(LeafKeyEntry)
    @def LeafNode = _BPlusTree@@@quote(LeafNode)

    // Due to limitations of the current implementation of the delete code, we cannot support
    // the case where _minInternalKeys = 1 or _minLeafKeys = 1.
    // The problem is that this case allows for the number of key entries in a node to transition
    // to zero, and yet the code in various places assumes there is a 0th entry.
    // MergeInternalRightToLeft and BalanceInternalLeftToRight require 
    // right->m_numKeyEntrys > 0, and RecursiveDelete has a statement that initialises
    // oldkey from leaf->GetEntry(0).first after the number of keys has dropped to zero.
    // This limitation became apparent after being careful to destruct elements that were no longer
    // in use.
    // IDEA: This limitation will be lifted if we avoid arrays of elements that inevitably destruct
    // all elements.  It would be better to only destruct the first m_numKeyEntrys entries.
    @assert(_minInternalKeys > 1)
    @assert(_minLeafKeys > 1)

    // Let k be the number of keys stored in a given internal node that is not the root
    // Then we require MIN_NUM_INTERNAL_KEY_ENTRYS <= k <= MAX_NUM_INTERNAL_KEY_ENTRYS
    //
    // Let c be the number of child nodes under a given internal node that is not the root
    // Then c = k+1 
    // and we require MIN_NUM_INTERNAL_KEY_ENTRYS+1 <= c <= MAX_NUM_INTERNAL_KEY_ENTRYS+1
    //
    // For an internal node that is the root, we require
    //      1 <= k <= MAX_NUM_INTERNAL_KEY_ENTRYS
    //      2 <= c <= MAX_NUM_INTERNAL_KEY_ENTRYS+1
    @assert(_minInternalKeys > 0)
    @def int MIN_NUM_INTERNAL_KEY_ENTRYS = _minInternalKeys
    @def int MAX_NUM_INTERNAL_KEY_ENTRYS = _minInternalKeys*2

    // Let k be the number of (key,payload) pairs stored in a given leaf node that is not the root
    // Then we require MIN_NUM_LEAF_KEY_ENTRYS <= k <= MAX_NUM_LEAF_KEY_ENTRYS
    @assert(_minLeafKeys > 0)
    @def int MIN_NUM_LEAF_KEY_ENTRYS = _minLeafKeys
    @def int MAX_NUM_LEAF_KEY_ENTRYS = _minLeafKeys*2
    
    @def mAlloc(X) = @if (_bPersist) {$new X} @else {new X}
    @def mPointer(X) = @if (_bPersist) {ceda::cref<X>} @else {X*}
    @def mGetPtr(X) = @if (_bPersist) {X.Get()} @else {X}
    @def mClassDec(X) = @if (_bPersist) {$class @if(_bExport){+} X isa ceda::IPersistable} @else {class X}
    
    @def _mValueType = @if (bHavePayload) {std::pair<const _Key, _Payload >} @else {_Key} 
    
    @if (_bDefine)
    {
        ///////////////////////////////////////////////////////////////////////////////////////////////////
        // InternalNode

        struct InternalKeyEntry
        {
            @if (!_bPersist)
            {
                InternalKeyEntry() : m_ptr(nullptr) {}
            }
            
            void swap(InternalKeyEntry& rhs)
            {
                ceda::memswap(this, &rhs, sizeof(InternalKeyEntry));
            }
        
            _Key m_key;                    
            mPointer(Node) m_ptr;    // Subtree with keys >= m_key
        };

        mClassDec(InternalNode)
        {
        public:
            InternalNode() : @if (!_bPersist) {m_ptr(nullptr),} m_numKeyEntrys(0) {}
            ~InternalNode();
            
            @if (_bPersist)
            {
                // Implementation of IObject
                void VisitObjects(ceda::IObjectVisitor& v) const;

                // Implementation of IPersistable
                void Serialise(ceda::Archive& ar) const;
                void Deserialise(ceda::InputArchive& ar);
                void VisitPrefs(ceda::IPrefVisitor& v) const;
            }

            InternalKeyEntry& GetEntry(ceda::ssize_t i) { return *((InternalKeyEntry*) m_entrys + i); }
            const InternalKeyEntry& GetEntry(ceda::ssize_t i) const { return *((const InternalKeyEntry*) m_entrys + i); }
            
            // Find the index of the largest key less than or equal to the given key, or else -1
            //
            //    K(0)  K(1) ...  K(i-1)  K(i)    K(i+1)   ....    K(n-1)
            //                                 ^
            //                                 |
            //                                key
            // Find the child node according to the key.  This is infallible because an internal node must 
            // contain at least one key entry (and two child nodes), and the partitioning based on key is
            // well defined.
            // Returns the node and its index position.  -1 means m_ptr was returned.
            std::pair<Node*,ceda::ssize_t> FindChildGivenKey2(const _Key& key);

            // Note that factoring out the code into FindChildGivenKey2() actually causes the search 
            // function to be about 10% slower for int-->int map with no I/O.
            Node* FindChildGivenKey(const _Key& key) { return FindChildGivenKey2(key).first; }

            ceda::ssize_t FindChildIndexGivenKey(const _Key& key) { return FindChildGivenKey2(key).second; }

            ///////////////////// State //////////////////////
            
            mPointer(Node) m_ptr;    // Subtree with keys < GetEntry(0).m_key
            ceda::ssize_t m_numKeyEntrys;
            ceda::octet_t m_entrys[MAX_NUM_INTERNAL_KEY_ENTRYS * sizeof(InternalKeyEntry)];
            //InternalKeyEntry m_entrys[MAX_NUM_INTERNAL_KEY_ENTRYS];
        };

        ///////////////////////////////////////////////////////////////////////////////////////////////////
        // LeafNode

        @if (bHavePayload)
        {
            typedef std::pair<_Key, _Payload > LeafKeyEntry;
        }
        @else
        {
            struct LeafKeyEntry
            {
                LeafKeyEntry() {}
                LeafKeyEntry(const _Key& key) : first(key) {}
                
                bool operator==(const LeafKeyEntry& rhs) const { return first == rhs.first; }
                bool operator!=(const LeafKeyEntry& rhs) const { return !operator==(rhs); }
                                        
                _Key first;
            };
        }
        
        mClassDec(LeafNode)
        {
        public:
            LeafNode() : m_numKeyEntrys(0) 
            {
                @if (!_bPersist)
                {
                    m_prev = nullptr;
                    m_next = nullptr;    
                }
            }
            ~LeafNode();

            @if (_bPersist)
            {
                // Implementation of IObject
                void VisitObjects(ceda::IObjectVisitor& v) const;

                // Implementation of IPersistable
                void Serialise(ceda::Archive& ar) const;
                void Deserialise(ceda::InputArchive& ar);
                void VisitPrefs(ceda::IPrefVisitor& v) const;
            }
            
            LeafKeyEntry& GetEntry(ceda::ssize_t i) { return *((LeafKeyEntry*) m_entrys + i); }
            const LeafKeyEntry& GetEntry(ceda::ssize_t i) const { return *((const LeafKeyEntry*) m_entrys + i); }
            
            ////////////////// State //////////////////////
            
            // Leaf nodes form a doubly linked list to allow forwards and reverse iteration in sorted order
            mPointer(LeafNode) m_prev;
            mPointer(LeafNode) m_next;
            
            ceda::ssize_t m_numKeyEntrys;
            
            ceda::octet_t m_entrys[MAX_NUM_LEAF_KEY_ENTRYS * sizeof(LeafKeyEntry)];
            //LeafKeyEntry m_entrys[MAX_NUM_LEAF_KEY_ENTRYS];
        };

        ///////////////////////////////////////////////////////////////////////////////////////////////////
        // _BPlusTree

        mClassDec(_BPlusTree)
        {
        private:
            // Define both iterators using a given policy
            class @if(_bExport){@api} IteratorPolicy
            {
            public:
                IteratorPolicy() : m_leaf(nullptr), m_offset(0) {}
                IteratorPolicy(LeafNode* leaf, ceda::ssize_t offset) : 
                    m_leaf(leaf), 
                    m_offset(offset) {}
            
                typedef std::bidirectional_iterator_tag iterator_category;
                typedef _mValueType value_type;
                typedef std::ptrdiff_t difference_type;      // just to keep compiler happy
                typedef value_type* pointer;
                typedef const value_type* const_pointer;
                typedef value_type& reference;
                typedef const value_type& const_reference;

                // Returns true if the iterator is pointing at an actual element
                // (i.e. not at one before first or one after the last)
                bool can_dereference() const
                {
                    return m_leaf != nullptr && 0 <= m_offset && m_offset < m_leaf->m_numKeyEntrys;
                }
                
                bool operator==(const IteratorPolicy& rhs) const
                {
                    return m_offset == rhs.m_offset && m_leaf == rhs.m_leaf; 
                }
                void next();
                void prev();
                
                @if (bHavePayload)
                {
                    value_type& deref() const;
                }
                const value_type& const_deref() const;

            private:
                /*
                Rules for how m_leaf, m_offset are defined:    

                    if (iterator points at an actual element of the B+Tree)
                       [i.e. it is valid to call operator*()]
                    {    
                        m_leaf = leaf node containing the element
                        m_offset = index of the element in m_leaf
                        
                        Note that 0 <= m_offset < m_leaf->m_numKeyEntrys
                    }    
                    else if (iterator corresponds to one before the first element    
                             of the B+Tree (i.e. rend()))
                    {        
                        if (B+Tree is empty)
                        {
                            m_leaf = nullptr
                            m_offset = -1
                        }
                        else
                        {
                            m_leaf = first leaf node
                            m_offset = -1
                        }
                    }
                    else (iterator corresponds to one after the last element
                          of the B+Tree (i.e. end()))
                    {      
                        if (B+Tree is empty)
                        {
                            m_leaf = nullptr
                            m_offset = 0
                        }
                        else
                        {
                            m_leaf = last leaf node
                            m_offset = m_leaf->m_numKeyEntrys
                        }
                    }
                        
                This approach allows for the bidirectional iterator to represent
                and distinguish between "one before first" and "one after last". 
                Furthermore if the B+Tree is non-empty then:
                
                    a) an iterator at "one before first" can be incremented so it 
                       points at first; and 
                       
                    b) an iterator at "one after last" can be decremented so it 
                       points at last.
                */
                LeafNode* m_leaf; // Is nullptr if and only if the B+Tree is empty
                ceda::ssize_t m_offset;

                friend class _BPlusTree;
            };
        
        public:
            _BPlusTree();
            _BPlusTree(const _BPlusTree& rhs);
            _BPlusTree& operator=(const _BPlusTree& rhs);
            @if (!_bPersist)
            {
                ~_BPlusTree();
            }
            
            typedef _Key key_type;
            @if (bHavePayload) {typedef _Payload mapped_type;}
            typedef _mValueType value_type;
            typedef size_t size_type;
            typedef std::ptrdiff_t difference_type;
            typedef value_type* pointer;
            typedef const value_type* const_pointer;
            typedef value_type& reference;
            typedef const value_type& const_reference;

            bool operator==(const _BPlusTree& rhs) const;
            bool operator!=(const _BPlusTree& rhs) const { return !operator==(rhs); }

            @if (!bHavePayload || _bPayloadsOrdered)
            {
                bool operator<(const _BPlusTree& rhs) const;
                bool operator<=(const _BPlusTree& rhs) const { return !rhs.operator<(*this); }
                bool operator>(const _BPlusTree& rhs) const { return rhs.operator<(*this); }
                bool operator>=(const _BPlusTree& rhs) const { return !operator<(rhs); }
            }
                        
            ceda::ssize_t size() const { return m_size; }
            bool empty() const { return m_size == 0; }

            ///////////////////////////////////////////////////////////////////////////////////////
            // iterators
            
            @if (bHavePayload)
            {
                typedef ceda::make_iterator<IteratorPolicy> iterator;
            }
            @else
            {
                // It makes no sense to support mutative iterators on a set
                typedef ceda::make_const_iterator<IteratorPolicy> iterator;
            }
            iterator begin()
            {
                return iterator(IteratorPolicy(mGetPtr(m_firstLeaf),0));
            }
            iterator end()
            {
                if (m_height < 0)
                {
                    return iterator(IteratorPolicy(nullptr,0));
                }
                else
                {
                    cxAssert(m_lastLeaf);
                    return iterator(IteratorPolicy(mGetPtr(m_lastLeaf), m_lastLeaf->m_numKeyEntrys));
                }
            }

            typedef ceda::make_const_iterator<IteratorPolicy> const_iterator;
            const_iterator begin() const { return const_cast<_BPlusTree*>(this)->begin(); }
            const_iterator end() const { return const_cast<_BPlusTree*>(this)->end(); }
            
            typedef std::reverse_iterator<const_iterator> const_reverse_iterator;
            const_reverse_iterator rbegin() const { return const_reverse_iterator(end()); }
            const_reverse_iterator rend() const { return const_reverse_iterator(begin()); }

            typedef std::reverse_iterator<iterator> reverse_iterator;
            reverse_iterator rbegin() { return reverse_iterator(end()); }
            reverse_iterator rend() { return reverse_iterator(begin()); }

            ///////////////////////////////////////////////////////////////////////////////////////

            ceda::ssize_t count(const _Key& key) const
            {
                return rFind(key) ? 1 : 0;
            }
            
            const_iterator find(const _Key& key) const;
            
            // Note that find doesn't mark the node as dirty.  However the act of dereferencing the
            // returned iterator will do that.
            iterator find(const _Key& key) 
            { 
                const_iterator i = const_cast<const _BPlusTree*>(this)->find(key);
                return iterator(i);
                //return reinterpret_cast<iterator&>(i);
            }

            const_iterator lower_bound(const _Key& key) const;
            iterator lower_bound(const _Key& key) 
            { 
                const_iterator i = const_cast<const _BPlusTree*>(this)->lower_bound(key);
                return iterator(i);
                //return reinterpret_cast<iterator&>(i);
            }

            const_iterator upper_bound(const _Key& key) const;
            iterator upper_bound(const _Key& key) 
            { 
                const_iterator i = const_cast<const _BPlusTree*>(this)->upper_bound(key);
                return iterator(i);
                //return reinterpret_cast<iterator&>(i);
            }

            @if (bHavePayload) 
            {
                const _Payload& operator[](const _Key& key) const
                { 
                    const _Payload* p = rFind(key);
                    cxAssert(p);
                    return *p;
                }
                
                _Payload& operator[](const _Key& key)
                {
                    wFindRet ret;
                    wFind(ret,key,true);
                    cxAssert(ret.m_payload);
                    @if (_bPersist)
                    {
                        cxAssert(ret.m_containingNode);
                        MarkPersistentAsDirtyWithoutTrace(ret.m_containingNode);
                    }
                    return *ret.m_payload;
                }
            }

            // Returns number of elements erased (0 or 1)
            size_type erase(const _Key& key)
            {
                return Delete(key) ? 1 : 0;
            }
            void erase(iterator i)
            {
                // todo: inefficient.
                @if (bHavePayload) 
                {
                    erase(i->first);
                }
                @else
                {
                    erase(*i);
                }
            }
            //void erase(iterator first, iterator last);
            
            std::pair<iterator,bool> insert(const value_type& x);

            void clear();
            void swap(_BPlusTree& rhs);
            
            ///////////////////////////////////////////////////////////////////////////////////////
            // Unconventional API    

            @if (bHavePayload) 
            {
                static ceda::ssize_t GetPayloadOffset()
                {
                    return offsetof(LeafKeyEntry,second);
                }
            }
            
            @if (_bPersist)
            {
                // Implementation of IObject
                void VisitObjects(ceda::IObjectVisitor& v) const;

                // Implementation of ISerialisable
                void VisitPrefs(ceda::IPrefVisitor& v) const;
                void Serialise(ceda::Archive& ar) const;
                void Deserialise(ceda::InputArchive& ar);
            }

            // Provides the return value for wFind
            struct wFindRet
            {
                @if (bHavePayload) 
                {
                    @if (_bPersist)
                    {
                        // The B+Tree leaf node that contains the payload.  Should be marked as dirty
                        // if the payload is subsequently modified.
                        // Can be null if wFind() called with allowCreate = false and no entry with the
                        // provided key exists.
                        ceda::ptr<ceda::IPersistable> m_containingNode;
                    }
                    _Payload* m_payload;         // The payload for the requested key
                }
                LeafNode* m_leaf;           // Leaf node containing the key
                ceda::ssize_t m_index;                // Index position in m_leaf
                bool m_created;             // Was a (key,payload) entry created?
            };
            
            /*
            Find the payload with the given key.  Returns the address of the payload in ret.m_payload.
            
            If allowCreate = true then 
            
                *   If no entry currently exists for the given key this function creates a new entry 
                    (with the payload initialised using the default ctor).
                    
                *   ret.m_created indicates whether an entry was created (because no entry with 
                    the given key previously existed)
                
            If allowCreate = false then
            
                *   If no entry exists for the given key this function returns 
                    ret.m_payload = nullptr and ret.m_containingNode = null.
                    
                *   Always returns ret.m_created = false.
            */
            void wFind(wFindRet& ret,const _Key& key,bool allowCreate);
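
            /*
            Sketch (not part of the implementation): the allowCreate rules documented for wFind,
            restated on a std::map so they can be tried in isolation.  FindResult and FindOrCreate
            are hypothetical names introduced for this illustration only; the real function also
            reports the leaf node, the index and (when persistent) the containing node.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-in for wFindRet, reduced to the two fields discussed above.
struct FindResult
{
    int* payload = nullptr;   // address of the payload, or nullptr if no entry exists
    bool created = false;     // true only if this call created the entry
};

// Find-or-create on a std::map, following the allowCreate rules documented for wFind.
inline FindResult FindOrCreate(std::map<std::string,int>& m, const std::string& key, bool allowCreate)
{
    FindResult r;
    auto it = m.find(key);
    if (it == m.end())
    {
        if (!allowCreate) return r;          // payload = nullptr, created = false
        it = m.emplace(key, int()).first;    // payload is value-initialised
        r.created = true;
    }
    r.payload = &it->second;
    return r;
}
```

            A caller would typically test 'created' to decide whether the payload still needs to
            be filled in.
            */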

            @if (bHavePayload) 
            {
                // Insert the given key and payload into the B+Tree.  Returns false if an entry with the given
                // key is already present.
                bool Insert(const _Key& key, const _Payload& payload, bool allowReplace = true);
            }
            @else
            {
                // Insert the given key into the set.  Returns false if an entry with the given key is already present.
                bool Insert(const _Key& key);
            }
            
            ceda::ssize_t GetNumNodes() const;
            
            @if (_bWriteToxostream)
            {
                void Write(ceda::xostream& os) const;
            }

            void Validate() const;

            @if (bHavePayload) 
            {
                // Find the payload for given key.  Returns nullptr if not found.
                const _Payload* rFind(const _Key& key) const;
            }
            @else
            {
                // Test whether the set contains the given key
                bool rFind(const _Key& key) const;
            }
            
        private:
            bool Delete(const _Key& key);

            @if (!_bPersist)
            {
                void DeleteNodes(ceda::ssize_t height, Node* node);
                void DeleteNodes();
            }
            
            /////////////////////////////////// Search //////////////////////////////////////////

            // Search the given leaf node for the LeafKeyEntry with the given key.
            // Returns the index of the entry, or -1 if not found.
            ceda::ssize_t SearchLeaf(const _Key& key, LeafNode* node) const;

            ceda::ssize_t LowerBoundInLeaf(const _Key& key, LeafNode* node) const;
            ceda::ssize_t UpperBoundInLeaf(const _Key& key, LeafNode* node) const;

            /////////////////////////////////// Insertion ///////////////////////////////////////

            LeafNode* wFindInLeaf(wFindRet& ret, LeafNode* left, const _Key& key, bool allowCreate);
            void InsertIntoInternalNode(InternalNode* node, const InternalKeyEntry& e, InternalKeyEntry& overflow);
            void wFindInNode(wFindRet& ret,Node* left, ceda::ssize_t height, const _Key& key, InternalKeyEntry& rightEntry, bool allowCreate);
            void wRecursiveFind(wFindRet& ret,InternalNode* left, ceda::ssize_t height, const _Key& key, InternalKeyEntry& rightEntry, bool allowCreate);

            /////////////////////////////////// Deletion ///////////////////////////////////////

            void MergeLeafRightToLeft(LeafNode* left, LeafNode* right);
            void ReplaceKeyInAnchor(InternalNode* node, const _Key& oldKey, const _Key& newKey);
            void BalanceLeafRightToLeft(LeafNode* left, LeafNode* right, InternalNode* anchor);
            void BalanceLeafLeftToRight(LeafNode* left, LeafNode* right, InternalNode* anchor);
            void MergeInternalRightToLeft(InternalNode* left, InternalNode* right, InternalNode* anchor);
            void BalanceInternalLeftToRight(InternalNode* left, InternalNode* right, InternalNode* anchor);
            void BalanceInternalRightToLeft(InternalNode* left, InternalNode* right, InternalNode* anchor);
            bool RecursiveDelete(
                ceda::ssize_t height, Node* thisNode, Node* lNode, Node* rNode, 
                InternalNode* lAnchor, InternalNode* rAnchor,
                InternalNode* parent,
                Node*& deleteNode,
                const _Key& key);

            /////////////////////////////////// Debugging ///////////////////////////////////////

            void ValidateNode(ceda::ssize_t height, Node* node, bool isRoot, const _Key* k1, const _Key* k2) const;

            ceda::ssize_t GetNumNodes(ceda::ssize_t height, Node* node) const;
            void WriteNode(ceda::xostream& os, ceda::ssize_t height, Node* node) const;

        private:
            ceda::ssize_t m_size;               // Total number of (key,payload) entries stored in the B+Tree
            mPointer(LeafNode) m_firstLeaf;     // pointer to first leaf node in a linked list.  Null when there is no root (m_height == -1)
            mPointer(LeafNode) m_lastLeaf;      // pointer to last leaf node in a linked list.  Null when there is no root (m_height == -1)
            mPointer(Node) m_root;              // pointer to root node.  Could be leaf node or internal node
            ceda::ssize_t m_height;             // 0 if root node is a leaf node, -1 if there is no root node
        };
    }
    @if (_bImplement)
    {
        @def AsInternalNode = _BPlusTree@@@quote(AsInternalNode)
        @def AsLeafNode = _BPlusTree@@@quote(AsLeafNode)
        @def MoveKey = _BPlusTree@@@quote(MoveKey)
        @def CopyConstructKey = _BPlusTree@@@quote(CopyConstructKey)
        @def ConstructPayload = _BPlusTree@@@quote(ConstructPayload)

        ///////////////////////////////////////////////////////////////////////////////////////////////////
        // Downcasts

        inline InternalNode* AsInternalNode(Node* n)
        {
            return (InternalNode*) n;
        }

        inline LeafNode* AsLeafNode(Node* n)
        {
            return (LeafNode*) n;
        }

        ///////////////////////////////////////////////////////////////////////////////////////////////////
        // Moving elements
        
        // These functions assume that keys and payloads are relocatable.

        /////////// Key

        inline void MoveKey(_Key& dst, _Key& src)
        {
            memcpy(&dst, &src, sizeof(_Key));
        }
        
        /*
        inline void SwapKey(_Key& dst, _Key& src)
        {
            ceda::memswap(&dst, &src, sizeof(_Key));
        }
        */
        
        inline void CopyConstructKey(_Key& dst, const _Key& src)
        {
            new (&dst) _Key(src);
        }
        
        ////////// Payload
        
        @if (bHavePayload)
        {
            inline void ConstructPayload(_Payload& payload)
            {
                new (&payload) _Payload();
            }
        }
                
        /////////// LeafKeyEntry
        /*
        inline void MoveLeafKeyEntry(LeafKeyEntry& dst, LeafKeyEntry& src)
        {
            memcpy(&dst, &src, sizeof(LeafKeyEntry));
        }
        */
        //////////// LeafNode entries

        inline void DestructLeafEntry(LeafNode* node, ceda::ssize_t pos)
        {
            node->GetEntry(pos).~LeafKeyEntry();
        }

        inline void CopyConstructLeafEntry(LeafNode* node, ceda::ssize_t pos, const LeafKeyEntry& e)
        {
            new (&node->GetEntry(pos)) LeafKeyEntry(e);
        }

        inline void MoveLeafEntries(const LeafNode* src, ceda::ssize_t s, LeafNode* dst, ceda::ssize_t d, ceda::ssize_t count)
        {
            //Tracer() << "MoveLeafEntries src= " << src << " dst= " << dst << "  s= " << s << " d= " << d << "  count= " << count << '\n';
            //Tracer() << "count * sizeof(LeafKeyEntry) = " << count * sizeof(LeafKeyEntry) << '\n';
            memcpy(&dst->GetEntry(d),&src->GetEntry(s), count * sizeof(LeafKeyEntry) );
        } 

        inline void ShiftLeafEntries(LeafNode* node, ceda::ssize_t s, ceda::ssize_t d, ceda::ssize_t count)
        {
            //Tracer() << "ShiftLeafEntries node= " << node << "  s= " << s << " d= " << d << "  count= " << count << '\n';
            //Tracer() << "count * sizeof(LeafKeyEntry) = " << count * sizeof(LeafKeyEntry) << '\n';
            memmove(&node->GetEntry(d),&node->GetEntry(s), count * sizeof(LeafKeyEntry) );
        } 
        
        //////////// InternalNode entries

        inline void DestructInternalEntry(InternalNode* node, ceda::ssize_t pos)
        {
            node->GetEntry(pos).~InternalKeyEntry();
        }

        inline void CopyConstructInternalEntry(InternalNode* node, ceda::ssize_t pos, const InternalKeyEntry& e)
        {
            new (&node->GetEntry(pos)) InternalKeyEntry(e);
        }

        inline void MoveInternalEntries(const InternalNode* src, ceda::ssize_t s, InternalNode* dst, ceda::ssize_t d, ceda::ssize_t count)
        {
            //Tracer() << "MoveInternalEntries src= " << src << " dst= " << dst << "  s= " << s << " d= " << d << "  count= " << count << '\n';
            memcpy(&dst->GetEntry(d),&src->GetEntry(s), count * sizeof(InternalKeyEntry) );
        } 

        inline void ShiftInternalEntries(InternalNode* node, ceda::ssize_t s, ceda::ssize_t d, ceda::ssize_t count)
        {
            //Tracer() << "ShiftInternalEntries node= " << node << "  s= " << s << " d= " << d << "  count= " << count << '\n';
            memmove(&node->GetEntry(d),&node->GetEntry(s), count * sizeof(InternalKeyEntry) );
        } 

        ///////////////////////////////////////////////////////////////////////////////////////////////////
        // InternalNode

        InternalNode::~InternalNode()
        {
            ceda::DestructArrayElements(&GetEntry(0), m_numKeyEntrys);
        }

        @if (_bPersist)
        {
            void InternalNode::VisitObjects(ceda::IObjectVisitor& v) const
            { 
                v << m_ptr;
                for (ceda::ssize_t i=0 ; i < m_numKeyEntrys ; ++i)
                {
                    @if (_bKeyMayHavePrefs)
                    {
                        v << GetEntry(i).m_key;
                    }
                    v << GetEntry(i).m_ptr;
                }
            }

            void InternalNode::VisitPrefs(ceda::IPrefVisitor& v) const
            { 
                v << m_ptr;
                for (ceda::ssize_t i=0 ; i < m_numKeyEntrys ; ++i)
                {
                    @if (_bKeyMayHavePrefs)
                    {
                        v << GetEntry(i).m_key;
                    }
                    v << GetEntry(i).m_ptr;
                }
            }

            const ceda::schema_t InternalNode@@_SCHEMA = 1;

            void InternalNode::Serialise(ceda::Archive& ar) const
            {
                ceda::SerialiseSchema(ar, InternalNode@@_SCHEMA);
                ar << m_ptr;
                SerialiseVariableLengthUint(ar,m_numKeyEntrys);
                for (ceda::ssize_t i=0 ; i < m_numKeyEntrys ; ++i)
                {
                    ar << GetEntry(i).m_key
                       << GetEntry(i).m_ptr;
                }
            }

            void InternalNode::Deserialise(ceda::InputArchive& ar)
            {
                ceda::schema_t schema;
                ceda::DeserialiseSchema(ar,InternalNode@@_SCHEMA,schema);
                ar >> m_ptr;
                
                ceda::ssize_t n;
                DeserialiseVariableLengthUint(ar,n);
                cxAssert(0 <= n && n <= MAX_NUM_INTERNAL_KEY_ENTRYS);
                ceda::ResizeArrayElements(&GetEntry(0), m_numKeyEntrys, n);
                m_numKeyEntrys = n;
                
                for (ceda::ssize_t i=0 ; i < m_numKeyEntrys ; ++i)
                {
                    ar >> GetEntry(i).m_key
                       >> GetEntry(i).m_ptr;
                }
            }
        }
                
        std::pair<Node*,ceda::ssize_t> InternalNode::FindChildGivenKey2(const _Key& key)
        {
            /*
               As an example, for nk = 4 we have the following picture
            
               Key               K0       K1       K2       K3  
                                   \         \        \        \
               Ptr           P       P0       P1       P2       P3
                             |       |        |        |        |
               Range    [-inf,K0)  [K0,K1)  [K1,K2)  [K2,K3)  [K3,+inf)
            */
            ceda::ssize_t nk = m_numKeyEntrys;
            cxAssert(1 <= nk && nk <= MAX_NUM_INTERNAL_KEY_ENTRYS);
            
            if (_CompareKeysFn(key,GetEntry(0).m_key) < 0)
            {
                // key < K0
                return std::pair<Node*,ceda::ssize_t>(mGetPtr(m_ptr),-1);
            }
            else
            {
                /*
                Binary search of the node
                For convenience let's define  K(nk) = +inf
                
                Claim: At each iteration mid = floor((lo+hi)/2) must be in the range [lo,hi)
                Proof: By the condition in the while loop we know that lo < hi.
                       Clearly mid >= lo, and mid <= hi.
                       Suppose mid = hi. Then hi <= (lo+hi)/2.  I.e. 2hi <= lo+hi. I.e. hi <= lo.
                       This contradicts lo < hi.

                Claim: An invariant of the binary search is that the given key is in the half open 
                       interval [ K(lo-1), K(hi) )
                Proof (by induction)
                    1.   This invariant is met with the initial settings of lo=1 and hi=nk, 
                         because in this else block we know that key >= K(0) = K(lo-1), and 
                         K(hi) = K(nk) = +inf
                    
                    2.   At each iteration lo < hi, therefore mid = floor( (lo+hi)/2 ) must be in the range
                         [lo,hi).
                         a.  If key >= K(mid) then lo becomes mid+1, and K(lo-1) = K(mid+1-1) = K(mid).  
                             Therefore after changing lo we still have key >= K(lo-1).  I.e. the invariant 
                             remains true
                         b.  Otherwise if key < K(mid) then hi is assigned to mid.  Therefore key < K(hi)
                             remains true.
                
                Claim : The binary search exits with lo = hi
                Proof : 
                   Initially lo = 1 and hi = nk.  Now 1 <= nk therefore lo <= hi
                   At each iteration mid is in the range [lo,hi).  If lo is set to mid+1 then lo <= hi.  
                   Otherwise if hi is set to mid then lo <= mid = hi.  Therefore it is not possible for the
                   loop to exit with lo > hi.  It also clearly can't exit with lo < hi, because that is the
                   loop condition.  Therefore we deduce that it must exit with lo = hi.
                   It only remains to show that it doesn't get stuck in an infinite loop.  This follows 
                   because each iteration always increases lo or decreases hi.
                */
                ceda::ssize_t lo = 1;
                ceda::ssize_t hi = nk;
                while (lo < hi)
                {
                    ceda::ssize_t mid = (lo + hi)/2;
                    cxAssert(1 <= mid && mid < nk);
                    
                    if (_CompareKeysFn(key,GetEntry(mid).m_key) >= 0) lo = mid + 1;
                    else                                           hi = mid;
                }
                cxAssert(lo == hi);
                cxAssert(1 <= lo && lo <= nk);
                return std::pair<Node*,ceda::ssize_t>(mGetPtr(GetEntry(lo-1).m_ptr),lo-1);
            }
        }
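
        /*
        Sketch (not part of the implementation): the child selection above, restated on a plain
        sorted vector of int keys so that the invariant can be checked in isolation.
        ChildIndexForKey is a hypothetical name, and operator< on int stands in for
        _CompareKeysFn.  The return value follows the convention of the second member of the
        pair returned above: -1 selects the leading pointer P, and i >= 0 selects Pi.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Returns the index of the child to descend into for the given key.
// -1 means the leading pointer P (key < K0); i >= 0 means Pi, covering [Ki, K(i+1)).
inline std::ptrdiff_t ChildIndexForKey(const std::vector<int>& keys, int key)
{
    assert(!keys.empty());
    if (key < keys[0]) return -1;

    std::ptrdiff_t lo = 1;
    std::ptrdiff_t hi = static_cast<std::ptrdiff_t>(keys.size());
    while (lo < hi)
    {
        // Invariant: K(lo-1) <= key < K(hi), with K(nk) taken to be +inf
        const std::ptrdiff_t mid = (lo + hi) / 2;
        if (key >= keys[static_cast<std::size_t>(mid)]) lo = mid + 1;
        else                                            hi = mid;
    }
    return lo - 1;
}
```

        With keys 10, 20, 30, 40 a search for 15 selects P0 and a search for 40 selects P3,
        matching the ranges in the picture at the top of FindChildGivenKey2.
        */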

        ///////////////////////////////////////////////////////////////////////////////////////////////////
        // LeafNode

        LeafNode::~LeafNode()
        {
            ceda::DestructArrayElements(&GetEntry(0), m_numKeyEntrys);
        }
            
        @if (_bPersist)
        {
            void LeafNode::VisitObjects(ceda::IObjectVisitor& v) const
            { 
                v << m_prev << m_next;
                for (ceda::ssize_t i=0 ; i < m_numKeyEntrys ; ++i)
                {
                    @if (_bKeyMayHavePrefs)
                    {
                        v << GetEntry(i).first;
                    }
                    @if (_bPayLoadMayHavePrefs)
                    {
                        v << GetEntry(i).second;
                    }
                }
            }

            void LeafNode::VisitPrefs(ceda::IPrefVisitor& v) const
            { 
                // Note that prev and next ptrs are weak pointers so we don't visit them
                for (ceda::ssize_t i=0 ; i < m_numKeyEntrys ; ++i)
                {
                    @if (_bKeyMayHavePrefs)
                    {
                        v << GetEntry(i).first;
                    }
                    @if (_bPayLoadMayHavePrefs)
                    {
                        v << GetEntry(i).second;
                    }
                }
            }

            const ceda::schema_t LeafNode@@_SCHEMA = 1;
            
            void LeafNode::Serialise(ceda::Archive& ar) const
            {
                ceda::SerialiseSchema(ar, LeafNode@@_SCHEMA);
                ar << m_prev
                   << m_next;
                   
                SerialiseVariableLengthUint(ar,m_numKeyEntrys);
                for (ceda::ssize_t i=0 ; i < m_numKeyEntrys ; ++i)
                {
                    ar << GetEntry(i).first;
                    @if (bHavePayload)
                    {
                        ar << GetEntry(i).second;
                    }
                }
            }

            void LeafNode::Deserialise(ceda::InputArchive& ar)
            {
                ceda::schema_t schema;
                ceda::DeserialiseSchema(ar,LeafNode@@_SCHEMA,schema);
                ar >> m_prev
                   >> m_next;
                   
                ceda::ssize_t n;
                DeserialiseVariableLengthUint(ar,n);
                cxAssert(0 <= n && n <= MAX_NUM_LEAF_KEY_ENTRYS);
                ceda::ResizeArrayElements(&GetEntry(0), m_numKeyEntrys, n);
                m_numKeyEntrys = n;
                   
                for (ceda::ssize_t i=0 ; i < m_numKeyEntrys ; ++i)
                {
                    ar >> GetEntry(i).first;
                    @if (bHavePayload)
                    {
                       ar >> GetEntry(i).second;
                    }
                }
            }
        }

        ///////////////////////////////////////////////////////////////////////////////////////////////////
        // _BPlusTree

        _BPlusTree::_BPlusTree() :
            m_size(0),
            m_height(-1)
        {
            cxAssert(MIN_NUM_INTERNAL_KEY_ENTRYS > 0);
            cxAssert(MIN_NUM_LEAF_KEY_ENTRYS > 0);
            @if (!_bPersist)
            {
                m_firstLeaf = nullptr;
                m_lastLeaf = nullptr;
                m_root = nullptr;
            }
        }
        
        @if (!_bPersist)
        {
            _BPlusTree::~_BPlusTree()
            {
                DeleteNodes();
            }
        }
        
        _BPlusTree::_BPlusTree(const _BPlusTree& rhs) :
            m_size(0),
            m_height(-1)
        {
            @if (!_bPersist)
            {
                m_firstLeaf = nullptr;
                m_lastLeaf = nullptr;
                m_root = nullptr;
            }
            
            // todo : inefficient
            const_iterator i = rhs.begin();
            while(i != rhs.end())
            {
                insert(*i);
                ++i;
            }
        }
        
        _BPlusTree& _BPlusTree::operator=(const _BPlusTree& rhs)
        {
            // Use the copy and swap idiom
            _BPlusTree temp(rhs);
            temp.swap(*this);           // Non throwing swap
            return *this;
        }
        
        void _BPlusTree::clear()
        {
            @if (_bPersist)
            {
                if (m_height == 0)
                {
                    ceda::AsyncPermanentlyDeleteSubTree(AsLeafNode(m_root.Get()));
                }
                else if (m_height > 0)
                {
                    ceda::AsyncPermanentlyDeleteSubTree(AsInternalNode(m_root.Get()));
                }
                m_root.Clear();
                m_firstLeaf.Clear();
                m_lastLeaf.Clear();
                MarkAsDirtyWithoutTrace();
            }
            @else
            {
                DeleteNodes();
                m_firstLeaf = nullptr;
                m_lastLeaf = nullptr;
                m_root = nullptr;
            }
            m_size = 0;
            m_height = -1;
        }

        void _BPlusTree::swap(_BPlusTree& rhs)
        {
            //todo: assert that rhs belongs to the same PersistStore.
            using std::swap;
            swap(m_size, rhs.m_size);
            swap(m_height, rhs.m_height);
            swap(m_root, rhs.m_root);
            swap(m_firstLeaf, rhs.m_firstLeaf);
            swap(m_lastLeaf, rhs.m_lastLeaf);
            @if (_bPersist)
            {
                MarkAsDirtyWithoutTrace();
                rhs.MarkAsDirtyWithoutTrace();
            }
        }

        @if (!bHavePayload || _bPayloadsOrdered)
        {
            bool _BPlusTree::operator<(const _BPlusTree& rhs) const
            {
                const_iterator i = begin();
                const_iterator j = rhs.begin();
                while(i != end())
                {
                    if (j == rhs.end())
                    {
                        return false;
                    }
                    if (*i < *j) return true;
                    if (*i != *j) return false;
                    ++i; ++j;
                }
                return j != rhs.end();
            }
        }
        
        bool _BPlusTree::operator==(const _BPlusTree& rhs) const
        {
            if (m_size != rhs.m_size) return false;
            const_iterator i = begin();
            const_iterator j = rhs.begin();
            while(i != end())
            {
                cxAssert(j != rhs.end());
                if (*i != *j) return false;
                ++i; ++j;
            }
            cxAssert(j == rhs.end());
            return true;
        }

        @if (_bPersist)
        {
            void _BPlusTree::VisitObjects(ceda::IObjectVisitor& v) const
            { 
                v << m_firstLeaf 
                  << m_lastLeaf
                  << m_root;
            }

            void _BPlusTree::VisitPrefs(ceda::IPrefVisitor& v) const
            { 
                // Note that m_firstLeaf and m_lastLeaf are weak pointers, so we don't visit them.
                v << m_root;
            }

            const ceda::schema_t _BPlusTree@@_SCHEMA = 1;

            void _BPlusTree::Serialise(ceda::Archive& ar) const
            {
                ceda::SerialiseSchema(ar, _BPlusTree@@_SCHEMA);
                SerialiseVariableLengthUint(ar,m_size);
                ar << m_firstLeaf
                   << m_lastLeaf
                   << m_root;
                   
                // m_height >= -1 so we serialise m_height+1.
                SerialiseVariableLengthUint(ar,m_height+1);
            }

            void _BPlusTree::Deserialise(ceda::InputArchive& ar)
            {
                ceda::schema_t schema;
                ceda::DeserialiseSchema(ar,_BPlusTree@@_SCHEMA,schema);
                DeserialiseVariableLengthUint(ar,m_size);
                ar >> m_firstLeaf
                   >> m_lastLeaf
                   >> m_root;
                DeserialiseVariableLengthUint(ar,m_height); --m_height;
                cxAssert(m_size >= 0);
                cxAssert(m_height >= -1);
            }
        }
        
        ///////////////////////////////////////////////////////////////////////////////////////////////////
        // Searching a B+Tree

        ceda::ssize_t _BPlusTree::SearchLeaf(const _Key& key, LeafNode* leaf) const
        {
            cxAssert(leaf);
            ceda::ssize_t nk = leaf->m_numKeyEntrys;
            cxAssert(0 <= nk && nk <= MAX_NUM_LEAF_KEY_ENTRYS);
            
            // Use binary search on this node.  We have keys  K0, .... , Kn-1
            ceda::ssize_t lo = 0;
            ceda::ssize_t hi = nk-1;
            while (lo <= hi)
            {
                ceda::ssize_t mid = (lo + hi)/2;
                int c = _CompareKeysFn(key, leaf->GetEntry(mid).first);
                if      (c > 0) lo = mid + 1;
                else if (c < 0) hi = mid - 1;
                else            return mid;  // found
            }
            
            // not found
            return -1;
        }

        ceda::ssize_t _BPlusTree::LowerBoundInLeaf(const _Key& key, LeafNode* leaf) const
        {
            cxAssert(leaf);
            ceda::ssize_t nk = leaf->m_numKeyEntrys;
            cxAssert(0 <= nk && nk <= MAX_NUM_LEAF_KEY_ENTRYS);
            
            ceda::ssize_t lo = 0;
            ceda::ssize_t hi = nk;
            while (lo < hi)
            {
                ceda::ssize_t mid = (lo + hi)/2;
                cxAssert(0 <= mid && mid < nk);
                int c = _CompareKeysFn(key, leaf->GetEntry(mid).first);
                if      (c > 0) lo = mid + 1;
                else            hi = mid;
            }
            cxAssert(lo == hi);
            cxAssert(0 <= hi && hi <= nk);
            return hi;
        }

        ceda::ssize_t _BPlusTree::UpperBoundInLeaf(const _Key& key, LeafNode* leaf) const
        {
            cxAssert(leaf);
            ceda::ssize_t nk = leaf->m_numKeyEntrys;
            cxAssert(0 <= nk && nk <= MAX_NUM_LEAF_KEY_ENTRYS);

            ceda::ssize_t lo = 0;
            ceda::ssize_t hi = nk;
            while (lo < hi)
            {
                ceda::ssize_t mid = (lo + hi)/2;
                cxAssert(0 <= mid && mid < nk);
                int c = _CompareKeysFn(key, leaf->GetEntry(mid).first);
                if      (c < 0) hi = mid;
                else            lo = mid + 1;
            }
            cxAssert(lo == hi);
            cxAssert(0 <= hi && hi <= nk);
            return hi;
        }
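
        /*
        Sketch (not part of the implementation): the two loops above differ only in which side
        takes the "equal" case.  The stand-alone version below works on a sorted vector of
        distinct ints and can be compared against std::lower_bound and std::upper_bound.
        CompareInts, LowerBoundIndex and UpperBoundIndex are hypothetical names; CompareInts
        stands in for _CompareKeysFn.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Three-way comparison: negative, zero or positive, like _CompareKeysFn is assumed to be.
inline int CompareInts(int a, int b) { return (a > b) - (a < b); }

// Index of the first element that is not less than key, or v.size() if there is none.
inline std::size_t LowerBoundIndex(const std::vector<int>& v, int key)
{
    std::size_t lo = 0, hi = v.size();
    while (lo < hi)
    {
        const std::size_t mid = (lo + hi) / 2;
        if (CompareInts(key, v[mid]) > 0) lo = mid + 1;   // v[mid] < key: answer is to the right
        else                              hi = mid;       // v[mid] >= key: mid is a candidate
    }
    return hi;
}

// Index of the first element that is greater than key, or v.size() if there is none.
inline std::size_t UpperBoundIndex(const std::vector<int>& v, int key)
{
    std::size_t lo = 0, hi = v.size();
    while (lo < hi)
    {
        const std::size_t mid = (lo + hi) / 2;
        if (CompareInts(key, v[mid]) < 0) hi = mid;       // v[mid] > key: mid is a candidate
        else                              lo = mid + 1;   // v[mid] <= key: answer is to the right
    }
    return hi;
}
```

        A result equal to the number of entries means "past the end of this leaf", which is why
        lower_bound() and upper_bound() then step to the next leaf when one exists.
        */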

        _BPlusTree::const_iterator _BPlusTree::find(const _Key& key) const
        {
            // Find the Leaf node containing the given key
            if (Node* node = mGetPtr(m_root))
            {
                cxAssert(m_height >= 0);
                for (ceda::ssize_t i=0 ; i < m_height ; ++i)
                {
                    // Descend into the child selected by comparing the key against this node's keys
                    node = AsInternalNode(node)->FindChildGivenKey(key);
                    cxAssert(node);
                }
                ceda::ssize_t j = SearchLeaf(key,AsLeafNode(node));

                return (j == -1) ? end() : const_iterator(IteratorPolicy(AsLeafNode(node), j));
            }
            else
            {
                cxAssert(m_height == -1);
                return const_iterator(IteratorPolicy(nullptr,0));
            }
        }

        _BPlusTree::const_iterator _BPlusTree::lower_bound(const _Key& key) const
        {
            // Find the Leaf node containing the given key
            if (Node* node = mGetPtr(m_root))
            {
                cxAssert(m_height >= 0);
                for (ceda::ssize_t i=0 ; i < m_height ; ++i)
                {
                    // Descend into the child selected by comparing the key against this node's keys
                    node = AsInternalNode(node)->FindChildGivenKey(key);
                    cxAssert(node);
                }
                ceda::ssize_t j = LowerBoundInLeaf(key,AsLeafNode(node));
                cxAssert(0 <= j && j <= AsLeafNode(node)->m_numKeyEntrys);
                if (j == AsLeafNode(node)->m_numKeyEntrys)
                {
                    if (LeafNode* next = mGetPtr(AsLeafNode(node)->m_next))
                    {
                        return const_iterator(IteratorPolicy(next,0));
                    }
                }
                return const_iterator(IteratorPolicy(AsLeafNode(node), j));
            }
            else
            {
                cxAssert(m_height == -1);
                return const_iterator(IteratorPolicy(nullptr,0));
            }
        }

        _BPlusTree::const_iterator _BPlusTree::upper_bound(const _Key& key) const
        {
            // Find the Leaf node containing the given key
            if (Node* node = mGetPtr(m_root))
            {
                cxAssert(m_height >= 0);
                for (ceda::ssize_t i=0 ; i < m_height ; ++i)
                {
                    // Descend into the child selected by comparing the key against this node's keys
                    node = AsInternalNode(node)->FindChildGivenKey(key);
                    cxAssert(node);
                }
                ceda::ssize_t j = UpperBoundInLeaf(key,AsLeafNode(node));
                cxAssert(0 <= j && j <= AsLeafNode(node)->m_numKeyEntrys);
                if (j == AsLeafNode(node)->m_numKeyEntrys)
                {
                    if (LeafNode* next = mGetPtr(AsLeafNode(node)->m_next))
                    {
                        return const_iterator(IteratorPolicy(next,0));
                    }
                }
                return const_iterator(IteratorPolicy(AsLeafNode(node), j));
            }
            else
            {
                cxAssert(m_height == -1);
                return const_iterator(IteratorPolicy(nullptr,0));
            }
        }

        @def rFindReturnType = @if (bHavePayload) {const _Payload*} @else {bool}
        
        rFindReturnType _BPlusTree::rFind(const _Key& key) const
        {
            // Find the Leaf node containing the given key
            if (Node* node = mGetPtr(m_root))
            {
                cxAssert(m_height >= 0);
                for (ceda::ssize_t i=0 ; i < m_height ; ++i)
                {
                    // Descend into the child selected by comparing the key against this node's keys
                    node = AsInternalNode(node)->FindChildGivenKey(key);
                    cxAssert(node);
                }
                ceda::ssize_t j = SearchLeaf(key,AsLeafNode(node));
                @if (bHavePayload)
                {
                    return (j == -1) ? nullptr : &AsLeafNode(node)->GetEntry(j).second;
                }
                @else
                {
                    return j != -1;
                }
            }
            else
            {
                cxAssert(m_height == -1);
                @if (bHavePayload)
                {
                    return nullptr;
                }
                @else
                {
                    return false;
                }
            }
        }

        ///////////////////////////////////////////////////////////////////////////////////////////////////
        // Insertion into a B+Tree

        /*
        Insertion for given key involves the following steps

        1.  Recurse down the tree according to the key until the relevant leaf node is reached.
            If the key is found in the leaf node then an entry for that key already exists.

        2.  As the stack unwinds we may need to insert an entry into an internal node.  This can
            cause an internal node to split, causing a further insertion into its parent and so on.
            
        3.  If the root node overflows then a new internal node must be created for the root, and the
            height of the tree increases by 1.
        */

        /*
        Find or create an entry with the given key under the given leaf node (which was found after recursing
        down through the B+Tree with the given key).  The outcome is written to ret.
        
        allowCreate indicates whether it is permissible to create a new entry with the given key.

        If an insertion would make the leaf node overflow, then (at least conceptually)
            1. the key is inserted anyway
            2. A new 'overflow' leaf node is created that is inserted to the immediate right.  The 
               linked list of leaf node pointers is updated accordingly.
            3. Our (left) leaf node is split into two about the median, by moving the top half of the
               entries into the 'overflow' leaf node.

        Returns the 'overflow' leaf node if the leaf was split, otherwise nullptr.
        */
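        // Worked example of the split arithmetic used below.  This is only an illustrative sketch: it
        // assumes a hypothetical MAX_NUM_LEAF_KEY_ENTRYS = 4 (chosen to keep the numbers small), and that
        // MoveLeafEntries(src,srcPos,dst,dstPos,n) moves n entries and ShiftLeafEntries(node,from,to,n)
        // shifts n entries, as the calls below suggest.
        //
        //      nk = 4,  numLeftEntrys = (4 + 1)/2 = 2
        //
        //      insertPos = 1  (insertPos < numLeftEntrys, so the new entry lands in 'left')
        //          n = numLeftEntrys-1 = 1
        //          MoveLeafEntries(left, 1, right, 0, 3)      left = [E0]          right = [E1 E2 E3]
        //          ShiftLeafEntries(left, 1, 2, 0)            nothing to shift
        //          new entry constructed at left[1]           left = [E0 new]      right = [E1 E2 E3]
        //
        //      insertPos = 3  (insertPos >= numLeftEntrys, so the new entry lands in 'right')
        //          rightInsertPos = 3 - 2 = 1
        //          MoveLeafEntries(left, 2, right, 0, 1)      left = [E0 E1 . E3]  right = [E2]
        //          new entry constructed at right[1]                               right = [E2 new]
        //          MoveLeafEntries(left, 3, right, 2, 1)      left = [E0 E1]       right = [E2 new E3]
        //
        //      Either way left ends with numLeftEntrys = 2 entries and right with 4 + 1 - 2 = 3.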
        LeafNode* _BPlusTree::wFindInLeaf(wFindRet& ret,LeafNode* left, const _Key& key, bool allowCreate)
        {
            cxAssert(left);

            ceda::ssize_t nk = left->m_numKeyEntrys;
            
            // Note that if the root node is a leaf then it may contain zero entries.
            cxAssert(0 <= nk && nk <= MAX_NUM_LEAF_KEY_ENTRYS);
            
            // Use binary search to find the insertion position.  We have keys  K0, .... , Kn-1
            ceda::ssize_t insertPos;
            {
                ceda::ssize_t lo = 0;
                ceda::ssize_t hi = nk-1;
                while (lo <= hi)
                {
                    ceda::ssize_t mid = (lo + hi)/2;
                    cxAssert(0 <= mid && mid < nk);
                    int c = _CompareKeysFn(key, left->GetEntry(mid).first);
                    if      (c > 0) lo = mid + 1;
                    else if (c < 0) hi = mid - 1;
                    else            
                    {
                        // Key has been found
                        @if (bHavePayload)
                        {
                            @if (_bPersist)
                            {
                                ret.m_containingNode = left;
                            }
                            ret.m_payload = &left->GetEntry(mid).second;
                        }
                        ret.m_leaf = left;
                        ret.m_index = mid;
                        ret.m_created = false;
                        return nullptr;
                    }
                }

                // Correct insertion position is at lo.  Note that lo = nk is possible!
                insertPos = lo;
            }
            
            // Key not found
            if (allowCreate)
            {
                // Macro to copy construct 'key' in LeafNode 'node' at position 'insertPos'
                // A default constructed payload is created
                @def mCopyConstructKeyAndDefaultConstructPayload(node,insertPos) =
                {
                    LeafKeyEntry& e = node->GetEntry(insertPos);

                    CopyConstructKey(e.first,key);
                    @if (_bPersist)
                    {
                        @if (_bDeclareKeysReachable) {DeclareReachableX(node,e.first);}
                    }

                    @if (bHavePayload)
                    {
                        ConstructPayload(e.second);
                        @if (_bPersist)
                        {
                            @if (_bDeclarePayloadsReachable) {DeclareReachableX(node,e.second);}
                            ret.m_containingNode = node;
                        }
                        ret.m_payload = &e.second;
                    }

                    ret.m_leaf = node;
                    ret.m_index = insertPos;
                }

                ret.m_created = true;

                ++m_size;
                @if (_bPersist)
                {
                    MarkAsDirtyWithoutTrace();
                }

                // Note that normally insertPos shouldn't be zero because the downwards recurse only gets to 
                // this leaf node if its key is at least as large as the smallest key.  However, can have 
                // insertPos = 0 when the root node is a leaf node.
                cxAssert(0 <= insertPos && insertPos <= nk);
                
                if (nk == MAX_NUM_LEAF_KEY_ENTRYS)
                {
                    // Leaf node is already full, so this insertion will overflow
                    
                    // Create a new leaf node and insert to the immediate right of our node
                    LeafNode* next = mGetPtr(left->m_next);
                    LeafNode* right = mAlloc(LeafNode);
                    right->m_prev = left;
                    right->m_next = next;
                    if (next)
                    {
                        cxAssert(next->m_prev == left);
                        next->m_prev = right;
                        @if (_bPersist)
                        {
                            next->MarkAsDirtyWithoutTrace();
                        }
                    }
                    else
                    {
                        cxAssert(m_lastLeaf == left);
                        m_lastLeaf = right;
                    }
                    left->m_next = right;
                    @if (_bPersist)
                    {
                        left->MarkAsDirtyWithoutTrace();
                        DeclareReachable(left,right);
                    }
                    
                    // The total number of entrys (including the entry to be inserted) is MAX_NUM_LEAF_KEY_ENTRYS+1
                    // Let numLeftEntrys be the number of entrys to remain in 'left' (including perhaps the 
                    // entry to be inserted), and the remainder are copied into 'right'.
                    const ceda::ssize_t numLeftEntrys = (MAX_NUM_LEAF_KEY_ENTRYS + 1)/2;
                    
                    if (insertPos < numLeftEntrys)
                    {
                        // Number to keep in left, allowing for the insertion
                        ceda::ssize_t n = numLeftEntrys-1;

                        // Move top half of the entries into 'right', leaving n entries behind
                        MoveLeafEntries(left, n, right, 0, nk-n);
                        
                        // Shift entrys after insertPos upwards by one position to make space
                        ShiftLeafEntries(left, insertPos, insertPos+1, n-insertPos);

                        // Set the new entry
                        mCopyConstructKeyAndDefaultConstructPayload(left,insertPos)
                    }
                    else
                    {
                        // Insertion position in right node
                        ceda::ssize_t rightInsertPos = insertPos - numLeftEntrys;
                        cxAssert(rightInsertPos >= 0);

                        // Copy entries from left to right up to the insertion position
                        MoveLeafEntries(left, numLeftEntrys, right, 0, rightInsertPos);

                        // Set the new entry
                        mCopyConstructKeyAndDefaultConstructPayload(right,rightInsertPos)
                        
                        // Copy remaining entries from left to right
                        MoveLeafEntries(left, insertPos, right, rightInsertPos+1, nk-insertPos);
                    }
                    
                    left->m_numKeyEntrys = numLeftEntrys;
                    right->m_numKeyEntrys = MAX_NUM_LEAF_KEY_ENTRYS + 1 - numLeftEntrys;
                    
                    return right;
                }
                else
                {
                    // Shift entries after insertPos upwards by one position to make space
                    ShiftLeafEntries(left, insertPos, insertPos+1, nk-insertPos);
                            
                    // Set the new entry
                    mCopyConstructKeyAndDefaultConstructPayload(left,insertPos)
                                        
                    ++left->m_numKeyEntrys;
                    @if (_bPersist)
                    {
                        left->MarkAsDirtyWithoutTrace();
                    }
                    
                    return nullptr;
                }
            }
            else
            {
                @if (bHavePayload)
                {
                    @if (_bPersist)
                    {
                        ret.m_containingNode = ceda::null;
                    }
                    ret.m_payload = nullptr;
                }
                ret.m_leaf = nullptr;
                ret.m_index = -1;
                ret.m_created = false;
                return nullptr;
            }
        }

        /*
        Insert a new entry e into the given internal node.

        If the insertion would make the node overflow, then (at least conceptually)

            1. e is inserted anyway

            2. A new 'overflow' internal node is created that will be inserted to the immediate right.

            3. Our (left) internal node is split into two about the median, by copying the top half of the
               entries into the 'overflow' internal node.
               
            4. On return, 'overflow' contains the key and ptr to the overflow node. This needs to
               be inserted into the parent node.  If the node did not overflow, overflow.m_ptr is nullptr.
        */
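        // Worked example of the split arithmetic used below.  This is only an illustrative sketch: it
        // assumes a hypothetical MAX_NUM_INTERNAL_KEY_ENTRYS = 4, and that 
        // MoveInternalEntries(src,srcPos,dst,dstPos,n) moves n entries, as the calls below suggest.
        // Entries are written Ei = (Ki,Pi).
        //
        //      nk = 4,  numLeftEntrys = 4/2 = 2
        //
        //      insertPos = 2  (insertPos <= numLeftEntrys)
        //          MoveInternalEntries(left, 2, right, 0, 2)   left = [E0 E1]       right = [E2 E3]
        //          ShiftInternalEntries(left, 2, 3, 0)         nothing to shift
        //          e constructed at left[2]                    left = [E0 E1 e]
        //          left[2] is the middle entry: its key moves up into 'overflow' and its ptr
        //          becomes right->m_ptr.           Result:     left = [E0 E1]       right = [E2 E3]
        //
        //      insertPos = 4  (insertPos > numLeftEntrys)
        //          rightInsertPos = 4 - (2 + 1) = 1
        //          MoveInternalEntries(left, 3, right, 0, 1)   left = [E0 E1 E2 .]  right = [E3]
        //          e constructed at right[1]                                        right = [E3 e]
        //          MoveInternalEntries(left, 4, right, 2, 0)   nothing to move
        //          left[2] = E2 is the middle entry, handled as above.
        //                                          Result:     left = [E0 E1]       right = [E3 e]
        //
        //      Either way left and right each end with 2 entries, and one key goes up to the parent.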
        void _BPlusTree::InsertIntoInternalNode(InternalNode* left, const InternalKeyEntry& e, InternalKeyEntry& overflow)
        {
            cxAssert(left);

            ceda::ssize_t nk = left->m_numKeyEntrys;
            
            // Note that if the root node is an internal node then it may contain one entry.
            cxAssert(1 <= nk && nk <= MAX_NUM_INTERNAL_KEY_ENTRYS);
            
            /*
            Use binary search to find the insertion position.  We have keys  K(0), .... , K(nk-1)
            
            Let K = e.m_key be the key to be inserted
            For convenience, define K(-1) = -inf,  and K(nk) = +inf.


            Claim:  Each assignment to hi makes it strictly decrease
            Proof:  hi can only be replaced by hi' = mid-1 where mid = floor( (lo+hi) / 2)
                    mid = floor( (lo+hi) / 2)  =>  mid <= (lo+hi)/2 
                                               =>  2*mid <= lo + hi
                                               =>  2*mid <= 2*hi   (because lo <= hi in while condition)
                                               =>  mid <= hi
                                               =>  mid-1 < hi
                                               =>  hi' < hi

            Claim:  Each assignment to lo makes it strictly increase
            Proof:  lo can only be replaced by lo' = mid+1 where mid = floor( (lo+hi) / 2)
                    mid = floor( (lo+hi) / 2)  =>  mid >= floor( (lo+lo)/2 )   (because lo <= hi)  
                                               =>  mid >= lo
                                               =>  mid+1 > lo
                                               =>  lo' > lo
            
            Claim:  An invariant of the loop is hi < nk
            Proof:  Initially hi = nk-1 < nk.  At each iteration hi cannot increase
            
            Claim:  At each iteration  mid = floor( (lo+hi)/2 ) satisfies mid < nk
            Proof:  From a proof above we see that mid <= hi.  Now hi < nk therefore mid < nk
            
            Claim:  An invariant of the binary search is :  K is in [ K(lo-1), K(hi+1) )
            Proof: (by induction)
                *   Initially lo = 0, hi = nk-1.
                    So [ K(lo-1), K(hi+1) ) = [ K(-1), K(nk) ) = [-inf, +inf) which must contain K
                    
                *   At each iteration mid = floor( (lo+hi) / 2).
                    Therefore lo <= mid <= hi
                    Therefore lo-1 < mid < hi+1
                    1.  if K > K(mid) then lo becomes lo' = mid+1. 
                        lo-1 < mid < hi+1  =>  lo-1 < lo'-1 < hi+1.  Hence the interval strictly decreases 
                        in size but remains non-empty.
                        K > K(mid) =>  K > K(lo'-1)  so the invariant remains true
                    2.  if K < K(mid) then hi becomes hi' = mid-1
                        lo-1 < mid < hi+1  =>  lo-1 < hi'+1 < hi+1.  Hence the interval strictly decreases 
                        in size but remains non-empty.
                        K < K(mid) =>  K < K(hi'+1)  so the invariant remains true
                        
            Claim:  The loop eventually exits with lo = hi+1
            Proof:  At each iteration either lo strictly increases or else hi strictly decreases,  
                    therefore an infinite loop is not possible.
                    The invariant implies  lo-1 < hi+1  =>  lo < hi+2
                    The iteration exits with lo > hi.
                    So we have   hi < lo < hi+2
                    This implies lo = hi+1.
            */
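            // Example trace of the search below (illustrative only, using hypothetical integer keys):
            //      keys = 10 20 30 40  (nk = 4),  K = 25
            //      lo = 0, hi = 3  =>  mid = 1,  K > K(1) = 20  =>  lo = 2
            //      lo = 2, hi = 3  =>  mid = 2,  K < K(2) = 30  =>  hi = 1
            //      The loop exits with lo = 2 = hi+1, so insertPos = 2  (between 20 and 30)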
            ceda::ssize_t insertPos;
            {
                ceda::ssize_t lo = 0;
                ceda::ssize_t hi = nk-1;
                while (lo <= hi)
                {
                    ceda::ssize_t mid = (lo + hi)/2;
                    cxAssert(0 <= mid && mid < nk);
                    int c = _CompareKeysFn(e.m_key, left->GetEntry(mid).m_key);
                    
                    // This node shouldn't already contain the key
                    cxAssert(c != 0);
                    
                    if      (c > 0) lo = mid + 1;
                    else            hi = mid - 1;
                }
                cxAssert(lo == hi+1);

                // Correct insertion position is at lo.  Note that lo = nk is possible!
                insertPos = lo;
            }
            cxAssert(0 <= insertPos && insertPos <= nk);
            
            if (nk == MAX_NUM_INTERNAL_KEY_ENTRYS)
            {
                // This internal node is already full, so this insertion will overflow
                
                // Create a new internal node and insert to the immediate right of our node
                InternalNode* right = mAlloc(InternalNode);
                @if (_bPersist)
                {
                    left->MarkAsDirtyWithoutTrace();
                    DeclareReachable(left,right);
                }
                overflow.m_ptr = right;
                
                // The total number of entrys (including the entry to be inserted but excluding the key to be
                // moved up into the parent) is MAX_NUM_INTERNAL_KEY_ENTRYS
                // Let numLeftEntrys be the number of entrys to remain in 'left' (including perhaps the 
                // entry to be inserted), and the remainder are copied into 'right'.
                const ceda::ssize_t numLeftEntrys = MAX_NUM_INTERNAL_KEY_ENTRYS / 2;

                if (insertPos <= numLeftEntrys)
                {
                    // Copy top half of the entries into 'right', leaving numLeftEntrys behind
                    MoveInternalEntries(left, numLeftEntrys, right, 0, nk - numLeftEntrys);
                    
                    // Shift entrys after insertPos upwards by 1 to make space for the insertion
                    ShiftInternalEntries(left, insertPos, insertPos+1, numLeftEntrys-insertPos);

                    // Insert at insertPos into left
                    CopyConstructInternalEntry(left,insertPos,e);
                    @if (_bPersist && _bDeclareKeysReachable)
                    {
                        DeclareReachableX(left,left->GetEntry(insertPos).m_key);
                    }
                }
                else
                {
                    // Insertion position in right node
                    ceda::ssize_t rightInsertPos = insertPos - (numLeftEntrys + 1);
                    cxAssert(rightInsertPos >= 0);

                    // Copy entries from left to right up to the insertion position
                    MoveInternalEntries(left, numLeftEntrys + 1, right, 0, rightInsertPos);

                    // Now insert the new entry
                    CopyConstructInternalEntry(right,rightInsertPos,e);
                    @if (_bPersist && _bDeclareKeysReachable)
                    {
                        DeclareReachableX(right,right->GetEntry(rightInsertPos).m_key);
                    }

                    // Copy remaining entries from left to right
                    MoveInternalEntries(left, insertPos, right, rightInsertPos+1, nk-insertPos);
                }

                // Key for the middle entry will be moved up into the parent
                right->m_ptr = left->GetEntry(numLeftEntrys).m_ptr;
                
                // todo : comments in macro arguments like _Key upset the call to the destructor!
                // Having comments appear in macro arguments seems to be a bad idea.  Perhaps
                // eager evaluation of macro arguments would be better?
                typedef _Key K;
                overflow.m_key.~K();
                
                MoveKey(overflow.m_key, left->GetEntry(numLeftEntrys).m_key);
                
                left->m_numKeyEntrys = numLeftEntrys;
                right->m_numKeyEntrys = MAX_NUM_INTERNAL_KEY_ENTRYS - numLeftEntrys;
            }
            else
            {
                // Shift entries after insertPos upwards by one position to make space
                ShiftInternalEntries(left, insertPos, insertPos+1, nk-insertPos);
                
                CopyConstructInternalEntry(left,insertPos,e);
                @if (_bPersist && _bDeclareKeysReachable)
                {
                    DeclareReachableX(left,left->GetEntry(insertPos).m_key);
                }
                
                ++left->m_numKeyEntrys;
                
                overflow.m_ptr = nullptr;
            }
            
            @if (_bPersist)
            {
                left->MarkAsDirtyWithoutTrace();
            }
        }
        
        void _BPlusTree::wFindInNode(wFindRet& ret,Node* left, ceda::ssize_t height, const _Key& key, InternalKeyEntry& rightEntry, bool allowCreate)
        {
            if (height > 0) 
            {
                wRecursiveFind(ret,AsInternalNode(left),height,key,rightEntry,allowCreate);
            }
            else
            {
                if (LeafNode* right = wFindInLeaf(ret,AsLeafNode(left),key,allowCreate))
                {
                    // Leaf node 'left' overflowed, causing creation of another leaf node 'right'
                    
                    // We really do need to make a second copy of the key.  In a B+Tree (but not a 
                    // BTree) InternalNodes normally record copies of (some of) the keys that occur in 
                    // the leaf nodes.
                    
                    // Fill rightEntry.  Note that fill() returns an address of an instance that 
                    // hasn't been constructed yet.
                    rightEntry.m_key = right->GetEntry(0).first;
                    rightEntry.m_ptr = right;
                }
            }
        }

        /*
        Find or (if allowCreate is true) create the entry for key in the sub-tree rooted by the internal 
        node left, which is assumed to be at the given height from the leaf nodes (a zero height would 
        imply that left is a leaf).  The outcome is written to ret; ret.m_created is true only if a new 
        entry was created.

        If 'left' overflows into a new node then the key and pointer for the new node are written to 
        'rightEntry'.  Otherwise 'rightEntry.m_ptr' is left null, given that callers pass in a default 
        constructed entry.
        */
        void _BPlusTree::wRecursiveFind(wFindRet& ret,InternalNode* left, ceda::ssize_t height, const _Key& key, InternalKeyEntry& rightEntry, bool allowCreate)
        {
            cxAssert(height >= 0);
            cxAssert(left);
            
            // In the manner of the normal search, linearly recurse down the B+Tree
            
            Node* childLeft = left->FindChildGivenKey(key);
            cxAssert(childLeft);

            InternalKeyEntry childRight;
            wFindInNode(ret,childLeft,height-1,key,childRight,allowCreate);
            
            // As the stack unwinds, insert entrys and split nodes as required
            if (childRight.m_ptr)
            {
                // The insertion into the (left) child node caused it to overflow into a new right child 
                // node, so we in turn have to insert a key and right child pointer into this internal
                // node.
                InsertIntoInternalNode(left,childRight,rightEntry);
            }
        }
        
        void _BPlusTree::wFind(wFindRet& ret,const _Key& key,bool allowCreate)
        {
            // Hmmm.  Should we be so eager?  A root leaf node is allocated here even when allowCreate is false.
            if (m_height == -1)
            {
                cxAssert(!m_root);
                LeafNode* n = mAlloc(LeafNode);
                m_firstLeaf = m_lastLeaf = n;
                m_root = n;
                m_height = 0;
                @if (_bPersist)
                {
                    MarkAsDirtyWithoutTrace();
                    DeclareReachable(this,n);
                }
            }
            
            cxAssert(m_height >= 0);
            cxAssert(m_root);
            
            InternalKeyEntry right;
            wFindInNode(ret,mGetPtr(m_root),m_height,key,right,allowCreate);
            if (right.m_ptr)
            {
                // The root node overflowed into 'right', so the height of the B+Tree will increase by one
                
                // Create a new (internal) root node with one key and two sub-tree pointers.
                InternalNode* newRoot = mAlloc(InternalNode);
                newRoot->m_ptr = m_root;                              // Left ptr to old root

                new (&newRoot->GetEntry(0)) InternalKeyEntry();
                newRoot->GetEntry(0).swap(right);
                newRoot->m_numKeyEntrys = 1;

                m_root = newRoot;
                ++m_height;
                @if (_bPersist)
                {
                    MarkAsDirtyWithoutTrace();
                    DeclareReachable(this,newRoot);
                }
            }
        }

        std::pair<_BPlusTree::iterator,bool> _BPlusTree::insert(const value_type& x)
        {
            wFindRet ret;
            wFind(ret,@if (bHavePayload) {x.first} @else {x},true);
            cxAssert(ret.m_leaf);
            @if (bHavePayload)
            {
                if (ret.m_created)
                {
                    cxAssert(ret.m_payload);
                    *ret.m_payload = x.second;
                    @if (_bPersist)
                    {
                        cxAssert(ret.m_containingNode);
                        MarkPersistentAsDirtyWithoutTrace(ret.m_containingNode);
                        @if (_bDeclareKeysReachable || _bDeclarePayloadsReachable)
                        {
                            DeclareReachableX(ret.m_containingNode,*ret.m_payload); 
                        }
                    }
                }
            }
            return std::pair<iterator,bool>(iterator(IteratorPolicy(ret.m_leaf, ret.m_index)),ret.m_created);
        }
        
        @if (bHavePayload)
        {
            bool _BPlusTree::Insert(const _Key& key, const _Payload& payload,bool allowReplace)
            {
                wFindRet ret;
                wFind(ret,key,true);
                cxAssert(ret.m_payload);
                if (allowReplace || ret.m_created)
                {
                    *ret.m_payload = payload;
                    @if (_bPersist)
                    {
                        cxAssert(ret.m_containingNode);
                        MarkPersistentAsDirtyWithoutTrace(ret.m_containingNode);
                        @if (_bDeclareKeysReachable || _bDeclarePayloadsReachable)
                        {
                            DeclareReachableX(ret.m_containingNode,*ret.m_payload); 
                        }
                    }
                }
                return ret.m_created;
            }
        }
        @else
        {
            bool _BPlusTree::Insert(const _Key& key)
            {
                wFindRet ret;
                wFind(ret,key,true);
                return ret.m_created;
            }
        }
        
        ///////////////////////////////////////////////////////////////////////////////////////////////////
        // Validate the B+Tree

        // Invariants to check
        // 1.   Keys are in the expected range
        // 2.   Keys in a node strictly increase from left to right
        // 3.   Min and max fanout is respected

        // Ensure that all keys are in the range [*k1,*k2).  
        // If k1 != nullptr then check that all keys in this node satisfy k >= *k1
        // If k2 != nullptr then check that all keys in this node satisfy k < *k2
        // Ensure that fanout is in appropriate range
        void _BPlusTree::ValidateNode(ceda::ssize_t height, Node* node, bool isRoot, const _Key* k1, const _Key* k2) const
        {
            cxAssert(node);
            cxAssert(height >= 0);
            
            if (height > 0)
            {
                /*
                   As an example, for nk = 4 we have the following picture
                
                   Key               K0       K1       K2       K3  
                                       \         \        \        \
                   Ptr           P       P0       P1       P2       P3
                                 |       |        |        |        |
                   Range    [-inf,K0)  [K0,K1)  [K1,K2)  [K2,K3)  [K3,+inf)
                */
                InternalNode* inode = AsInternalNode(node);

                ceda::ssize_t nk = inode->m_numKeyEntrys;
                cxAssert(1 <= nk && nk <= MAX_NUM_INTERNAL_KEY_ENTRYS);
                
                if (!isRoot) cxAssert(MIN_NUM_INTERNAL_KEY_ENTRYS <= nk);

                // Check that keys strictly increase from left to right
                for (ceda::ssize_t i=1 ; i < nk ; ++i)
                {
                    cxAssert(_CompareKeysFn(inode->GetEntry(i-1).m_key, inode->GetEntry(i).m_key) < 0);
                }
                
                // Check that keys are in [*k1,*k2)
                if (k1) cxAssert(_CompareKeysFn(*k1,inode->GetEntry(0).m_key) <= 0);
                if (k2) cxAssert(_CompareKeysFn(inode->GetEntry(nk-1).m_key, *k2) < 0);

                // Recurse into child nodes
                ValidateNode(height-1,mGetPtr(inode->m_ptr), false, nullptr, &inode->GetEntry(0).m_key);
                for (ceda::ssize_t i=0 ; i < nk ; ++i)
                {
                    ValidateNode(height-1,mGetPtr(inode->GetEntry(i).m_ptr), false, &inode->GetEntry(i).m_key, 
                        (i == nk-1) ? nullptr : &inode->GetEntry(i+1).m_key);
                }
            }
            else
            {
                LeafNode* leaf = AsLeafNode(node);
                ceda::ssize_t nk = leaf->m_numKeyEntrys;

                // Note that if the root node is a leaf then it may contain zero entries.
                cxAssert(0 <= nk && nk <= MAX_NUM_LEAF_KEY_ENTRYS);
                
                if (!isRoot) cxAssert(MIN_NUM_LEAF_KEY_ENTRYS <= nk);
                
                // Check that keys strictly increase from left to right
                for (ceda::ssize_t i=1 ; i < nk ; ++i)
                {
                    cxAssert(_CompareKeysFn(leaf->GetEntry(i-1).first, leaf->GetEntry(i).first) < 0);
                }
                
                // Check that keys are in [*k1,*k2)
                if (nk > 0)
                {
                    if (k1) cxAssert(_CompareKeysFn(*k1, leaf->GetEntry(0).first) <= 0);
                    if (k2) cxAssert(_CompareKeysFn(leaf->GetEntry(nk-1).first, *k2) < 0);
                }
            }
        }

        void _BPlusTree::Validate() const
        {
            if (m_height >= 0) 
            {
                ValidateNode(m_height, mGetPtr(m_root), true, nullptr, nullptr);
            }
        }

        ///////////////////////////////////////////////////////////////////////////////////////////////////
        // Delete all nodes

        @if (!_bPersist)
        {
            void _BPlusTree::DeleteNodes(ceda::ssize_t height, Node* node)
            {
                cxAssert(node);
                if (height > 0)
                {
                    InternalNode* inode = AsInternalNode(node);
                    DeleteNodes(height-1,mGetPtr(inode->m_ptr));
                    ceda::ssize_t nk = inode->m_numKeyEntrys;
                    cxAssert(1 <= nk && nk <= MAX_NUM_INTERNAL_KEY_ENTRYS);
                    for (ceda::ssize_t i=0 ; i < nk ; ++i)
                    {
                        DeleteNodes(height-1,mGetPtr(inode->GetEntry(i).m_ptr));
                    }
                    delete inode;
                }
                else
                {
                    delete AsLeafNode(node);
                }
            }

            void _BPlusTree::DeleteNodes()
            {
                if (m_height >= 0)
                {
                    DeleteNodes(m_height,mGetPtr(m_root));
                }
            }
        }

        ///////////////////////////////////////////////////////////////////////////////////////////////////
        // Calculate total number of nodes for debugging purposes
        
        ceda::ssize_t _BPlusTree::GetNumNodes(ceda::ssize_t height, Node* node) const
        {
            cxAssert(node);
            if (height > 0)
            {
                InternalNode* inode = AsInternalNode(node);
                
                ceda::ssize_t c = 1 + GetNumNodes(height-1,mGetPtr(inode->m_ptr));

                ceda::ssize_t nk = inode->m_numKeyEntrys;
                cxAssert(1 <= nk && nk <= MAX_NUM_INTERNAL_KEY_ENTRYS);

                for (ceda::ssize_t i=0 ; i < nk ; ++i)
                {
                    c += GetNumNodes(height-1,mGetPtr(inode->GetEntry(i).m_ptr));
                }
                return c;
            }
            else
            {
                return 1;
            }
        }

        ceda::ssize_t _BPlusTree::GetNumNodes() const
        {
            if (m_height >= 0)
            {
                return GetNumNodes(m_height,mGetPtr(m_root));
            }
            else
            {
                return 0;
            }
        }

        ///////////////////////////////////////////////////////////////////////////////////////////////////
        // Write B+Tree to ostream for debugging purposes

        @if (_bWriteToxostream)
        {
            void _BPlusTree::WriteNode(ceda::xostream& os, ceda::ssize_t height, Node* node) const
            {
                cxAssert(node);
                os << '{' << '\n';
                {
                    ceda::Indenter indent(os,4);
                    
                    if (height > 0)
                    {
                        InternalNode* inode = AsInternalNode(node);
                        
                        WriteNode(os,height-1,mGetPtr(inode->m_ptr));

                        ceda::ssize_t nk = inode->m_numKeyEntrys;
                        cxAssert(1 <= nk && nk <= MAX_NUM_INTERNAL_KEY_ENTRYS);

                        for (ceda::ssize_t i=0 ; i < nk ; ++i)
                        {
                            os << inode->GetEntry(i).m_key << '\n';
                            WriteNode(os,height-1,mGetPtr(inode->GetEntry(i).m_ptr));
                        }
                    }
                    else
                    {
                        LeafNode* leaf = AsLeafNode(node);
                        ceda::ssize_t nk = leaf->m_numKeyEntrys;
                        cxAssert(0 <= nk && nk <= MAX_NUM_LEAF_KEY_ENTRYS);

                        for (ceda::ssize_t i=0 ; i < nk ; ++i)
                        {
                            @if (bHavePayload) 
                            {
                                os << leaf->GetEntry(i).first << " --> " << leaf->GetEntry(i).second << '\n';
                            }
                            @else
                            {
                                os << leaf->GetEntry(i).first << '\n';
                            }
                        }
                    }
                }
                os << '}' << '\n';
            }

            void _BPlusTree::Write(ceda::xostream& os) const
            {
                os << "Height = " << m_height << '\n';
                if (m_height >= 0)
                {
                    WriteNode(os,m_height,mGetPtr(m_root));
                }
            }
        }
        
        ///////////////////////////////////////////////////////////////////////////////////////////////////
        // Delete an entry from the B+Tree
        /*
        Merge the given right node into the left node, rendering the right node empty so it may be 
        deleted.  Fix up the doubly linked list of leaf pages as required for deletion of the right
        node.

        The given left and right nodes must be non-null, consecutive leaf nodes.


                +-------------------------+   next  +-------------------------+  next
                |  K(0)  K(1) ... K(nL-1) |---->----|  K(0)  K(1) ... K(nR-1) |--->---
                |                         |         |                         | 
                |                         |----<----|                         |---<---
                +-------------------------+   prev  +-------------------------+  prev
                          left                                 right
                            
        */
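        /*
        Worked example (a sketch only; MIN_NUM_LEAF_KEY_ENTRYS = 3 and MAX_NUM_LEAF_KEY_ENTRYS = 6 are
        values assumed for illustration, not the configured constants):

            before:   left = [10 20]              right = [30 40 50]      nL = 2, nR = 3
            nTot    = 2 + 3 = 5                   (3 <= 5 <= 6)
            after:    left = [10 20 30 40 50]     right = []

        left->m_next then takes the old value of right->m_next, and right is deleted.
        */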
        void _BPlusTree::MergeLeafRightToLeft(LeafNode* left, LeafNode* right)
        {
            cxAssert(left);
            cxAssert(right);
            // Compare (==) rather than assign (=): an assertion must not modify the links
            cxAssert(left->m_next == right);
            cxAssert(right->m_prev == left);
            
            ceda::ssize_t nL = left->m_numKeyEntrys;
            cxAssert(0 <= nL && nL <= MAX_NUM_LEAF_KEY_ENTRYS);
            
            ceda::ssize_t nR = right->m_numKeyEntrys;
            cxAssert(0 <= nR && nR <= MAX_NUM_LEAF_KEY_ENTRYS);
            
            ceda::ssize_t nTot = nL+nR;
            cxAssert(MIN_NUM_LEAF_KEY_ENTRYS <= nTot && nTot <= MAX_NUM_LEAF_KEY_ENTRYS);

            // Move all entries from right to left
            MoveLeafEntries(right,0, left,nL, nR);
            
            left->m_numKeyEntrys = nTot;
            @if (bTraceDelete)
            {
                Tracer() << "Merge right to left : Left now has " << nTot << '\n';
                ceda::TraceIndenter indent;
            }

            // Update doubly linked list of pages as per deletion of right
            left->m_next = right->m_next;
            @if (_bPersist)
            {
                left->MarkAsDirtyWithoutTrace();
            }
            if (right->m_next)
            {
                cxAssert(m_lastLeaf != right);
                cxAssert(right->m_next->m_prev == right);
                right->m_next->m_prev = left;
                @if (_bPersist)
                {
                    right->m_next->MarkAsDirtyWithoutTrace();
                }
            }
            else
            {
                cxAssert(m_lastLeaf == right);
                m_lastLeaf = left;
                @if (_bPersist)
                {
                    MarkAsDirtyWithoutTrace();
                }
            }

            right->m_numKeyEntrys = 0;

            @if (_bPersist)
            {
                @if (_bKeyMayHavePrefs || _bPayLoadMayHavePrefs)
                {
                    // It is important to zero the pointers in 'right' before it is asynchronously
                    // deleted or else the deletion framework (which visits prefs in order to 
                    // recursively delete whole subtrees) may delete more than we bargained for!
                    right->m_prev = nullptr;
                    right->m_next = nullptr;
                    right->MarkAsDirtyWithoutTrace();
                    ceda::AsyncPermanentlyDeleteSubTree(right);
                }
                @else
                {        
                    @if (CEDA_VALIDATE_MARK_AS_DIRTY)
                    {
                        right->MarkAsDirtyWithoutTrace();
                    }
                    ceda::AsyncPermanentlyDeleteObject(right);
                }
            }
            @else
            {
                delete right;
            }
        }

        // Find the index of the largest key less than or equal to the given old key, and replace that key
        // with the given new key value.  This should be compatible with the existing ordering of the keys.
        void _BPlusTree::ReplaceKeyInAnchor(InternalNode* node, const _Key& oldKey, const _Key& newKey)
        {
            cxAssert(node);
            cxAssert(_CompareKeysFn(oldKey,newKey) != 0);

            ceda::ssize_t i = node->FindChildIndexGivenKey(oldKey);
            cxAssert(0 <= i && i < node->m_numKeyEntrys);

            node->GetEntry(i).m_key = newKey;
            @if (_bPersist)
            {
                node->MarkAsDirtyWithoutTrace();
            }

            // Ensure that new key is compatible with ordering constraint on the keys
            if (i > 0)
            {
                cxAssert(_CompareKeysFn(node->GetEntry(i-1).m_key, newKey) < 0);
            }
            if (i+1 < node->m_numKeyEntrys)
            {
                cxAssert(_CompareKeysFn(node->GetEntry(i+1).m_key, newKey) > 0);
            }
        }

        /*
        Called because left has underflowed, so entries are transferred from right (which is assumed 
        to have enough surplus entries).

        The given left and right nodes must be non-null, consecutive leaf nodes.


                                  shift ns entries from right to left
                                         +-----------------+
                                         |                 |
                                         |           |<----------->|
                                        \|/                ns (number to shift)
                +-------------------------+   next  +-------------------------+
                |  K(0)  K(1) ... K(nL-1) |---->----|  K(0)  K(1) ... K(nR-1) |
                |                         |         |                         | 
                |                         |----<----|                         |
                +-------------------------+   prev  +-------------------------+
                          left                                 right
                      (has a deficit)                      (has a surplus)


        Updates the key recorded in the given anchor node
        */
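        /*
        Worked example (a sketch only; MIN_NUM_LEAF_KEY_ENTRYS = 3 and MAX_NUM_LEAF_KEY_ENTRYS = 6 are
        values assumed for illustration, not the configured constants):

            before:       left = [10 20]        right = [30 40 50 60 70]     nL = 2, nR = 5
            newNumLeft  = (2 + 5)/2 = 3
            ns          = 3 - 2     = 1
            newNumRight = 5 - 1     = 4
            after:        left = [10 20 30]     right = [40 50 60 70]

        The key previously at position 0 of right is 30 (now at position nL = 2 of left) and the new 
        key at position 0 of right is 40, so the anchor is updated with 
        ReplaceKeyInAnchor(anchor, 30, 40).
        */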
        void _BPlusTree::BalanceLeafRightToLeft(LeafNode* left, LeafNode* right, InternalNode* anchor)
        {
            @if (bTraceDelete)
            {
                Tracer() << "BalanceLeafRightToLeft\n";
                ceda::TraceIndenter indent;
            }
            cxAssert(left);
            cxAssert(right);
            cxAssert(anchor);
            // Compare (==) rather than assign (=): an assertion must not modify the links
            cxAssert(left->m_next == right);
            cxAssert(right->m_prev == left);

            // Left has a deficit
            ceda::ssize_t nL = left->m_numKeyEntrys;
            cxAssert(0 <= nL && nL < MIN_NUM_LEAF_KEY_ENTRYS);
            
            // Right has a surplus
            ceda::ssize_t nR = right->m_numKeyEntrys;
            cxAssert(MIN_NUM_LEAF_KEY_ENTRYS < nR && nR <= MAX_NUM_LEAF_KEY_ENTRYS);

            ceda::ssize_t newNumLeft = (nL + nR)/2;
            cxAssert(MIN_NUM_LEAF_KEY_ENTRYS <= newNumLeft && newNumLeft <= MAX_NUM_LEAF_KEY_ENTRYS);

            // Number to shift
            ceda::ssize_t ns = newNumLeft - nL;
            cxAssert(ns > 0);

            ceda::ssize_t newNumRight = nR - ns;
            cxAssert(MIN_NUM_LEAF_KEY_ENTRYS <= newNumRight && newNumRight <= MAX_NUM_LEAF_KEY_ENTRYS);
            
            // Move entries from right to left until they have about the same size
            MoveLeafEntries(right,0, left,nL, ns);
            
            // Shift remaining entries in right down to position 0
            ShiftLeafEntries(right,ns,0,newNumRight);
            
            left->m_numKeyEntrys = newNumLeft;
            right->m_numKeyEntrys = newNumRight;
            
            @if (_bPersist)
            {
                left->MarkAsDirtyWithoutTrace();
                right->MarkAsDirtyWithoutTrace();
            }

            // Previously the key at index position 0 in the right leaf node
            const _Key& oldkey = left->GetEntry(nL).first;
            
            // The new key at index position 0 in the right leaf node
            const _Key& newkey = right->GetEntry(0).first;

            // The separator in the anchor has to be adjusted as required
            ReplaceKeyInAnchor(anchor, oldkey, newkey);
        }

        /*
        Called because right has underflowed, so entries are transferred from left (which is assumed 
        to have enough surplus entries).

        The given left and right nodes must be non-null, consecutive leaf nodes.

                                  shift ns entries from left to right
                                      +-------------------+
                                      |                   |
                                      |                   |
                                 |<-------->|            \|/   
                   +--------------------------+   next  +-------------------------+
                   |  K(0) K(1)   ... K(nL-1) |---->----|  K(0)  K(1) ... K(nR-1) |
                   |                          |         |                         | 
                   |                          |----<----|                         |
                   +--------------------------+   prev  +-------------------------+
                              left                                 right
                          (has a surplus)                      (has a deficit)

        Updates the key recorded in the given anchor node
        */
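        /*
        Worked example (a sketch only; MIN_NUM_LEAF_KEY_ENTRYS = 3 and MAX_NUM_LEAF_KEY_ENTRYS = 6 are
        values assumed for illustration, not the configured constants):

            before:        left = [10 20 30 40 50]     right = [60 70]        nL = 5, nR = 2
            newNumRight  = (5 + 2)/2 = 3
            ns           = 3 - 2     = 1
            newNumLeft   = 5 - 1     = 4
            after:         left = [10 20 30 40]        right = [50 60 70]

        The key previously at position 0 of right is 60 (now at position ns = 1 of right) and the new 
        key at position 0 of right is 50, so the anchor is updated with 
        ReplaceKeyInAnchor(anchor, 60, 50).
        */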
        void _BPlusTree::BalanceLeafLeftToRight(LeafNode* left, LeafNode* right, InternalNode* anchor)
        {
            @if (bTraceDelete)
            {
                Tracer() << "BalanceLeafLeftToRight\n";
                ceda::TraceIndenter indent;
            }

            cxAssert(left);
            cxAssert(right);
            cxAssert(anchor);
            // Compare (==) rather than assign (=): an assertion must not modify the links
            cxAssert(left->m_next == right);
            cxAssert(right->m_prev == left);

            // Left has a surplus
            ceda::ssize_t nL = left->m_numKeyEntrys;
            cxAssert(MIN_NUM_LEAF_KEY_ENTRYS < nL && nL <= MAX_NUM_LEAF_KEY_ENTRYS);
            
            // Right has a deficit
            ceda::ssize_t nR = right->m_numKeyEntrys;
            cxAssert(0 <= nR && nR < MIN_NUM_LEAF_KEY_ENTRYS);

            ceda::ssize_t newNumRight = (nL + nR)/2;
            cxAssert(MIN_NUM_LEAF_KEY_ENTRYS <= newNumRight && newNumRight <= MAX_NUM_LEAF_KEY_ENTRYS);

            ceda::ssize_t ns = newNumRight - nR;
            cxAssert(ns > 0);

            ceda::ssize_t newNumLeft = nL - ns;
            cxAssert(MIN_NUM_LEAF_KEY_ENTRYS <= newNumLeft && newNumLeft <= MAX_NUM_LEAF_KEY_ENTRYS);

            // Shift entries in right upwards from position 0
            ShiftLeafEntries(right,0,ns,nR);
            
            // Move entries from left to right
            MoveLeafEntries(left,newNumLeft, right,0, ns);
            
            left->m_numKeyEntrys = newNumLeft;
            right->m_numKeyEntrys = newNumRight;
            
            @if (_bPersist)
            {
                left->MarkAsDirtyWithoutTrace();
                right->MarkAsDirtyWithoutTrace();
            }

            // Previously the key at index position 0 in the right leaf node
            const _Key& oldkey = right->GetEntry(ns).first;
            
            // The new key at index position 0 in the right leaf node
            const _Key& newkey = right->GetEntry(0).first;

            // The separator in the anchor has to be adjusted as required
            ReplaceKeyInAnchor(anchor, oldkey, newkey);
        }

        /*
        The given left and right nodes must not be nullptr and be consecutive internal nodes with the given
        anchor node.

        Merge the given right node into the left node, rendering the right node empty so it may be 
        deleted.

        This function doesn't actually remove the relevant key K(i) from the anchor node.

                                     +--------------------+
                                     |            K(i)    |   (anchor)
                                     +--/------------\----+ 
                                       /              \
                                     ...              ...
                                     /                  \
              +-------------------------------+    +------------------------------+
              |       K(0)  K(1) ... K(i-1)   |    |     K(i+1)   K(i+2) ...      |
              |   /    \     \         \      |    |     /  \        \            |
              |  P(-1)  P(0)  P(1) ... P(i-1) |    |  P(i)   P(i+1)   P(i+2) ...  |
              |                               |    |                              |
              +-------------------------------+    +------------------------------+
                          left                                  right

        The merge process involves the following steps
            1.  Find the key K(i) in the anchor
            2.  Copy K(i), P(i) to the new position in left
            3.  Move all entries from right to left
        */
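        /*
        Worked example (a sketch only; MIN_NUM_INTERNAL_KEY_ENTRYS = 3 and 
        MAX_NUM_INTERNAL_KEY_ENTRYS = 6 are values assumed for illustration).  Child pointers are 
        written as letters, and each node is written as   m_ptr | key ptr | key ptr ...

            before:   anchor key K(i) = 30
                      left  = A | 10 B | 20 C                                  nL = 2
                      right = D | 40 E | 50 F | 60 G                           nR = 3
            nTot    = 2 + 3 + 1 = 6
            step 2:   left  = A | 10 B | 20 C | 30 D
            step 3:   left  = A | 10 B | 20 C | 30 D | 40 E | 50 F | 60 G
                      right = (no entries, m_numKeyEntrys = 0)

        The key 30 is still present in the anchor when this function returns.
        */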
        void _BPlusTree::MergeInternalRightToLeft(InternalNode* left, InternalNode* right, InternalNode* anchor)
        {
            cxAssert(left);
            cxAssert(right);
            cxAssert(anchor);

            ceda::ssize_t nL = left->m_numKeyEntrys;
            cxAssert(0 <= nL && nL < MAX_NUM_INTERNAL_KEY_ENTRYS);
            
            ceda::ssize_t nR = right->m_numKeyEntrys;
            cxAssert(1 <= nR && nR < MAX_NUM_INTERNAL_KEY_ENTRYS);
            
            ceda::ssize_t nTot = nL + nR + 1;
            cxAssert(MIN_NUM_INTERNAL_KEY_ENTRYS <= nTot && nTot <= MAX_NUM_INTERNAL_KEY_ENTRYS);

            @if (bTraceDelete)
            {
                Tracer() << "Merge right to left : nL= " << nL << "  nR= " << nR << '\n';
                ceda::TraceIndenter indent;
            }

            // Get index position of the key in the anchor.  This is the largest key below right->GetEntry(0).m_key
            ceda::ssize_t ai = anchor->FindChildIndexGivenKey(right->GetEntry(0).m_key);
            cxAssert(0 <= ai && ai < anchor->m_numKeyEntrys);
            const _Key& anchorKey = anchor->GetEntry(ai).m_key;
            
            if (nL > 0) cxAssert(_CompareKeysFn(left->GetEntry(nL-1).m_key, anchorKey) < 0);
            cxAssert(_CompareKeysFn(anchorKey, right->GetEntry(0).m_key) < 0);

            // Copy K(i), P(i) to the new position in left
            CopyConstructKey(left->GetEntry(nL).m_key, anchorKey);
            left->GetEntry(nL).m_ptr = right->m_ptr;

            // Move all entries from right to left
            MoveInternalEntries(right,0, left,nL+1, nR);
            
            left->m_numKeyEntrys = nTot;
            @if (_bPersist)
            {
                left->MarkAsDirtyWithoutTrace();
            }
            
            @if (bTraceDelete)
            {
                Tracer() << "Merge right to left : Left now has " << nTot << '\n';
            }

            right->m_numKeyEntrys = 0;

            @if (_bPersist)
            {
                @if (_bKeyMayHavePrefs)
                {
                    // It is important to zero the pointers in right before it is asynchronously
                    // deleted or else the deletion framework (which visits prefs in order to 
                    // recursively delete whole subtrees) may delete more than we bargained for!
                    right->m_ptr = nullptr;
                    right->MarkAsDirtyWithoutTrace();
                    ceda::AsyncPermanentlyDeleteSubTree(right);
                }
                @else
                {
                    @if (CEDA_VALIDATE_MARK_AS_DIRTY)
                    {
                        right->MarkAsDirtyWithoutTrace();
                    }
                    ceda::AsyncPermanentlyDeleteObject(right);
                }
            }
            @else
            {
                delete right;
            }
        }

        /*
        left and right are consecutive internal nodes with the given anchor node.

        A deletion from right has caused it to underflow.  Therefore we want to rebalance by moving entries
        from the left node to the right node.  It is assumed that left has sufficient surplus keys.

                                     +--------------------+
                                     |            K(i)    |   (anchor)
                                     +--/------------\----+ 
                                       /              \
                                     ...              ...
                                     /                  \
              +-------------------------------+    +------------------------------+
              |       K(0)  K(1) ... K(i-1)   |    |     K(i+1)   K(i+2) ...      |
              |   /    \     \         \      |    |     /  \        \            |
              |  P(-1)  P(0)  P(1) ... P(i-1) |    |  P(i)   P(i+1)   P(i+2) ...  |
              |                               |    |                              |
              +-------------------------------+    +------------------------------+
                          left                                  right
                     (has a surplus)                       (has a deficit)

        It is useful to conceptualise the merge of the left and right nodes.  The above figure has been
        labelled accordingly.

        We see that the anchor contains the key K(i) associated with the left most sub-tree P(i) of the 
        right node.  Therefore K(i) needs to be moved down into right as entries are shifted from left 
        to right.

        The shift process involves the following steps
            1.  Find the key K(i) in the anchor
            2.  Calculate the number of entries to be shifted from left to right
            3.  Shift the entries in right upwards
            4.  Move K(i), P(i) to the new position in right
            5.  Move entries from left to right
            6.  Set the new anchor key value and right->m_ptr.
        */
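        /*
        Worked example (a sketch only; MIN_NUM_INTERNAL_KEY_ENTRYS = 3 and 
        MAX_NUM_INTERNAL_KEY_ENTRYS = 6 are values assumed for illustration).  Child pointers are 
        written as letters, and each node is written as   m_ptr | key ptr | key ptr ...

            before:        anchor key K(i) = 70
                           left  = A | 10 B | 20 C | 30 D | 40 E | 50 F | 60 G     nL = 6
                           right = H | 80 I | 90 J                                 nR = 2
            newNumRight  = (6 + 2)/2 = 4
            ns           = 4 - 2     = 2
            newNumLeft   = 6 - 2     = 4
            step 3:        right = H |  ?   |  ?   | 80 I | 90 J
            step 4:        right = H |  ?   | 70 H | 80 I | 90 J
            step 5:        right = H | 60 G | 70 H | 80 I | 90 J     (ns-1 = 1 entry moved from left)
            step 6:        anchor key = 50,  right->m_ptr = F
            after:         left  = A | 10 B | 20 C | 30 D | 40 E
                           right = F | 60 G | 70 H | 80 I | 90 J
        */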
        void _BPlusTree::BalanceInternalLeftToRight(InternalNode* left, InternalNode* right, InternalNode* anchor)
        {
            cxAssert(left);
            cxAssert(right);
            cxAssert(anchor);

            // Left has a surplus
            ceda::ssize_t nL = left->m_numKeyEntrys;
            cxAssert(MIN_NUM_INTERNAL_KEY_ENTRYS < nL && nL <= MAX_NUM_INTERNAL_KEY_ENTRYS);
            
            // Right has a deficit
            ceda::ssize_t nR = right->m_numKeyEntrys;
            cxAssert(1 <= nR && nR < MIN_NUM_INTERNAL_KEY_ENTRYS);
            
            ceda::ssize_t newNumRight = (nL + nR)/2;
            cxAssert(MIN_NUM_INTERNAL_KEY_ENTRYS <= newNumRight && newNumRight <= MAX_NUM_INTERNAL_KEY_ENTRYS);

            ceda::ssize_t ns = newNumRight - nR;
            cxAssert(ns > 0);

            ceda::ssize_t newNumLeft = nL - ns;
            cxAssert(MIN_NUM_INTERNAL_KEY_ENTRYS <= newNumLeft && newNumLeft <= MAX_NUM_INTERNAL_KEY_ENTRYS);
            
            @if (bTraceDelete)
            {
                Tracer() << "BalanceInternalLeftToRight : nL=" << nL << "  nR=" << nR << " new nR=" << newNumRight
                     << "  ns= " << ns << " new nL= " << newNumLeft << '\n';
                ceda::TraceIndenter indent;
            }

            // Get index position of the key in the anchor.  This is the largest key below right->GetEntry(0).m_key
            ceda::ssize_t ai = anchor->FindChildIndexGivenKey(right->GetEntry(0).m_key);
            cxAssert(0 <= ai && ai < anchor->m_numKeyEntrys);
            _Key& anchorKey = anchor->GetEntry(ai).m_key;
            
            @if (bTraceDelete)
            {
                Tracer() << "ai = " << ai << '\n';
            }
            
            cxAssert(_CompareKeysFn(left->GetEntry(nL-1).m_key, anchorKey) < 0);
            cxAssert(_CompareKeysFn(anchorKey, right->GetEntry(0).m_key) < 0);

            /*
            Note that we move the anchor key K(i) down into right, and move the new anchor 
            key K(newNumLeft) out of left, without needing any copy construction or copy assignment
            operations in order to duplicate key values.
            */

            // Shift all nR entries in right upwards from position 0 to position ns
            ShiftInternalEntries(right,0,ns,nR);
            
            // Move the anchor key downwards into right to position ns-1
            MoveKey(right->GetEntry(ns-1).m_key, anchorKey);
            right->GetEntry(ns-1).m_ptr = right->m_ptr;
            
            // Move ns-1 entries from left to right
            MoveInternalEntries(left,newNumLeft+1, right,0, ns-1);
            
            // Move new anchor key upwards out of left
            MoveKey(anchorKey, left->GetEntry(newNumLeft).m_key);
            right->m_ptr = left->GetEntry(newNumLeft).m_ptr;
            
            left->m_numKeyEntrys = newNumLeft;
            right->m_numKeyEntrys = newNumRight;
            
            @if (_bPersist)
            {
                anchor->MarkAsDirtyWithoutTrace();
                left->MarkAsDirtyWithoutTrace();
                right->MarkAsDirtyWithoutTrace();
            }
        }

        /*
        left and right are consecutive internal nodes with the given anchor node.

        A deletion from left has caused it to underflow.  Therefore we want to rebalance by moving entries
        from the right node to the left node.  It is assumed that right has sufficient surplus keys.

                                     +--------------------+
                                     |            K(i)    |   (anchor)
                                     +--/------------\----+ 
                                       /              \
                                     ...              ...
                                     /                  \
              +-------------------------------+    +------------------------------+
              |       K(0)  K(1) ... K(i-1)   |    |     K(i+1)   K(i+2) ...      |
              |   /    \     \         \      |    |     /  \        \            |
              |  P(-1)  P(0)  P(1) ... P(i-1) |    |  P(i)   P(i+1)   P(i+2) ...  |
              |                               |    |                              |
              +-------------------------------+    +------------------------------+
                          left                                  right
                     (has a deficit)                       (has a surplus)

        It is useful to conceptualise the merge of the left and right nodes.  The above figure has been
        labelled accordingly.

        We see that the anchor contains the key K(i) associated with the left most sub-tree P(i) of the 
        right node.  Therefore K(i) needs to be moved down into left as entries are shifted from right 
        to left.

        The shift process involves the following steps
            1.  Calculate ns = number of entries to be shifted to left
            2.  Find the key K(i) in the anchor
            3.  Move K(i), P(i) to the new position in left
            4.  Move ns-1 entries from right to left
            5.  Set the new anchor key value and right->m_ptr.
            6.  Shift the entries in right downwards to position 0
        */
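        /*
        Worked example (a sketch only; MIN_NUM_INTERNAL_KEY_ENTRYS = 3 and 
        MAX_NUM_INTERNAL_KEY_ENTRYS = 6 are values assumed for illustration).  Child pointers are 
        written as letters, and each node is written as   m_ptr | key ptr | key ptr ...

            before:        anchor key K(i) = 30
                           left  = A | 10 B | 20 C                                 nL = 2
                           right = D | 40 E | 50 F | 60 G | 70 H | 80 I | 90 J     nR = 6
            newNumLeft   = (2 + 6)/2 = 4
            ns           = 4 - 2     = 2
            newNumRight  = 6 - 2     = 4
            step 3:        left  = A | 10 B | 20 C | 30 D
            step 4:        left  = A | 10 B | 20 C | 30 D | 40 E     (ns-1 = 1 entry moved from right)
            step 5:        anchor key = 50,  right->m_ptr = F
            step 6:        right = F | 60 G | 70 H | 80 I | 90 J
        */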
        void _BPlusTree::BalanceInternalRightToLeft(InternalNode* left, InternalNode* right, InternalNode* anchor)
        {
            cxAssert(left);
            cxAssert(right);
            cxAssert(anchor);

            // Left has a deficit
            ceda::ssize_t nL = left->m_numKeyEntrys;
            cxAssert(0 <= nL && nL < MIN_NUM_INTERNAL_KEY_ENTRYS);
            
            // Right has a surplus
            ceda::ssize_t nR = right->m_numKeyEntrys;
            cxAssert(MIN_NUM_INTERNAL_KEY_ENTRYS < nR && nR <= MAX_NUM_INTERNAL_KEY_ENTRYS);
            
            ceda::ssize_t newNumLeft = (nL + nR)/2;
            cxAssert(MIN_NUM_INTERNAL_KEY_ENTRYS <= newNumLeft && newNumLeft <= MAX_NUM_INTERNAL_KEY_ENTRYS);

            ceda::ssize_t ns = newNumLeft - nL;
            cxAssert(ns > 0);

            ceda::ssize_t newNumRight = nR - ns;
            cxAssert(MIN_NUM_INTERNAL_KEY_ENTRYS <= newNumRight && newNumRight <= MAX_NUM_INTERNAL_KEY_ENTRYS);

            // Get index position of the key in the anchor.  This is the largest key below right->GetEntry(0).m_key
            ceda::ssize_t ai = anchor->FindChildIndexGivenKey(right->GetEntry(0).m_key);
            cxAssert(0 <= ai && ai < anchor->m_numKeyEntrys);
            _Key& anchorKey = anchor->GetEntry(ai).m_key;
            
            if (nL > 0) cxAssert(_CompareKeysFn(left->GetEntry(nL-1).m_key, anchorKey) < 0);
            cxAssert(_CompareKeysFn(anchorKey, right->GetEntry(0).m_key) < 0);
            
            /*
            Note that we move the anchor key K(i) down into left, and move the new anchor 
            key out of right, without needing any key assignment operations.
            */

            // Move K(i), P(i) to the new position in left
            MoveKey(left->GetEntry(nL).m_key, anchorKey);
            left->GetEntry(nL).m_ptr = right->m_ptr;

            // Move ns-1 entries from right to left
            MoveInternalEntries(right,0, left,nL+1, ns-1);
            
            // Set the new anchor key value and right->m_ptr.
            MoveKey(anchorKey, right->GetEntry(ns-1).m_key);
            right->m_ptr = right->GetEntry(ns-1).m_ptr;
            
            // Shift the entries in right downwards to position 0
            ShiftInternalEntries(right,ns,0,newNumRight);
            
            left->m_numKeyEntrys = newNumLeft;
            right->m_numKeyEntrys = newNumRight;
            
            @if (_bPersist)
            {
                anchor->MarkAsDirtyWithoutTrace();
                left->MarkAsDirtyWithoutTrace();
                right->MarkAsDirtyWithoutTrace();
            }
        }

        /*
        height is the height of thisNode (so height = 0 indicates that thisNode is a leaf node).
        lNode is the node at the same height to the left of thisNode.
        lNode is nullptr if there is no node to the left of thisNode

        rNode is the node at the same height to the right of thisNode.
        rNode is nullptr if there is no node to the right of thisNode

        lAnchor is the anchor node between lNode and thisNode.  lAnchor=nullptr <=> lNode=nullptr
        rAnchor is the anchor node between rNode and thisNode.  rAnchor=nullptr <=> rNode=nullptr
        */
        bool _BPlusTree::RecursiveDelete(
            ceda::ssize_t height, Node* thisNode, Node* lNode, Node* rNode, 
            InternalNode* lAnchor, InternalNode* rAnchor,
            InternalNode* parent,
            Node*& deleteNode,
            const _Key& key)
        {
            cxAssert(height >= 0);
            cxAssert(thisNode);
            cxAssert( (lNode == nullptr) == (lAnchor == nullptr) );
            cxAssert( (rNode == nullptr) == (rAnchor == nullptr) );

            @if (bTraceDelete)
            {
                Tracer() << "RecursiveDelete.  height = " << height << " key = " << key << '\n';
                ceda::TraceIndenter indent(2);
            }
            
            if (height > 0)
            {
                Node* next_lNode;
                Node* next_rNode;
                InternalNode* next_lAnchor;
                InternalNode* next_rAnchor;
                
                InternalNode* iThisNode = AsInternalNode(thisNode);
                
                std::pair<Node*,ceda::ssize_t> r = iThisNode->FindChildGivenKey2(key);
                cxAssert(-1 <= r.second && r.second < iThisNode->m_numKeyEntrys);
                Node* nextNode = r.first;
                cxAssert(nextNode);
                
                if (r.second == -1)
                {
                    cxAssert(mGetPtr(iThisNode->m_ptr) == nextNode);
                    if (lNode)
                    {
                        //next_lNode = greatest entry ptr of lNode;
                        InternalNode* ilNode = AsInternalNode(lNode);
                        cxAssert(ilNode->m_numKeyEntrys > 0);
                        next_lNode = mGetPtr(ilNode->GetEntry(ilNode->m_numKeyEntrys-1).m_ptr);
                    }
                    else
                    {
                        cxAssert(!lAnchor);
                        next_lNode = nullptr;
                    }
                    next_lAnchor = lAnchor;
                }
                else
                {
                    cxAssert(mGetPtr(iThisNode->m_ptr) != nextNode);
                    cxAssert(mGetPtr(iThisNode->GetEntry(r.second).m_ptr) == nextNode);

                    // next_lNode = entry ptr that immediately precedes nextNode
                    next_lNode = (r.second == 0) ? mGetPtr(iThisNode->m_ptr) : mGetPtr(iThisNode->GetEntry(r.second-1).m_ptr);

                    next_lAnchor = iThisNode;
                }
                
                if (r.second == iThisNode->m_numKeyEntrys-1)
                {
                    if (rNode)
                    {
                        // next_rNode = least entry ptr of rNode
                        next_rNode = mGetPtr(AsInternalNode(rNode)->m_ptr);
                    }
                    else
                    {
                        cxAssert(!rAnchor);
                        next_rNode = nullptr;
                    }
                    next_rAnchor = rAnchor;
                }
                else
                {
                    // next_rNode = entry ptr that immediately follows nextNode
                    next_rNode = mGetPtr(iThisNode->GetEntry(r.second+1).m_ptr);
                    
                    next_rAnchor = iThisNode;
                }
                
                Node* childDeleteNode = nullptr;
                
                if (!RecursiveDelete(height-1, nextNode, next_lNode, next_rNode, next_lAnchor, next_rAnchor, iThisNode, childDeleteNode, key))
                {
                    // Deletion failed because entry not found
                    return false;
                }
                
                if (childDeleteNode)
                {
                    // Remove childDeleteNode from thisNode, for now ignoring underflow
                    
                    ceda::ssize_t deletePos = (childDeleteNode == nextNode) ? r.second : r.second+1;
                    @if (bTraceDelete)
                    {
                        Tracer() << "Internal node deletePos = " << deletePos << '\n';
                        ceda::TraceIndenter indent;
                    }
                    
                    // The left most sub-tree under thisNode (i.e. associated with thisNode->m_ptr) is never 
                    // deleted, because merging of a node into its neighbour may only occur when they are 
                    // consecutive nodes under the same parent, and the right node is always merged into 
                    // the left rather than the reverse.
                    cxAssert(deletePos != -1);
                    
                    cxAssert(0 <= deletePos && deletePos < iThisNode->m_numKeyEntrys);
                    
                    // Erase entry from thisNode
                    DestructInternalEntry(iThisNode,deletePos);
                    ShiftInternalEntries(iThisNode,deletePos+1,deletePos,iThisNode->m_numKeyEntrys-1-deletePos);

                    --iThisNode->m_numKeyEntrys;
                    @if (_bPersist)
                    {
                        iThisNode->MarkAsDirtyWithoutTrace();
                    }
                    
                    if (parent)
                    {
                        cxAssert(height < m_height);
                        
                        if (iThisNode->m_numKeyEntrys < MIN_NUM_INTERNAL_KEY_ENTRYS)
                        {
                            // thisNode has underflowed

                            ceda::ssize_t s1 = (lNode) ? AsInternalNode(lNode)->m_numKeyEntrys : 0;
                            ceda::ssize_t s2 = (rNode) ? AsInternalNode(rNode)->m_numKeyEntrys : 0;
                            bool useLeft = s1 > s2;
                            ceda::ssize_t maxs = (useLeft) ? s1 : s2;
                            
                            if (maxs > MIN_NUM_INTERNAL_KEY_ENTRYS)
                            {
                                if (useLeft)
                                {
                                    // Move surplus entries from lNode to thisNode
                                    @if (bTraceDelete)
                                    {
                                        Tracer() << "BalanceInternalLeftToRight\n";
                                        ceda::TraceIndenter indent;
                                    }
                                    BalanceInternalLeftToRight(AsInternalNode(lNode), iThisNode, lAnchor);
                                }
                                else
                                {
                                    // Move surplus entries from rNode to thisNode
                                    @if (bTraceDelete)
                                    {
                                        Tracer() << "BalanceInternalRightToLeft\n";
                                        ceda::TraceIndenter indent;
                                    }
                                    BalanceInternalRightToLeft(iThisNode, AsInternalNode(rNode), rAnchor);
                                }
                                
                                // No more nodes need removal
                                deleteNode = nullptr;
                            }
                            else
                            {
                                if (parent == lAnchor)
                                {
                                    @if (bTraceDelete)
                                    {
                                        Tracer() << "MergeInternalRightToLeft with lAnchor\n";
                                        ceda::TraceIndenter indent;
                                    }
                                    MergeInternalRightToLeft(AsInternalNode(lNode), iThisNode, lAnchor);
                                    
                                    // thisNode has been emptied and needs to be deleted from its parent node
                                    deleteNode = iThisNode;
                                }
                                else
                                {
                                    // At least one of the two anchors must be this node's parent
                                    cxAssert(parent == rAnchor);

                                    @if (bTraceDelete)
                                    {
                                        Tracer() << "MergeInternalRightToLeft with rAnchor\n";
                                        ceda::TraceIndenter indent;
                                    }
                                    MergeInternalRightToLeft(iThisNode, AsInternalNode(rNode), rAnchor);

                                    // rNode has been emptied and needs to be deleted from its parent node.
                                    deleteNode = rNode;
                                }
                            }
                        }
                    }
                    else 
                    {
                        cxAssert(height == m_height);
                        cxAssert(mGetPtr(m_root) == thisNode);

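                        // Collapse the root once it has no key entries left: its only remaining
                        // child (m_ptr) becomes the new root and the tree height drops by one.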
                        if (iThisNode->m_numKeyEntrys == 0)
                        {
                            @if (bTraceDelete)
                            {
                                Tracer() << "Collapse root\n";
                            }
                            m_root = iThisNode->m_ptr;
                            
                            @if (_bPersist)
                            {
                                @if (_bKeyMayHavePrefs)
                                {
                                    // It is important to zero the pointers in iThisNode before it is asynchronously
                                    // deleted or else the deletion framework (which visits prefs in order to 
                                    // recursively delete whole subtrees) may delete more than we bargained for!
                                    iThisNode->m_ptr = nullptr;
                                    iThisNode->MarkAsDirtyWithoutTrace();
                                    ceda::AsyncPermanentlyDeleteSubTree(iThisNode);
                                }
                                @else
                                {
                                    ceda::AsyncPermanentlyDeleteObject(iThisNode);
                                }
                            }
                            @else
                            {
                                delete iThisNode;
                            }
                            
                            --m_height;
                            @if (_bPersist)
                            {
                                MarkAsDirtyWithoutTrace();
                            }
                        }                
                    }
                }
            }
            else
            {
                LeafNode* leaf = AsLeafNode(thisNode);
                ceda::ssize_t nk = leaf->m_numKeyEntrys;
                cxAssert(0 <= nk && nk <= MAX_NUM_LEAF_KEY_ENTRYS);
                
                // Look for exact match to key in leaf node.
                ceda::ssize_t deletePos = SearchLeaf(key, leaf);
                @if (bTraceDelete)
                {
                    Tracer() << "Leaf node deletePos = " << deletePos << '\n';
                    ceda::TraceIndenter indent;
                }
                if (deletePos == -1)
                {
                    // Key not found so delete can't be performed.
                    return false;
                }

                cxAssert(0 <= deletePos && deletePos < leaf->m_numKeyEntrys);

                // Erase entry from leaf node
                DestructLeafEntry(leaf,deletePos);
                ShiftLeafEntries(leaf,deletePos+1,deletePos,nk-1-deletePos);
                
                --leaf->m_numKeyEntrys;
                --m_size;
                @if (_bPersist)
                {
                    leaf->MarkAsDirtyWithoutTrace();
                    MarkAsDirtyWithoutTrace();
                }
                                
                // A leaf that is also the root has no parent
                cxAssert(m_height != 0 || !parent);
                
                if (m_height > 0 && leaf->m_numKeyEntrys < MIN_NUM_LEAF_KEY_ENTRYS)
                {
                    cxAssert(parent);
                    @if (bTraceDelete)
                    {
                        Tracer() << "Leaf has underflowed n=" << leaf->m_numKeyEntrys << '\n';
                        ceda::TraceIndenter indent;
                    }
                    
                    // leaf has underflowed
                    
                    ceda::ssize_t s1 = (lNode) ? AsLeafNode(lNode)->m_numKeyEntrys : 0;
                    ceda::ssize_t s2 = (rNode) ? AsLeafNode(rNode)->m_numKeyEntrys : 0;
                    bool useLeft = s1 > s2;
                    ceda::ssize_t maxs = (useLeft) ? s1 : s2;
                    @if (bTraceDelete)
                    {
                        Tracer() << "s1=" << s1 << "  s2=" << s2 << "  maxs=" << maxs << '\n';
                    }
                    
                    if (maxs > MIN_NUM_LEAF_KEY_ENTRYS)
                    {
                        // Shift entries from the neighbourNode into thisNode.  The separator in the 
                        // anchorNode has to be adjusted as required.
                        
                        if (useLeft)
                        {
                            // Assumes MIN_NUM_LEAF_KEY_ENTRYS > 1
                            cxAssert(leaf->m_numKeyEntrys > 0);

                            // Move surplus entries from lNode to leaf
                            BalanceLeafLeftToRight(AsLeafNode(lNode), leaf, lAnchor);
                        }
                        else
                        {
                            BalanceLeafRightToLeft(leaf, AsLeafNode(rNode), rAnchor);
                        }
                        
                        // No more nodes need removal
                        deleteNode = nullptr;
                    }
                    else
                    {
                        if (parent == lAnchor)
                        {
                            @if (bTraceDelete)
                            {
                                Tracer() << "MergeLeafRightToLeft.  Delete leaf\n";
                                ceda::TraceIndenter indent;
                            }
                            MergeLeafRightToLeft(AsLeafNode(lNode), leaf);
                            
                            // leaf has been emptied and needs to be deleted from its parent node
                            deleteNode = leaf;  
                        }
                        else
                        {
                            // At least one of the two anchors must be this node's parent
                            cxAssert(parent == rAnchor);

                            @if (bTraceDelete)
                            {
                                Tracer() << "MergeLeafRightToLeft.  Delete rNode\n";
                                ceda::TraceIndenter indent;
                            }
                            MergeLeafRightToLeft(leaf, AsLeafNode(rNode));

                            // rNode has been emptied and needs to be deleted from its parent node.
                            deleteNode = rNode;
                        }
                    }
                }
                else
                {
                    deleteNode = nullptr;
                }
            }
            return true;
        }
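/*
Sketch: borrow versus merge on underflow
----------------------------------------

The internal and leaf branches of RecursiveDelete above apply the same policy when a node 
underflows.  The following is a minimal standalone sketch of that decision only.  The names 
Rebalance and ChooseRebalance are hypothetical and do not exist in this header; the real code 
calls the Balance... and Merge... functions directly.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical names, for illustration only.
enum class Rebalance { None, BorrowFromLeft, BorrowFromRight, MergeIntoLeft, MergeRightIntoThis };

// n                  : entries left in the node after the erase
// s1, s2             : entries in the left and right neighbour (0 if there is none)
// minEntries         : minimum occupancy of a non-root node
// parentIsLeftAnchor : true if the left neighbour is under the same parent
inline Rebalance ChooseRebalance(std::ptrdiff_t n, std::ptrdiff_t s1, std::ptrdiff_t s2,
                                 std::ptrdiff_t minEntries, bool parentIsLeftAnchor)
{
    assert(n >= 0 && s1 >= 0 && s2 >= 0 && minEntries > 0);
    if (n >= minEntries) return Rebalance::None;

    // Prefer the fuller neighbour.  A tie goes to the right neighbour.
    const bool useLeft = s1 > s2;
    const std::ptrdiff_t maxs = useLeft ? s1 : s2;
    if (maxs > minEntries)
    {
        return useLeft ? Rebalance::BorrowFromLeft : Rebalance::BorrowFromRight;
    }

    // Neither neighbour has a surplus, so merge with a sibling under the same
    // parent.  The right node of the pair is always merged into the left one.
    return parentIsLeftAnchor ? Rebalance::MergeIntoLeft : Rebalance::MergeRightIntoThis;
}
```

For example, with minEntries = 3 a node holding 2 entries borrows from a left neighbour holding 5, 
but merges when both neighbours hold exactly 3.
*/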

        bool _BPlusTree::Delete(const _Key& key)
        {
            if (m_height == -1)
            {
                return false;
            }
            else
            {
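                // The root is never deleted from a parent, so the result is unused here.  It is
                // initialised anyway, as the recursive callers do, because RecursiveDelete does
                // not assign it on every path.
                Node* deleteNode = nullptr;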
                return RecursiveDelete(m_height, mGetPtr(m_root), nullptr, nullptr, nullptr, nullptr, nullptr, deleteNode, key);
            }
        }
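/*
Sketch: erasing an entry from a node
------------------------------------

RecursiveDelete erases an entry in three steps: destruct the entry, shift the entries that follow 
it down by one slot, then decrement the entry count.  A minimal sketch of the same idea on a plain 
array of int is shown here.  SmallNode and EraseAt are hypothetical names; the real nodes hold raw 
storage and use the Destruct...Entry and Shift...Entries helpers.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for a node: a fixed-capacity array plus a count.
struct SmallNode
{
    static const std::ptrdiff_t kCapacity = 8;
    int entries[kCapacity];
    std::ptrdiff_t numEntries = 0;
};

// Remove the entry at pos, keeping the remaining entries in order.
inline void EraseAt(SmallNode& node, std::ptrdiff_t pos)
{
    assert(0 <= pos && pos < node.numEntries);

    // Shift the (numEntries-1-pos) entries after pos down by one slot.
    for (std::ptrdiff_t i = pos; i + 1 < node.numEntries; ++i)
    {
        node.entries[i] = node.entries[i + 1];
    }
    --node.numEntries;
}
```

An int needs no destructor call, which is why the sketch has no counterpart to the destruct step.
*/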

        ///////////////////////////////////////////////////////////////////////////////////////////////////
        // _BPlusTree::IteratorPolicy

        void _BPlusTree::IteratorPolicy::next()
        {
            cxAssert(m_leaf);
            cxAssert(-1 <= m_offset && m_offset < m_leaf->m_numKeyEntrys);
            
            if (++m_offset == m_leaf->m_numKeyEntrys)
            {
                if (LeafNode* next = mGetPtr(m_leaf->m_next))
                {
                    m_leaf = next;
                    m_offset = 0;
                }
            }
        }
        
        void _BPlusTree::IteratorPolicy::prev()
        {
            cxAssert(m_leaf);
            cxAssert(0 <= m_offset && m_offset <= m_leaf->m_numKeyEntrys);
            if (--m_offset < 0)
            {
                if (LeafNode* prev = mGetPtr(m_leaf->m_prev))
                {
                    m_leaf = prev;
                    m_offset = prev->m_numKeyEntrys-1;
                }
            }
        }
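/*
Sketch: stepping a (leaf, offset) cursor across linked leaves
-------------------------------------------------------------

next() and prev() above move within a leaf and hop to the neighbouring leaf at either edge.  When 
there is no neighbour the offset is left one step outside the leaf, so offset == m_numKeyEntrys on 
the last leaf and offset == -1 on the first leaf act as the two end positions.  A minimal 
standalone sketch, with hypothetical types Leaf and Cursor that are not part of this header:

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical, simplified leaf: keys only, doubly linked to its neighbours.
struct Leaf
{
    int keys[4];
    std::ptrdiff_t numKeys;
    Leaf* prev;
    Leaf* next;
};

// Hypothetical cursor mirroring the (m_leaf, m_offset) pair used above.
struct Cursor
{
    Leaf* leaf;
    std::ptrdiff_t offset;

    void Next()
    {
        assert(leaf && -1 <= offset && offset < leaf->numKeys);
        // On the last leaf the cursor stays at offset == numKeys (the end position).
        if (++offset == leaf->numKeys && leaf->next)
        {
            leaf = leaf->next;
            offset = 0;
        }
    }

    void Prev()
    {
        assert(leaf && 0 <= offset && offset <= leaf->numKeys);
        // On the first leaf the cursor stays at offset == -1 (the reverse end position).
        if (--offset < 0 && leaf->prev)
        {
            leaf = leaf->prev;
            offset = leaf->numKeys - 1;
        }
    }
};
```

The sketch assumes, as the real iterator does, that no leaf in the chain is empty.
*/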

        const _BPlusTree::value_type& _BPlusTree::IteratorPolicy::const_deref() const
        {
            cxAssert(m_leaf);
            cxAssert(0 <= m_offset && m_offset < m_leaf->m_numKeyEntrys);
            @if (bHavePayload) 
            {
                const LeafKeyEntry& e = m_leaf->GetEntry(m_offset);
                return reinterpret_cast<const value_type&>(e);
            }
            @else
            {
                return m_leaf->GetEntry(m_offset).first;
            }
        }
        
        @if (bHavePayload) 
        {
            _BPlusTree::value_type& _BPlusTree::IteratorPolicy::deref() const
            {
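                // todo: mark the leaf as dirty only when required.  For now a persistent leaf is
                // marked as dirty on every non-const dereference, whether or not the caller
                // goes on to modify the value.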
                
                cxAssert(m_leaf);
                cxAssert(0 <= m_offset && m_offset < m_leaf->m_numKeyEntrys);
                
                @if (_bPersist)
                {
                    m_leaf->MarkAsDirtyWithoutTrace();
                }
                
                LeafKeyEntry& e = m_leaf->GetEntry(m_offset);
                return reinterpret_cast<value_type&>(e);
            }
        }
    }
}

///////////////////////////////////////////////////////////////////////////////////////////////////
// btreeset

// Make a cref<BTree> look like the BTree itself
template <typename _BTree>
struct btreeset : public cref<_BTree>
{
    typedef _BTree BTree;
    typedef typename BTree::key_type key_type;
    typedef typename BTree::value_type value_type;
    typedef typename BTree::size_type size_type;
    typedef typename BTree::pointer pointer;
    typedef typename BTree::const_pointer const_pointer;
    typedef typename BTree::reference reference;
    typedef typename BTree::const_reference const_reference;

    btreeset() {}
    btreeset(const btreeset& _rhs) 
    {
        this->Set($new BTree(*_rhs));
    }
    btreeset& operator=(const btreeset& _rhs)
    {
        if (this != &_rhs)
        {
            this->Set($new BTree(*_rhs));
        }
        return *this;
    } 
    
    void OnCreate(bool local) { this->Set($new BTree); }
    
    bool operator<(const btreeset& _rhs) const { return **this < *_rhs; }
    bool operator==(const btreeset& _rhs) const { return **this == *_rhs; }
    
    bool operator!=(const btreeset& _rhs) const { return !operator==(_rhs); }
    bool operator<=(const btreeset& _rhs) const { return !_rhs.operator<(*this); }
    bool operator>(const btreeset& _rhs) const { return _rhs.operator<(*this); }
    bool operator>=(const btreeset& _rhs) const { return !operator<(_rhs); }

    typedef typename BTree::const_iterator const_iterator;
    const_iterator begin() const { return (**this).begin(); }
    const_iterator end() const { return (**this).end(); }

    typedef typename BTree::const_reverse_iterator const_reverse_iterator;
    const_reverse_iterator rbegin() const { return (**this).rbegin(); }
    const_reverse_iterator rend() const { return (**this).rend(); }

    typedef typename BTree::iterator iterator;
    iterator begin() { return (**this).begin(); }
    iterator end() { return (**this).end(); }

    typedef typename BTree::reverse_iterator reverse_iterator;
    reverse_iterator rbegin() { return (**this).rbegin(); }
    reverse_iterator rend() { return (**this).rend(); }
    
    const_iterator find(const key_type& key) const { return (**this).find(key); }
    iterator find(const key_type& key) { return (**this).find(key); }
    
    const_iterator lower_bound(const key_type& key) const { return (**this).lower_bound(key); }
    iterator lower_bound(const key_type& key) { return (**this).lower_bound(key); }

    const_iterator upper_bound(const key_type& key) const { return (**this).upper_bound(key); }
    iterator upper_bound(const key_type& key) { return (**this).upper_bound(key); }

    size_type erase(const key_type& key) { return (**this).erase(key); }
    void erase(iterator i) { (**this).erase(i); }
    std::pair<iterator,bool> insert(const value_type& x) { return (**this).insert(x); }
    void clear() { (**this).clear(); }
    bool empty() const { return (**this).empty(); }
    ssize_t size() const { return (**this).size(); }
    ssize_t count(const key_type& key) const { return (**this).count(key); }
    void swap(btreeset& _rhs) { (**this).swap(*_rhs); }
};
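/*
Sketch: a handle with value semantics
-------------------------------------

btreeset above is a pointer-like handle that nevertheless copies and compares by value: copying 
clones the tree, and only operator< and operator== look at the tree, with the other four relational 
operators derived from them.  A minimal standalone sketch of that pattern follows.  It is an 
illustration under assumptions: SetHandle is a hypothetical name, std::unique_ptr stands in for 
cref<BTree>, std::set<int> stands in for the tree, and the default constructor allocates (whereas 
btreeset leaves that to OnCreate).

```cpp
#include <cassert>
#include <memory>
#include <set>

// Hypothetical stand-in for btreeset.
struct SetHandle
{
    std::unique_ptr<std::set<int>> p;

    SetHandle() : p(new std::set<int>) {}

    // Copying clones the pointee, so two handles never share a tree.
    SetHandle(const SetHandle& rhs) : p(new std::set<int>(*rhs.p)) {}
    SetHandle& operator=(const SetHandle& rhs)
    {
        if (this != &rhs) p.reset(new std::set<int>(*rhs.p));
        return *this;
    }

    bool insert(int x) { return p->insert(x).second; }

    // Only < and == touch the pointee.  The rest are derived from them.
    bool operator<(const SetHandle& rhs) const { return *p < *rhs.p; }
    bool operator==(const SetHandle& rhs) const { return *p == *rhs.p; }
    bool operator!=(const SetHandle& rhs) const { return !(*this == rhs); }
    bool operator<=(const SetHandle& rhs) const { return !(rhs < *this); }
    bool operator>(const SetHandle& rhs) const { return rhs < *this; }
    bool operator>=(const SetHandle& rhs) const { return !(*this < rhs); }
};
```

After `SetHandle b = a;` an insert into b leaves a unchanged, which is the behaviour the copy 
constructor and assignment operator of btreeset provide by allocating a new BTree.
*/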

///////////////////////////////////////////////////////////////////////////////////////////////////
// btreemap

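// Make a cref<BTree> look like the BTree itself.  Extends btreeset with access to the mapped
// value (find_2 and operator[]) for trees that have a payload.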
template <typename _BTree>
struct btreemap : public btreeset<_BTree>
{
    using typename btreeset<_BTree>::BTree;
    using typename btreeset<_BTree>::key_type;
    using typename btreeset<_BTree>::value_type;
    using typename btreeset<_BTree>::size_type;
    using typename btreeset<_BTree>::pointer;
    using typename btreeset<_BTree>::const_pointer;
    using typename btreeset<_BTree>::reference;
    using typename btreeset<_BTree>::const_reference;
    typedef typename _BTree::mapped_type mapped_type;

    // Returns the address of the mapped value for the given key, or nullptr if not found
    const mapped_type* find_2(const key_type& key) const { return (**this).rFind(key); }

    const mapped_type& operator[](const key_type& key) const { return (**this)[key]; }
    mapped_type& operator[](const key_type& key) { return (**this)[key]; }
};
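/*
Sketch: pointer-returning lookup
--------------------------------

find_2 above reports a missing key by returning nullptr rather than an end iterator, which lets a 
caller test and use the result in one step and never inserts a default value.  A minimal sketch of 
the same calling convention on std::map follows.  FindPtr is a hypothetical helper and is not part 
of this header.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical helper: returns the address of the mapped value for key, or
// nullptr if the key is absent.  The map is never modified.
template <typename Map>
const typename Map::mapped_type* FindPtr(const Map& m, const typename Map::key_type& key)
{
    typename Map::const_iterator i = m.find(key);
    return (i == m.end()) ? nullptr : &i->second;
}
```

Typical use is `if (const int* v = FindPtr(m, key)) { ... }`, with the pointer only valid until 
the container is next modified.
*/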

} // namespace ceda