BPlusTree.h
// BPlusTree.h
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2008
#pragma once
#ifndef Ceda_cxUtils_BPlusTree_H
#define Ceda_cxUtils_BPlusTree_H
#include "cxUtils.h"
#include "CedaAssert.h"
#include "Tracer.h"
#include "MakeIterator.h"
#include <utility>
#include <iterator>
#include <cstddef>
#include <string.h>
#define CEDA_BPLUSTREE_TRACE_DELETE 0
/*
B+Tree
------
A B+Tree is a transient map from Key to Payload that achieves good memory by employing a very large fan-out.
B+Tree
/ | \
/ | \
/ Node \
/ / | \ \
/ ... \
/ Node \
/ / | \ \
/ ... \
/ \
Page -->-- Page ... -->-- Page
--<-- --<--
There are two types of node : Leaf or internal nodes. A node never needs to change its type.
Unlike many implementations we don't assume that leaf and internal nodes are "structually compatible".
This allows leaf nodes to directly store the payload irrespective of its size. It also allows for
internal nodes and leaf nodes to have a different fanout. A downside is that the binary code
is larger.
We choose to not store parent pointers in the nodes. Note that recursive algorithms can pass the
parent pointer if required.
Typically the recursive functions pass the height of the current node. This decrements in each
recursive call from parent to child. When the height reaches zero it can be assumed that a leaf
node has been reached. Therefore there is no need for a leaf/internal nodes to store some common
inherited state in order to distinguish internal and leaf nodes.
Apart from the root, the utilisation of a node varies between 50% and 100%. We choose to always
allocate full sized arrays in memory to avoid the CPU overheads of repeated memory allocations as
the number of keys in a node changes.
Weak pointers
-------------
Let's assume that the tree structure (i.e. strong pointers) is dictated by the internal nodes. Then
the weak pointers are
1. ptrs to the first and last page in the B+Tree. These are "downwards" weak pointers
2. ptrs to the prev and next page in each page. These are "sideways" weak pointers.
Concurrency
-----------
Ceda assumes that high concurrency can be obtained by using small, fined-grained mutative
transactions that only need to briefly hold an exclusive write lock on the entire PSpace.
The time to gain an exclusive write lock is only about 1us. Non algorithmic mutative
transactions can be performed in only a few microseconds if there is no need to block on
I/O. Write caching solves part of the problem - an object can be marked as dirty but not
written to disk as part of the transaction, because we relax the durability requirement
(cf ACID requirements for a DBMS).
Consider a BTree with a billion records, and we have a number of "writer" threads that
independently want to add thousands of entries to the BTree as quickly as possible. A naive
approach won't achieve a good transaction throughput: the problem is that in practise, when a
writer thread has gained an exclusive lock it will often block on I/O in order to read the leaf
node from disk (because it is not currently resident in memory). Note that we can't depend on keys
being clustered.
Our solution is for the BTree to employ delta-insertion (dI) and delta-deletion (dD) maps.
Each is implemented with an STL map (and hence employs a red-black tree). In practise these
delta maps will accumulate a few thousand entries. The idea is to allow an insert or delete
on a B-Tree to be post-poned, by merely adding an entry to either dI or dD. Note that dI
and dD persist on disk and tend to remain resident in memory. The look up function on a
BTree needs to check the dI and dD sets before attempting a recursive descent through the
actual BTree itself.
It is assumed that a separate worker thread is reponsible for processing the dI and dD sets
so they don't become too big. This is done so as to avoid blocking on I/O when an exclusive
write lock is held.
Let's consider the asynchronous processing of dI. Note firstly that an STL map implicitly sorts
the entries by keys. This is rather convenient because it means that as we iterate through the map
and cause BTree nodes to be loaded from disk we will tend to find the required nodes have already
been loaded. We may even be fortunate enough to process a batch of keys that all need to be
inserted into the same leaf node.
Consider that the worker thread selects a "batch" of keys from dI that it intends to process.
This corresponds to a range [k1,k2]. Then it opens a shared read lock and loads the relevant BTree
nodes into memory. This is achieved by recursing through the BTree as if there is an intention
to read all the leaf pages that overlap [k1,k2]. During this recurse, the nodes are touched
and therefore shouldn't be evicted in the near future.
Next the worker thread gains an exclusive write lock and actually inserts all the entries in the
batch. Assuming that the nodes previously touched during the shared read lock, it will be possible
to process the batch without blocking on I/O.
Assumptions on the keys
-----------------------
The type 'Key' must support all the following
* A default constructor
* A copy constructor
* An assignment operator
* A global function that can compare two keys and returns -1,0,1 in a similar manner
to strcmp().
* operator<< to an Archive
* operator>> from an Archive
* operator<< to an xostream
Assumptions on the payloads
---------------------------
* A default constructor
* A copy constructor
* An assignment operator
* operator<< to an Archive
* operator>> from an Archive
* operator<< to an xostream
Search that inserts entry if not found
--------------------------------------
We require a function that combines insertion and search.
Payload* Find(const Key& key)
This creates a new entry if one doesn't exist already
Multimaps and multisets
-----------------------
The macros takes a boolean flag for whether multiple entries with the same key are permitted.
This affects both leaf nodes and internal nodes.
Search
------
The search for a given key now finds a range of payload entries. This leads to the concept of
lower and upper bounds for the given key.
For performance we want to return both bounds in the same call (rather than traverse
the entire tree twice). In fact, it seems best to directly model the concept of a range.
Currently rFind(key) returns a pointer to the Payload or nullptr if not found. We instead want
to return a pair of iterators.
Alternative approach: Consider instead that we only return the iterator correspondng to the
lower bound, and we assume we subsequently iterate until we discover that the key has changed
(which may be immediately!)
A client writes code like thus:
BTree::iterator i = btree.rFind(k);
while(i != btree.end() && i->key == k)
{
< process i >
++i;
}
Insertion
---------
The splitting of nodes when they overflow is based on the actual number of elements in the
node, and we don't want that to change. So a tree remains balanced even though there are many
repeated keys - even in internal nodes.
We favour insertion on the right of elements with the same key, so that we iterate through them
in the order they were inserted.
Deletion
--------
In the case of a multimap<string,OID> we want to delete the entry with given OID! This means
we need to scan through the range of elements with the given key to find the entry with the
given OID!
Memory leak issue
-----------------
The following code leaks 24 bytes on x86, 48 bytes on x64:
typedef std::map<int,int> MAP;
MAP m;
new (&m) MAP();
It seems that the std::map ctor performs a memory allocation.
As a result of this, our move semantics must be accounted for consistently, and this
for example includes the array of (key,payload) pairs in each leaf node. We must assume
that only the first nk entries have actually been constructed. So only these shall be
destructed.
-------
todo
* Allow for deletion given an iterator
* Allow for deletion of a range using a pair of iterators.
* Allow for batched insertion and deletion
* Allow for caching at each node until a batch is there to be done
* Allow for inserts/deletes while the B+Tree is still transient (i.e. making it persistent
can be delayed).
* Allow for structured keys that reuse common prefixes.
*/
namespace ceda
{
template <typename T>
void DestructArrayElements(T* p, ssize_t n)
{
for (ssize_t i=0 ; i < n ; ++i)
{
p->~T();
++p;
}
}
template <typename T>
void ConstructArrayElements(T* p, ssize_t n)
{
for (ssize_t i=0 ; i < n ; ++i)
{
new(p) T();
++p;
}
}
template <typename T>
void ResizeArrayElements(T* p, ssize_t oldNum, ssize_t newNum)
{
if (newNum > oldNum)
{
// Default construct the extra entrys required
ConstructArrayElements(p + oldNum, newNum - oldNum);
}
else
{
// Destruct the surplus entrys no longer required
DestructArrayElements(p + newNum, oldNum - newNum);
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////
// BPlusTree
/*
template parameters:
Key, Type for the keys
Payload, Type for the payload
Use 'void' to indicate there is no payload
MIN_INTERNAL_KEYS, Number of keys in internal nodes will be between
MIN_INTERNAL_KEYS and 2*MIN_INTERNAL_KEYS
Must be at least 1
MIN_LEAF_KEYS, Number of keys in leaf nodes will be between
MIN_LEAF_KEYS and 2*MIN_LEAF_KEYS
Must be at least 1
Compare A binary predicate that takes two element keys as arguments and returns a
bool. The expression comp(a,b), where comp is an object of this type and
a and b are key values, shall return true if a is considered to go before b
in the strict weak ordering the function defines.
The map object uses this expression to determine both the order the elements
follow in the container and whether two element keys are equivalent (by
comparing them reflexively: they are equivalent if !comp(a,b) && !comp(b,a)).
No two elements in a map container can have equivalent keys.
This can be a function pointer or a function object (see constructor for an
example). This defaults to less<T>, which returns the same as applying the
less-than operator (a<b).
*/
template <typename Key, typename Payload, int MIN_INTERNAL_KEYS, int MIN_LEAF_KEYS, class Compare = std::less<Key> >
class BPlusTree
{
private:
static bool less(const Key& k1, const Key& k2)
{
Compare c;
return c(k1,k2);
}
static bool equals(const Key& k1, const Key& k2)
{
return !less(k1,k2) && !less(k2,k1);
}
// Let k be the number of keys stored in a given internal node that is not the root
// Then we require MIN_NUM_INTERNAL_KEY_ENTRYS <= k <= MAX_NUM_INTERNAL_KEY_ENTRYS
//
// Let c be the number of child nodes under a given internal node that is not the root
// Then c = k+1
// and we require MIN_NUM_INTERNAL_KEY_ENTRYS+1 <= c <= MAX_NUM_INTERNAL_KEY_ENTRYS+1
//
// For an internal node that is the root, we require
// 1 <= k <= MAX_NUM_INTERNAL_KEY_ENTRYS
// 2 <= c <= MAX_NUM_INTERNAL_KEY_ENTRYS+1
static const int MIN_NUM_INTERNAL_KEY_ENTRYS = MIN_INTERNAL_KEYS;
static const int MAX_NUM_INTERNAL_KEY_ENTRYS = MIN_INTERNAL_KEYS*2;
// Let k be the number of (key,payload) pairs stored in a given leaf node that is not the root
// Then we require MIN_NUM_LEAF_KEY_ENTRYS <= k <= MAX_NUM_LEAF_KEY_ENTRYS
static const int MIN_NUM_LEAF_KEY_ENTRYS = MIN_LEAF_KEYS;
static const int MAX_NUM_LEAF_KEY_ENTRYS = MIN_LEAF_KEYS*2;
typedef void Node;
///////////////////////////////////////////////////////////////////////////////////////////////////
// InternalNode
struct InternalKeyEntry
{
InternalKeyEntry() : m_ptr(nullptr) {}
void swap(InternalKeyEntry& rhs)
{
memswap(this, &rhs, sizeof(InternalKeyEntry));
}
Key m_key;
Node* m_ptr; // Subtree with keys >= m_key
};
class InternalNode
{
public:
InternalNode() : m_ptr(nullptr), m_numKeyEntrys(0) {}
~InternalNode()
{
DestructArrayElements(&GetEntry(0), m_numKeyEntrys);
}
InternalKeyEntry& GetEntry(ssize_t i) { return *((InternalKeyEntry*) m_entrys + i); }
const InternalKeyEntry& GetEntry(ssize_t i) const { return *((const InternalKeyEntry*) m_entrys + i); }
// Find the index of the largest key less than or equal to the given key, or else -1
//
// K(0) K(1) ... K(i-1) K(i) K(i+1) .... K(n-1)
// ^
// |
// key
// Find the child node according to the key. This is infallible because an internal node must
// contain at least one key entry (and two child nodes), and the partitioning based on key is
// well defined.
// Returns the node and its index position. -1 means m_ptr was returned.
std::pair<Node*,ssize_t> FindChildGivenKey2(const Key& key)
{
/*
As an example, for nk = 4 we have the following picture
Key K0 K1 K2 K3
\ \ \ \
Ptr P P0 P1 P2 P3
| | | | |
Range [-inf,K0) [K0,K1) [K1,K2) [K2,K3) [K3,+inf)
*/
ssize_t nk = m_numKeyEntrys;
cxAssert(1 <= nk && nk <= MAX_NUM_INTERNAL_KEY_ENTRYS);
if (less(key,GetEntry(0).m_key))
{
// key < K0
return std::pair<Node*,ssize_t>(m_ptr,-1);
}
else
{
/*
Binary search of the node
For convenience let's define K(nk) = +inf
Claim: At each iteration mid = floor((lo+hi)/2) must be in the range [lo,hi)
Proof: By the condition in the while loop we know that lo < hi.
Clearly mid >= lo, and mid <= hi.
Suppose mid = hi. Then hi <= (lo+hi)/2. I.e. 2hi <= lo+hi. I.e. hi <= lo.
This contradicts lo < hi.
Claim: An invariant of the binary search is that the given key is in the half open
interval [ K(lo-1), K(hi) )
Proof (by induction)
1. This invariant is met with the initial settings of lo=1 and hi=nk,
because in this else block we know that key >= K(0) = K(lo-1), and
K(hi) = K(hk) = +inf
2. At each iteration lo < hi therefore mid = floor( (lo+hi)/2 ) must in in the range
[lo,hi).
a. If key >= K(mid) then lo becomes mid+1, and K(lo-1) = K(mid+1-1) = K(mid).
Therefore after changing lo we still have key >= K(lo-1). I.e. the invariant
remains true
b. Otherwise if key < K(mid) then hi is assigned to mid. Therefore key < K(hi)
remains true.
Claim : The binary search exits with lo = hi
Proof :
Initially lo = 1 and hi = nk. Now 1 <= nk therefore lo <= hi
At each iteration mid is in the range [lo,hi). If lo is set to mid+1 then lo <= hi.
Otherwise if hi is set to mid then lo <= mid. Therefore it is not possible for the loop
to exit with lo > hi. It also clearly can't exit with hi < lo. Therefore we deduce
that it must exit with lo = hi.
It only remains to show that it doesn't get stuck in an infinite loop. This follows
because each iteration always increases lo or decreases hi.
*/
ssize_t lo = 1;
ssize_t hi = nk;
while (lo < hi)
{
ssize_t mid = (lo + hi)/2;
cxAssert(1 <= mid && mid < nk);
if (less(key,GetEntry(mid).m_key)) hi = mid;
else lo = mid + 1;
}
cxAssert(lo == hi);
cxAssert(1 <= lo && lo <= nk);
return std::pair<Node*,ssize_t>(GetEntry(lo-1).m_ptr,lo-1);
}
}
// Note that factoring out the code into FindChildGivenKey2() actually causes the search
// function to be about 10% slower for int-->int map with no I/O.
Node* FindChildGivenKey(const Key& key) { return FindChildGivenKey2(key).first; }
ssize_t FindChildIndexGivenKey(const Key& key) { return FindChildGivenKey2(key).second; }
///////////////////// State //////////////////////
Node* m_ptr; // Subtree with keys < GetEntry(0).m_key
ssize_t m_numKeyEntrys;
octet_t m_entrys[MAX_NUM_INTERNAL_KEY_ENTRYS * sizeof(InternalKeyEntry)];
//InternalKeyEntry m_entrys[MAX_NUM_INTERNAL_KEY_ENTRYS];
};
static void DestructInternalEntry(InternalNode* node, ssize_t pos)
{
node->GetEntry(pos).~InternalKeyEntry();
}
static void CopyConstructInternalEntry(InternalNode* node, ssize_t pos, const InternalKeyEntry& e)
{
new (&node->GetEntry(pos)) InternalKeyEntry(e);
}
static void MoveInternalEntries(const InternalNode* src, ssize_t s, InternalNode* dst, ssize_t d, ssize_t count)
{
//Tracer() << "MoveInternalEntries src= " << src << " dst= " << dst << " s= " << s << " d= " << d << " count= " << count << '\n';
memcpy(&dst->GetEntry(d),&src->GetEntry(s), count * sizeof(InternalKeyEntry) );
}
static void ShiftInternalEntries(InternalNode* node, ssize_t s, ssize_t d, ssize_t count)
{
//Tracer() << "ShiftInternalEntries node= " << node << " s= " << s << " d= " << d << " count= " << count << '\n';
memmove(&node->GetEntry(d),&node->GetEntry(s), count * sizeof(InternalKeyEntry) );
}
///////////////////////////////////////////////////////////////////////////////////////////////////
// LeafNode
typedef std::pair<Key, Payload > LeafKeyEntry;
class LeafNode
{
public:
LeafNode() : m_numKeyEntrys(0)
{
m_prev = nullptr;
m_next = nullptr;
}
~LeafNode()
{
DestructArrayElements(&GetEntry(0), m_numKeyEntrys);
}
LeafKeyEntry& GetEntry(ssize_t i) { return *((LeafKeyEntry*) m_entrys + i); }
const LeafKeyEntry& GetEntry(ssize_t i) const { return *((const LeafKeyEntry*) m_entrys + i); }
////////////////// State //////////////////////
// Leaf nodes form a double linked list to allow forwards and reverse iteration in sorted order
LeafNode* m_prev;
LeafNode* m_next;
ssize_t m_numKeyEntrys;
octet_t m_entrys[MAX_NUM_LEAF_KEY_ENTRYS * sizeof(LeafKeyEntry)];
//LeafKeyEntry m_entrys[MAX_NUM_LEAF_KEY_ENTRYS];
};
/////////// LeafKeyEntry
/*
static void MoveLeafKeyEntry(LeafKeyEntry& dst, LeafKeyEntry& src)
{
memcpy(&dst, &src, sizeof(LeafKeyEntry));
}
*/
//////////// LeafNode entries
static void DestructLeafEntry(LeafNode* node, ssize_t pos)
{
node->GetEntry(pos).~LeafKeyEntry();
}
static void CopyConstructLeafEntry(LeafNode* node, ssize_t pos, const LeafKeyEntry& e)
{
new (&node->GetEntry(pos)) LeafKeyEntry(e);
}
static void MoveLeafEntries(const LeafNode* src, ssize_t s, LeafNode* dst, ssize_t d, ssize_t count)
{
//Tracer() << "MoveLeafEntries src= " << src << " dst= " << dst << " s= " << s << " d= " << d << " count= " << count << '\n';
//Tracer() << "count * sizeof(LeafKeyEntry) = " << count * sizeof(LeafKeyEntry) << '\n';
memcpy(&dst->GetEntry(d),&src->GetEntry(s), count * sizeof(LeafKeyEntry) );
}
static void ShiftLeafEntries(LeafNode* node, ssize_t s, ssize_t d, ssize_t count)
{
//Tracer() << "ShiftLeafEntries node= " << node << " s= " << s << " d= " << d << " count= " << count << '\n';
//Tracer() << "count * sizeof(LeafKeyEntry) = " << count * sizeof(LeafKeyEntry) << '\n';
memmove(&node->GetEntry(d),&node->GetEntry(s), count * sizeof(LeafKeyEntry) );
}
///////////////////////////////////////////////////////////////////////////////////////////////////
// Downcasts
static InternalNode* AsInternalNode(Node* n)
{
return (InternalNode*) n;
}
static LeafNode* AsLeafNode(Node* n)
{
return (LeafNode*) n;
}
///////////////////////////////////////////////////////////////////////////////////////////////////
// Moving elements
// These functions assume that keys and payloads are relocateable.
/////////// Key
static void MoveKey(Key& dst, Key& src)
{
memcpy(&dst, &src, sizeof(Key));
}
/*
static void SwapKey(Key& dst, Key& src)
{
memswap(&dst, &src, sizeof(Key));
}
*/
static void CopyConstructKey(Key& dst, const Key& src)
{
new (&dst) Key(src);
}
////////// Payload
static void ConstructPayload(Payload& payload)
{
new (&payload) Payload();
}
///////////////////////////////////////////////////////////////////////////////////////////////////
// IteratorPolicy
// Define both iterators using a given policy
class IteratorPolicy
{
public:
IteratorPolicy() : m_leaf(nullptr), m_offset(0) {}
IteratorPolicy(LeafNode* leaf, ssize_t offset) :
m_leaf(leaf),
m_offset(offset) {}
typedef std::bidirectional_iterator_tag iterator_category;
typedef std::pair<const Key, Payload > value_type;
typedef std::ptrdiff_t difference_type; // just to keep compiler happy
typedef value_type* pointer;
typedef const value_type* const_pointer;
typedef value_type& reference;
typedef const value_type& const_reference;
// Returns true if the iterator is pointing at an actual element
// (i.e. not at one before first or one after the last)
bool can_dereference() const
{
return m_leaf != nullptr && 0 <= m_offset && m_offset < m_leaf->m_numKeyEntrys;
}
bool operator==(const IteratorPolicy& rhs) const
{
return m_offset == rhs.m_offset && m_leaf == rhs.m_leaf;
}
void next()
{
cxAssert(m_leaf);
cxAssert(-1 <= m_offset && m_offset < m_leaf->m_numKeyEntrys);
if (++m_offset == m_leaf->m_numKeyEntrys)
{
if (LeafNode* next = m_leaf->m_next)
{
m_leaf = next;
m_offset = 0;
}
}
}
void prev()
{
cxAssert(m_leaf);
cxAssert(0 <= m_offset && m_offset <= m_leaf->m_numKeyEntrys);
if (--m_offset < 0)
{
if (LeafNode* prev = m_leaf->m_prev)
{
m_leaf = prev;
m_offset = prev->m_numKeyEntrys-1;
}
}
}
value_type& deref() const
{
// todo: need to mark as dirty as required
cxAssert(m_leaf);
cxAssert(0 <= m_offset && m_offset < m_leaf->m_numKeyEntrys);
LeafKeyEntry& e = m_leaf->GetEntry(m_offset);
return reinterpret_cast<value_type&>(e);
}
const value_type& const_deref() const
{
cxAssert(m_leaf);
cxAssert(0 <= m_offset && m_offset < m_leaf->m_numKeyEntrys);
const LeafKeyEntry& e = m_leaf->GetEntry(m_offset);
return reinterpret_cast<const value_type&>(e);
}
private:
/*
Rules for how m_leaf, m_offset are defined:
if (iterator points at an actual element of the B+Tree)
[i.e. it is valid to call operator*()]
{
m_leaf = leaf node containing the element
m_offset = index of the element in m_leaf
Note that 0 <= m_offset < m_leaf->m_numKeyEntrys
}
else if (iterator corresponds to one before the first element
of the B+Tree (i.e. rend()))
{
if (B+Tree is empty)
{
m_leaf = nullptr
m_offset = -1
}
else
{
m_leaf = first leaf node
m_offset = -1
}
{
else (iterator corresponds to one after the last element
of the B+Tree (i.e. end()))
{
if (B+Tree is empty)
{
m_leaf = nullptr
m_offset = 0
}
else
{
m_leaf = last leaf node
m_offset = m_leaf->m_numKeyEntrys
}
}
This approach allows for the bidirectional iterator to represent
and distinguish between "one before first" and "one after last".
Furthermore if the B+Tree is non-empty then:
a) an iterator at "one before first" can be incremented so it
points at first; and
b) an iterator at "one after last" can be decremented so it
points at last.
*/
LeafNode* m_leaf; // Is nullptr if and only if the B+Tree is empty
ssize_t m_offset;
friend class BPlusTree;
};
public:
typedef Key key_type;
typedef Payload mapped_type;
typedef std::pair<const Key, Payload > value_type;
typedef size_t size_type;
typedef std::ptrdiff_t difference_type;
typedef value_type* pointer;
typedef const value_type* const_pointer;
typedef value_type& reference;
typedef const value_type& const_reference;
BPlusTree() :
m_size(0),
m_height(-1)
{
// Due to limitations of the current implementation of the delete code, we cannot support
// the case where MIN_INTERNAL_KEYS = 1 or MIN_LEAF_KEYS = 1.
// The problem is that this case allows for the number of key entries in a node to transition
// to zero, and yet the code in various places assumes there is a 0th entry.
// MergeInternalRightToLeft and BalanceInternalLeftToRight require
// right->m_numKeyEntrys > 0, and RecursiveDelete has a statement that initialises
// oldkey from leaf->GetEntry(0).first after the number of keys has dropped to zero.
// This limitation became apparent after being careful to destruct elements that were no longer
// in use.
// IDEA: This limitation will be lifted if we avoid arrays of elements that inevitably destruct
// all elements. Would be better to only destruct the first m_numKeyEntrys entries.
static_assert(MIN_INTERNAL_KEYS > 1, "bad template parameter for min internal node keys in B+Tree");
static_assert(MIN_LEAF_KEYS > 1, "bad template parameter for min leaf node keys in B+Tree");
//static_assert(MIN_NUM_INTERNAL_KEY_ENTRYS > 0);
//static_assert(MIN_NUM_LEAF_KEY_ENTRYS > 0);
m_firstLeaf = nullptr;
m_lastLeaf = nullptr;
m_root = nullptr;
}
~BPlusTree()
{
DeleteNodes();
}
BPlusTree(const BPlusTree& rhs) :
m_size(0),
m_height(-1)
{
m_firstLeaf = nullptr;
m_lastLeaf = nullptr;
m_root = nullptr;
// todo : inefficient
const_iterator i = rhs.begin();
while(i != rhs.end())
{
insert(*i);
++i;
}
}
BPlusTree& operator=(const BPlusTree& rhs)
{
// Use the copy and swap idiom
BPlusTree temp(rhs);
temp.swap(*this); // Non throwing swap
return *this;
}
void clear()
{
DeleteNodes();
m_firstLeaf = nullptr;
m_lastLeaf = nullptr;
m_root = nullptr;
m_size = 0;
m_height = -1;
}
void swap(BPlusTree& rhs)
{
//todo: assert that rhs belongs to the same PersistStore.
using std::swap;
swap(m_size, rhs.m_size);
swap(m_height, rhs.m_height);
swap(m_root, rhs.m_root);
swap(m_firstLeaf, rhs.m_firstLeaf);
swap(m_lastLeaf, rhs.m_lastLeaf);
}
bool operator==(const BPlusTree& rhs) const
{
if (m_size != rhs.m_size) return false;
const_iterator i = begin();
const_iterator j = rhs.begin();
while(i != end())
{
cxAssert(j != rhs.end());
if (*i != *j) return false;
++i; ++j;
}
cxAssert(j == rhs.end());
return true;
}
bool operator!=(const BPlusTree& rhs) const { return !operator==(rhs); }
bool operator<(const BPlusTree& rhs) const
{
const_iterator i = begin();
const_iterator j = rhs.begin();
while(i != end())
{
if (j == rhs.end())
{
return false;
}
if (*i < *j) return true;
if (*i != *j) return false;
++i; ++j;
}
return j != rhs.end();
}
bool operator<=(const BPlusTree& rhs) const { return !rhs.operator<(*this); }
bool operator>(const BPlusTree& rhs) const { return rhs.operator<(*this); }
bool operator>=(const BPlusTree& rhs) const { return !operator<(rhs); }
ssize_t size() const { return m_size; }
bool empty() const { return m_size == 0; }
///////////////////////////////////////////////////////////////////////////////////////
// iterators
typedef make_iterator<IteratorPolicy> iterator;
iterator begin()
{
return iterator(IteratorPolicy(m_firstLeaf,0));
}
iterator end()
{
if (m_height < 0)
{
return iterator(IteratorPolicy(nullptr,0));
}
else
{
cxAssert(m_lastLeaf);
return iterator(IteratorPolicy(m_lastLeaf, m_lastLeaf->m_numKeyEntrys));
}
}
typedef make_const_iterator<IteratorPolicy> const_iterator;
const_iterator begin() const { return const_cast<BPlusTree*>(this)->begin(); }
const_iterator end() const { return const_cast<BPlusTree*>(this)->end(); }
typedef std::reverse_iterator<const_iterator> const_reverse_iterator;
const_reverse_iterator rbegin() const { return const_reverse_iterator(end()); }
const_reverse_iterator rend() const { return const_reverse_iterator(begin()); }
typedef std::reverse_iterator<iterator> reverse_iterator;
reverse_iterator rbegin() { return reverse_iterator(end()); }
reverse_iterator rend() { return reverse_iterator(begin()); }
///////////////////////////////////////////////////////////////////////////////////////
ssize_t count(const Key& key) const
{
return rFind(key) ? 1 : 0;
}
const_iterator find(const Key& key) const
{
// Find the Leaf node containing the given key
if (Node* node = m_root)
{
cxAssert(m_height >= 0);
for (ssize_t i=0 ; i < m_height ; ++i)
{
// Let node be the child of node according to the key testing
node = AsInternalNode(node)->FindChildGivenKey(key);
cxAssert(node);
}
ssize_t j = SearchLeaf(key,AsLeafNode(node));
return (j == -1) ? end() : const_iterator(IteratorPolicy(AsLeafNode(node), j));
}
else
{
cxAssert(m_height == -1);
return const_iterator(IteratorPolicy(nullptr,0));
}
}
// Note that find doesn't mark the node as dirty. However the act of dereferencing the
// returned iterator will do that.
iterator find(const Key& key)
{
const_iterator i = const_cast<const BPlusTree*>(this)->find(key);
return iterator(i);
//return reinterpret_cast<iterator&>(i);
}
const_iterator lower_bound(const Key& key) const
{
// Find the Leaf node containing the given key
if (Node* node = m_root)
{
cxAssert(m_height >= 0);
for (ssize_t i=0 ; i < m_height ; ++i)
{
// Let node be the child of node according to the key testing
node = AsInternalNode(node)->FindChildGivenKey(key);
cxAssert(node);
}
ssize_t j = LowerBoundInLeaf(key,AsLeafNode(node));
cxAssert(0 <= j && j <= AsLeafNode(node)->m_numKeyEntrys);
if (j == AsLeafNode(node)->m_numKeyEntrys)
{
if (LeafNode* next = AsLeafNode(node)->m_next)
{
return const_iterator(IteratorPolicy(next,0));
}
}
return const_iterator(IteratorPolicy(AsLeafNode(node), j));
}
else
{
cxAssert(m_height == -1);
return const_iterator(IteratorPolicy(nullptr,0));
}
}
iterator lower_bound(const Key& key)
{
const_iterator i = const_cast<const BPlusTree*>(this)->lower_bound(key);
return iterator(i);
//return reinterpret_cast<iterator&>(i);
}
const_iterator upper_bound(const Key& key) const
{
// Find the Leaf node containing the given key
if (Node* node = m_root)
{
cxAssert(m_height >= 0);
for (ssize_t i=0 ; i < m_height ; ++i)
{
// Let node be the child of node according to the key testing
node = AsInternalNode(node)->FindChildGivenKey(key);
cxAssert(node);
}
ssize_t j = UpperBoundInLeaf(key,AsLeafNode(node));
cxAssert(0 <= j && j <= AsLeafNode(node)->m_numKeyEntrys);
if (j == AsLeafNode(node)->m_numKeyEntrys)
{
if (LeafNode* next = AsLeafNode(node)->m_next)
{
return const_iterator(IteratorPolicy(next,0));
}
}
return const_iterator(IteratorPolicy(AsLeafNode(node), j));
}
else
{
cxAssert(m_height == -1);
return const_iterator(IteratorPolicy(nullptr,0));
}
}
iterator upper_bound(const Key& key)
{
const_iterator i = const_cast<const BPlusTree*>(this)->upper_bound(key);
return iterator(i);
//return reinterpret_cast<iterator&>(i);
}
const Payload& operator[](const Key& key) const
{
const Payload* p = rFind(key);
cxAssert(p);
return *p;
}
Payload& operator[](const Key& key)
{
wFindRet ret;
wFind(ret,key,true);
cxAssert(ret.m_payload);
return *ret.m_payload;
}
// Returns number of elements erased (0 or 1)
size_type erase(const Key& key)
{
return Delete(key) ? 1 : 0;
}
void erase(iterator i)
{
// todo: inefficient.
erase(i->first);
}
std::pair<iterator,bool> insert(const value_type& x)
{
wFindRet ret;
wFind(ret,x.first,true);
cxAssert(ret.m_leaf);
if (ret.m_created)
{
cxAssert(ret.m_payload);
*ret.m_payload = x.second;
}
return std::pair<iterator,bool>(iterator(IteratorPolicy(ret.m_leaf, ret.m_index)),ret.m_created);
}
///////////////////////////////////////////////////////////////////////////////////////
// Unconventional API
static ssize_t GetPayloadOffset()
{
return offsetof(LeafKeyEntry,second);
}
// Provides the return value for wFind
struct wFindRet
{
Payload* m_payload; // The payload for the requested key
LeafNode* m_leaf; // Leaf node containing the key
ssize_t m_index; // Index position in m_leaf
bool m_created; // Was a (key,payload) entry created?
};
/*
Find the payload with the given key. Returns the address of the payload in ret.m_payload.
If allowCreate = true then
* If no entry currently exists for the given key this function creates a new entry
(with the payload initialsed using the default ctor).
* ret.m_created indicates whether an entry was created (because no entry with
the given key previously existed)
If allowCreate = false then
* If no entry exists for the given key this function returns
ret.m_payload = nullptr and ret.m_containingNode = null.
* Always returns ret.m_created = false.
*/
void wFind(wFindRet& ret,const Key& key,bool allowCreate)
{
// Hmmm. Should we be so eager?
if (m_height == -1)
{
cxAssert(!m_root);
LeafNode* n = new LeafNode;
m_firstLeaf = m_lastLeaf = n;
m_root = n;
m_height = 0;
}
cxAssert(m_height >= 0);
cxAssert(m_root);
InternalKeyEntry right;
wFindInNode(ret,m_root,m_height,key,right,allowCreate);
if (right.m_ptr)
{
// The root node overflowed into 'right', so the height of the B+Tree will increase by one
// Create a new (internal) root node with one key and two sub-tree pointers.
InternalNode* newRoot = new InternalNode;
newRoot->m_ptr = m_root; // Left ptr to old root
new (&newRoot->GetEntry(0)) InternalKeyEntry();
newRoot->GetEntry(0).swap(right);
newRoot->m_numKeyEntrys = 1;
m_root = newRoot;
++m_height;
}
}
// Insert the given key and payload into the B+Tree. Returns false if an entry with the given
// key is already present.
bool Insert(const Key& key, const Payload& payload, bool allowReplace = true)
{
wFindRet ret;
wFind(ret,key,true);
cxAssert(ret.m_payload);
if (allowReplace || ret.m_created)
{
*ret.m_payload = payload;
}
return ret.m_created;
}
// Calculate total number of nodes for debugging purposes
ssize_t GetNumNodes() const
{
if (m_height >= 0)
{
return GetNumNodes(m_height,m_root);
}
else
{
return 0;
}
}
// Write B+Tree to ostream for debugging purpose
void Write(xostream& os) const
{
os << "Height = " << m_height << '\n';
if (m_height >= 0)
{
WriteNode(os,m_height,m_root);
}
}
// Validate the B+Tree
// Invariants to check
// 1. Keys are in the expected range
// 2. Keys in a node strictly increase from left to right
// 3. Min and max fanout is respected
void Validate() const
{
if (m_height >= 0)
{
ValidateNode(m_height, m_root, true, nullptr, nullptr);
}
}
// Find the payload for given key. Returns nullptr if not found.
const Payload* rFind(const Key& key) const
{
// Find the Leaf node containing the given key
if (Node* node = m_root)
{
cxAssert(m_height >= 0);
for (ssize_t i=0 ; i < m_height ; ++i)
{
// Let node be the child of node according to the key testing
node = AsInternalNode(node)->FindChildGivenKey(key);
cxAssert(node);
}
ssize_t j = SearchLeaf(key,AsLeafNode(node));
return (j == -1) ? nullptr : &AsLeafNode(node)->GetEntry(j).second;
}
else
{
cxAssert(m_height == -1);
return nullptr;
}
}
private:
bool Delete(const Key& key)
{
if (m_height == -1)
{
return false;
}
else
{
Node* deleteNode;
return RecursiveDelete(m_height, m_root, nullptr, nullptr, nullptr, nullptr, nullptr, deleteNode, key);
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////
// Delete all nodes
void DeleteNodes(ssize_t height, Node* node)
{
cxAssert(node);
if (height > 0)
{
InternalNode* inode = AsInternalNode(node);
DeleteNodes(height-1,inode->m_ptr);
ssize_t nk = inode->m_numKeyEntrys;
cxAssert(1 <= nk && nk <= MAX_NUM_INTERNAL_KEY_ENTRYS);
for (ssize_t i=0 ; i < nk ; ++i)
{
DeleteNodes(height-1,inode->GetEntry(i).m_ptr);
}
delete inode;
}
else
{
delete AsLeafNode(node);
}
}
void DeleteNodes()
{
if (m_height >= 0)
{
DeleteNodes(m_height,m_root);
}
}
/////////////////////////////////// Search //////////////////////////////////////////
// Search the BTree from the given node for the LeafKeyEntry with the given key
// Returns -1 if not found
ssize_t SearchLeaf(const Key& key, LeafNode* leaf) const
{
cxAssert(leaf);
ssize_t nk = leaf->m_numKeyEntrys;
cxAssert(0 <= nk && nk <= MAX_NUM_LEAF_KEY_ENTRYS);
// Use binary search on this node. We have keys K0, .... , Kn-1
ssize_t lo = 0;
ssize_t hi = nk-1;
while (lo <= hi)
{
ssize_t mid = (lo + hi)/2;
const Key& kmid = leaf->GetEntry(mid).first;
if (less(key, kmid)) hi = mid - 1;
else if (less(kmid, key)) lo = mid + 1;
else return mid; // found
}
// not found
return -1;
}
ssize_t LowerBoundInLeaf(const Key& key, LeafNode* leaf) const
{
cxAssert(leaf);
ssize_t nk = leaf->m_numKeyEntrys;
cxAssert(0 <= nk && nk <= MAX_NUM_LEAF_KEY_ENTRYS);
ssize_t lo = 0;
ssize_t hi = nk;
while (lo < hi)
{
ssize_t mid = (lo + hi)/2;
cxAssert(0 <= mid && mid < nk);
if (less( leaf->GetEntry(mid).first, key )) lo = mid + 1;
else hi = mid;
}
cxAssert(lo == hi);
cxAssert(0 <= hi && hi <= nk);
return hi;
}
ssize_t UpperBoundInLeaf(const Key& key, LeafNode* leaf) const
{
cxAssert(leaf);
ssize_t nk = leaf->m_numKeyEntrys;
cxAssert(0 <= nk && nk <= MAX_NUM_LEAF_KEY_ENTRYS);
ssize_t lo = 0;
ssize_t hi = nk;
while (lo < hi)
{
ssize_t mid = (lo + hi)/2;
cxAssert(0 <= mid && mid < nk);
if (less(key, leaf->GetEntry(mid).first)) hi = mid;
else lo = mid + 1;
}
cxAssert(lo == hi);
cxAssert(0 <= hi && hi <= nk);
return hi;
}
/////////////////////////////////// Insertion ///////////////////////////////////////
/*
Insertion for given key involves the following steps
1. Recurse down the tree according to the key until reach the relevant leaf node
If key is found in leaf node then an entry for that key already exists.
2. As the stack unwinds we may need to insert a node into an internal node. This can
cause an internal node to split, causing a further insertion into its parent and so on
3. If the root node overflows then a new internal node must be created for the root, and the
height of the tree increases by 1.
*/
/*
Find or create an entry with the given key under the given leaf node (which was found after recursing
down through the B+Tree with the given key.
allowCreate indicates whether it is permissible to create a new entry with the given key.
If an insertion would make the leaf node overflow, then (at least conceptually)
1. the key is inserted anyway
2. A new 'overflow' leaf node is created that is inserted to the immediate right. The
linked list of leaf node pointers are updated accordingly.
3. Our (left) leaf node is split into two about the median, by copying the top half of the
entries into the 'overflow' leaf node.
*/
LeafNode* wFindInLeaf(wFindRet& ret, LeafNode* left, const Key& key, bool allowCreate)
{
cxAssert(left);
ssize_t nk = left->m_numKeyEntrys;
// Note that if the root node is a leaf then it may contain zero entries.
cxAssert(0 <= nk && nk <= MAX_NUM_LEAF_KEY_ENTRYS);
// Use binary search to find the insertion position. We have keys K0, .... , Kn-1
ssize_t insertPos;
{
ssize_t lo = 0;
ssize_t hi = nk-1;
while (lo <= hi)
{
ssize_t mid = (lo + hi)/2;
cxAssert(0 <= mid && mid < nk);
const Key& kmid = left->GetEntry(mid).first;
if (less(key,kmid)) hi = mid - 1;
else if (less(kmid, key)) lo = mid + 1;
else
{
// Key has been found
ret.m_payload = &left->GetEntry(mid).second;
ret.m_leaf = left;
ret.m_index = mid;
ret.m_created = false;
return nullptr;
}
}
// Correct insertion position is at lo. Note that lo = nk is possible!
insertPos = lo;
}
// Key not found
if (allowCreate)
{
ret.m_created = true;
++m_size;
// Note that normally insertPos shouldn't be zero because the downwards recurse only gets to
// this leaf node if its key is at least as large as the smallest key. However, can have
// insertPos = 0 when the root node is a leaf node.
cxAssert(0 <= insertPos && insertPos <= nk);
if (nk == MAX_NUM_LEAF_KEY_ENTRYS)
{
// Leaf node is already full, so this insertion will overflow
// Create a new leaf node and insert to the immediate right of our node
LeafNode* next = left->m_next;
LeafNode* right = new LeafNode;
right->m_prev = left;
right->m_next = next;
if (next)
{
cxAssert(next->m_prev == left);
next->m_prev = right;
}
else
{
cxAssert(m_lastLeaf == left);
m_lastLeaf = right;
}
left->m_next = right;
// The total number of entrys (including the entry to be inserted) is MAX_NUM_LEAF_KEY_ENTRYS+1
// Let numLeftEntrys be the number of entrys to remain in 'left' (including perhaps the
// entry to be inserted), and the remainder are copied into 'right'.
const ssize_t numLeftEntrys = (MAX_NUM_LEAF_KEY_ENTRYS + 1)/2;
if (insertPos < numLeftEntrys)
{
// Number to keep in left, allowing for the insertion
ssize_t n = numLeftEntrys-1;
// Move top half of the entries into 'right', leaving n entries behind
MoveLeafEntries(left, n, right, 0, nk-n);
// Shift entrys after insertPos upwards by one position to make space
ShiftLeafEntries(left, insertPos, insertPos+1, n-insertPos);
// Set the new entry
LeafKeyEntry& e = left->GetEntry(insertPos);
CopyConstructKey(e.first,key);
ConstructPayload(e.second);
ret.m_payload = &e.second;
ret.m_leaf = left;
ret.m_index = insertPos;
}
else
{
// Insertion position in right node
ssize_t rightInsertPos = insertPos - numLeftEntrys;
cxAssert(rightInsertPos >= 0);
// Copy entries from left to right up to the insertion position
MoveLeafEntries(left, numLeftEntrys, right, 0, rightInsertPos);
// Set the new entry
LeafKeyEntry& e = right->GetEntry(rightInsertPos);
CopyConstructKey(e.first,key);
ConstructPayload(e.second);
ret.m_payload = &e.second;
ret.m_leaf = right;
ret.m_index = rightInsertPos;
// Copy remaining entries from left to right
MoveLeafEntries(left, insertPos, right, rightInsertPos+1, nk-insertPos);
}
left->m_numKeyEntrys = numLeftEntrys;
right->m_numKeyEntrys = MAX_NUM_LEAF_KEY_ENTRYS + 1 - numLeftEntrys;
return right;
}
else
{
// Shift entries after insertPos upwards by one position to make space
ShiftLeafEntries(left, insertPos, insertPos+1, nk-insertPos);
// Set the new entry
LeafKeyEntry& e = left->GetEntry(insertPos);
CopyConstructKey(e.first,key);
ConstructPayload(e.second);
ret.m_payload = &e.second;
ret.m_leaf = left;
ret.m_index = insertPos;
++left->m_numKeyEntrys;
return nullptr;
}
}
else
{
ret.m_payload = nullptr;
ret.m_leaf = nullptr;
ret.m_index = -1;
ret.m_created = false;
return nullptr;
}
}
/*
Insert a new entry e into the given internal node.
If the insertion would make the node overflow, then (at least conceptually)
1. e is inserted anyway
2. A new 'overflow' internal node is created that will be inserted to the immediate right.
3. Our (left) internal node is split into two about the median, by copying the top half of the
entries into the 'overflow' internal node.
4. The returned InternalKeyEntry contains the key and ptr to the overflow node. This needs to
be inserted into the parent node.
*/
void InsertIntoInternalNode(InternalNode* left, const InternalKeyEntry& e, InternalKeyEntry& overflow)
{
cxAssert(left);
ssize_t nk = left->m_numKeyEntrys;
// Note that if the root node is an internal node then it may contain one entry.
cxAssert(1 <= nk && nk <= MAX_NUM_INTERNAL_KEY_ENTRYS);
/*
Use binary search to find the insertion position. We have keys K(0), .... , K(nk-1)
Let K = e.m_key be the key to be inserted
For convenience, define K(-1) = -inf, and K(nk) = +inf.
Claim: Each assignment to hi makes it strictly decrease
Proof: hi can only be replaced by hi' = mid-1 where mid = floor( (lo+hi) / 2)
mid = floor( (lo+hi) / 2) => mid <= (lo+hi)/2
=> 2*mid <= lo + hi
=> 2*mid <= 2*hi (because lo <= hi in while condition)
=> mid <= hi
=> mid-1 < hi
=> hi' < hi
Claim: Each assignment to lo makes it strictly increase
Proof: lo can only be replaced by lo' = mid+1 where mid = floor( (lo+hi) / 2)
mid = floor( (lo+hi) / 2) => mid >= floor( (lo+lo)/2 ) (because lo <= hi)
=> mid >= lo
=> mid+1 > lo
=> lo' > lo
Claim: An invariant of the loop is hi < nk
Proof: Initially hi = nk-1 < nk. At each iteration hi cannot increase
Claim: At each iteration mid = floor( (lo+hi)/2 ) satisfies mid < nk
Proof: From a proof above we see that mid <= hi. Now hi < nk therefore mid < nk
Claim: An invariant of the binary search is : K is in [ K(lo-1), K(hi+1) )
Proof: (by induction)
* Initially lo = 0, hi = nk-1.
So [ K(lo-1), K(hi+1) ) = [ K(-1), K(nk) ) = [-inf, +inf) which must contain K
* At each iteration mid = floor( (lo+hi) / 2).
Therefore lo <= mid <= hi
Therefore lo-1 < mid < hi+1
1. if K > K(mid) then lo becomes lo' = mid+1.
lo-1 < mid < hi+1 => lo-1 < lo'-1 < hi+1. Hence the interval strictly decreases
in size but remains non-empty.
K > K(mid) => K > K(lo'-1) so the invariant remains true
2. if K < K(mid) then hi becomes hi' = mid-1
lo-1 < mid < hi+1 => lo-1 < hi'+1 < hi+1. Hence the interval strictly decreases
in size but remains non-empty.
K < K(mid) => K < K(hi'+1) so the invariant remains true
Claim: The loop eventually exits with lo = hi+1
Proof: At each iteration either lo strictly increases or else hi strictly decreases,
therefore an infinite loop is not possible.
The invariant implies lo-1 < hi+1 => lo < hi+2
The iteration exits with lo > hi.
So we have hi < lo < hi+2
This implies lo = h1+1.
*/
ssize_t insertPos;
{
ssize_t lo = 0;
ssize_t hi = nk-1;
while (lo <= hi)
{
ssize_t mid = (lo + hi)/2;
cxAssert(0 <= mid && mid < nk);
if (less(left->GetEntry(mid).m_key, e.m_key)) lo = mid + 1;
else
{
cxAssert(less(e.m_key, left->GetEntry(mid).m_key)); // This node shouldn't already contain the key
hi = mid - 1;
}
}
cxAssert(lo == hi+1);
// Correct insertion position is at lo. Note that lo = nk is possible!
insertPos = lo;
}
cxAssert(0 <= insertPos && insertPos <= nk);
if (nk == MAX_NUM_INTERNAL_KEY_ENTRYS)
{
// This internal node is already full, so this insertion will overflow
// Create a new internal node and insert to the immediate right of our node
InternalNode* right = new InternalNode;
overflow.m_ptr = right;
// The total number of entrys (including the entry to be inserted but excluding the key to be
// moved up into the parent) is MAX_NUM_INTERNAL_KEY_ENTRYS
// Let numLeftEntrys be the number of entrys to remain in 'left' (including perhaps the
// entry to be inserted), and the remainder are copied into 'right'.
const ssize_t numLeftEntrys = MAX_NUM_INTERNAL_KEY_ENTRYS / 2;
if (insertPos <= numLeftEntrys)
{
// Copy top half of the entries into 'right', leaving numLeftEntrys behind
MoveInternalEntries(left, numLeftEntrys, right, 0, nk - numLeftEntrys);
// Shift entrys after insertPos upwards by 1 to make space for the insertion
ShiftInternalEntries(left, insertPos, insertPos+1, numLeftEntrys-insertPos);
// Insert at insertPos into left
CopyConstructInternalEntry(left,insertPos,e);
}
else
{
// Insertion position in right node
ssize_t rightInsertPos = insertPos - (numLeftEntrys + 1);
cxAssert(rightInsertPos >= 0);
// Copy entries from left to right up to the insertion position
MoveInternalEntries(left, numLeftEntrys + 1, right, 0, rightInsertPos);
// Now insert the new entry
CopyConstructInternalEntry(right,rightInsertPos,e);
// Copy remaining entries from left to right
MoveInternalEntries(left, insertPos, right, rightInsertPos+1, nk-insertPos);
}
// Key for the middle entry will be moved up into the parent
right->m_ptr = left->GetEntry(numLeftEntrys).m_ptr;
// todo : comments in macro arguments like Key upset the call to the destructor!
// Having comments appear in macro arguments seems to be a bad idea. Perhaps
// eager evaluation of macro arguments would be better?
typedef Key K;
overflow.m_key.~K();
MoveKey(overflow.m_key, left->GetEntry(numLeftEntrys).m_key);
left->m_numKeyEntrys = numLeftEntrys;
right->m_numKeyEntrys = MAX_NUM_INTERNAL_KEY_ENTRYS - numLeftEntrys;
}
else
{
// Shift entries after insertPos upwards by one position to make space
ShiftInternalEntries(left, insertPos, insertPos+1, nk-insertPos);
CopyConstructInternalEntry(left,insertPos,e);
++left->m_numKeyEntrys;
overflow.m_ptr = nullptr;
}
}
void wFindInNode(wFindRet& ret,Node* left, ssize_t height, const Key& key, InternalKeyEntry& rightEntry, bool allowCreate)
{
if (height > 0)
{
wRecursiveFind(ret,AsInternalNode(left),height,key,rightEntry,allowCreate);
}
else
{
if (LeafNode* right = wFindInLeaf(ret,AsLeafNode(left),key,allowCreate))
{
// Leaf node 'left' overflowed, causing creation of another leaf node 'right'
// We really do need to make a second copy of the key. In a B+Tree (but not a
// BTree) InternalNodes normally record copies of (some of) the keys that occur in
// the leaf nodes.
// Fill rightEntry. Note that fill() returns an address of an instance that
// hasn't been constructed yet.
rightEntry.m_key = right->GetEntry(0).first;
rightEntry.m_ptr = right;
}
}
}
/*
Insert (key,payload) into the sub-tree rooted by left which is assumed to be at the given height
from the leaf nodes (a zero height implies that left if a leaf). Returns false if insertion failed
because the given key already exists in the B+Tree.
If 'left' overflows into a new node then the pointer to the new node is written to 'right'.
Otherwise 'right' is set to nullptr.
*/
void wRecursiveFind(wFindRet& ret,InternalNode* left, ssize_t height, const Key& key, InternalKeyEntry& rightEntry, bool allowCreate)
{
cxAssert(height >= 0);
cxAssert(left);
// In the manner of the normal search, linearly recurse down the B+Tree
Node* childLeft = left->FindChildGivenKey(key);
cxAssert(childLeft);
InternalKeyEntry childRight;
wFindInNode(ret,childLeft,height-1,key,childRight,allowCreate);
// As the stack unwinds, insert entrys and split nodes as required
if (childRight.m_ptr)
{
// The insertion into the (left) child node caused it to overflow into a new right child
// node, so we in turn have to insert a key and right child pointer into this internal
// node.
InsertIntoInternalNode(left,childRight,rightEntry);
}
}
/////////////////////////////////// Deletion ///////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////////////////
// Delete an entry from the B+Tree
/*
Merge the given right node into the left node, rendering the right node empty so it may be
deleted. Fix up the double linked list of leaf pages as required for deletion of the right
node.
The given left and right nodes must not be nullptr and be consecutive leaf nodes.
+-------------------------+ next +-------------------------+ next
| K(0) K(1) ... K(nL-1) |---->----| K(0) K(1) ... K(nR-1) |--->---
| | | |
| |----<----| |---<---
+-------------------------+ prev +-------------------------+ prev
left right
*/
void MergeLeafRightToLeft(LeafNode* left, LeafNode* right)
{
cxAssert(left);
cxAssert(right);
cxAssert(left->m_next = right);
cxAssert(right->m_prev = left);
ssize_t nL = left->m_numKeyEntrys;
cxAssert(0 <= nL && nL <= MAX_NUM_LEAF_KEY_ENTRYS);
ssize_t nR = right->m_numKeyEntrys;
cxAssert(0 <= nR && nR <= MAX_NUM_LEAF_KEY_ENTRYS);
ssize_t nTot = nL+nR;
cxAssert(MIN_NUM_LEAF_KEY_ENTRYS <= nTot && nTot <= MAX_NUM_LEAF_KEY_ENTRYS);
// Move all entries from right to left
MoveLeafEntries(right,0, left,nL, nR);
left->m_numKeyEntrys = nTot;
#if CEDA_BPLUSTREE_TRACE_DELETE
Tracer() << "Merge right to left : Left now has " << nTot << '\n';
TraceIndenter indent;
#endif
// Update double linked list of pages as per deletion of right
left->m_next = right->m_next;
if (right->m_next)
{
cxAssert(m_lastLeaf != right);
cxAssert(right->m_next->m_prev == right);
right->m_next->m_prev = left;
}
else
{
cxAssert(m_lastLeaf == right);
m_lastLeaf = left;
}
right->m_numKeyEntrys = 0;
delete right;
}
// Find the index of the largest key less than or equal to the given old key, and replace that key
// with the given new key value. This should be compatible with the existing ordering of the keys.
void ReplaceKeyInAnchor(InternalNode* node, const Key& oldKey, const Key& newKey)
{
cxAssert(node);
cxAssert(!equals(oldKey,newKey));
ssize_t i = node->FindChildIndexGivenKey(oldKey);
cxAssert(0 <= i && i < node->m_numKeyEntrys);
node->GetEntry(i).m_key = newKey;
// Ensure that new key is compatible with ordering constraint on the keys
if (i > 0)
{
cxAssert(less(node->GetEntry(i-1).m_key, newKey));
}
if (i+1 < node->m_numKeyEntrys)
{
cxAssert(less(newKey, node->GetEntry(i+1).m_key));
}
}
/*
Called because left has underflowed and therefore entries will be transferred from right
(which is assumed to have enough surplus entries)
The given left and right nodes must not be nullptr and be consecutive leaf nodes.
shift ns entrys from right to left
+-----------------+
| |
| |<----------->|
\|/ ns (number to shift)
+-------------------------+ next +-------------------------+
| K(0) K(1) ... K(nL-1) |---->----| K(0) K(1) ... K(nR-1) |
| | | |
| |----<----| |
+-------------------------+ prev +-------------------------+
left right
(has a deficit) (has a surplus)
Updates the key recorded in the given anchor node
*/
void BalanceLeafRightToLeft(LeafNode* left, LeafNode* right, InternalNode* anchor)
{
#if CEDA_BPLUSTREE_TRACE_DELETE
Tracer() << "BalanceLeafRightToLeft\n";
TraceIndenter indent;
#endif
cxAssert(left);
cxAssert(right);
cxAssert(anchor);
cxAssert(left->m_next = right);
cxAssert(right->m_prev = left);
// Left has a deficit
ssize_t nL = left->m_numKeyEntrys;
cxAssert(0 <= nL && nL < MIN_NUM_LEAF_KEY_ENTRYS);
// Right has a surplus
ssize_t nR = right->m_numKeyEntrys;
cxAssert(MIN_NUM_LEAF_KEY_ENTRYS < nR && nR <= MAX_NUM_LEAF_KEY_ENTRYS);
ssize_t newNumLeft = (nL + nR)/2;
cxAssert(MIN_NUM_LEAF_KEY_ENTRYS <= newNumLeft && newNumLeft <= MAX_NUM_LEAF_KEY_ENTRYS);
// Number to shift
ssize_t ns = newNumLeft - nL;
cxAssert(ns > 0);
ssize_t newNumRight = nR - ns;
cxAssert(MIN_NUM_LEAF_KEY_ENTRYS <= newNumRight && newNumRight <= MAX_NUM_LEAF_KEY_ENTRYS);
// Move entries from right to left until they have about the same size
MoveLeafEntries(right,0, left,nL, ns);
// Shift remaining entries in right down to position 0
ShiftLeafEntries(right,ns,0,newNumRight);
left->m_numKeyEntrys = newNumLeft;
right->m_numKeyEntrys = newNumRight;
// Previously the key at index position 0 in the right leaf node
const Key& oldkey = left->GetEntry(nL).first;
// The new key at index position 0 in the right leaf node
const Key& newkey = right->GetEntry(0).first;
// The separator in the anchor has to be adjusted as required
ReplaceKeyInAnchor(anchor, oldkey, newkey);
}
/*
Called because right has underflowed and therefore entries will be transferred from left
(which is assumed to have enough surplus entries)
The given left and right nodes must not be nullptr and be consecutive leaf nodes.
shift ns entrys from left to right
+-------------------+
| |
| |
|<-------->| \|/
+--------------------------+ next +-------------------------+
| K(0) K(1) ... K(nL-1) |---->----| K(0) K(1) ... K(nR-1) |
| | | |
| |----<----| |
+--------------------------+ prev +-------------------------+
left right
(has a surplus) (has a deficit)
Updates the key recorded in the given anchor node
*/
void BalanceLeafLeftToRight(LeafNode* left, LeafNode* right, InternalNode* anchor)
{
#if CEDA_BPLUSTREE_TRACE_DELETE
Tracer() << "BalanceLeafLeftToRight\n";
TraceIndenter indent;
#endif
cxAssert(left);
cxAssert(right);
cxAssert(anchor);
cxAssert(left->m_next = right);
cxAssert(right->m_prev = left);
// Left has a surplus
ssize_t nL = left->m_numKeyEntrys;
cxAssert(MIN_NUM_LEAF_KEY_ENTRYS < nL && nL <= MAX_NUM_LEAF_KEY_ENTRYS);
// Right has a deficit
ssize_t nR = right->m_numKeyEntrys;
cxAssert(0 <= nR && nR < MIN_NUM_LEAF_KEY_ENTRYS);
ssize_t newNumRight = (nL + nR)/2;
cxAssert(MIN_NUM_LEAF_KEY_ENTRYS <= newNumRight && newNumRight <= MAX_NUM_LEAF_KEY_ENTRYS);
ssize_t ns = newNumRight - nR;
cxAssert(ns > 0);
ssize_t newNumLeft = nL - ns;
cxAssert(MIN_NUM_LEAF_KEY_ENTRYS <= newNumLeft && newNumLeft <= MAX_NUM_LEAF_KEY_ENTRYS);
// Shift entries in right upwards from position 0
ShiftLeafEntries(right,0,ns,nR);
// Move entries from left to right
MoveLeafEntries(left,newNumLeft, right,0, ns);
left->m_numKeyEntrys = newNumLeft;
right->m_numKeyEntrys = newNumRight;
// Previously the key at index position 0 in the right leaf node
const Key& oldkey = right->GetEntry(ns).first;
// The new key at index position 0 in the right leaf node
const Key& newkey = right->GetEntry(0).first;
// The separator in the anchor has to be adjusted as required
ReplaceKeyInAnchor(anchor, oldkey, newkey);
}
/*
The given left and right nodes must not be nullptr and be consecutive internal nodes with the given
anchor node.
Merge the given right node into the left node, rendering the right node empty so it may be
deleted.
This function doesn't actually remove the relevant key K(i) from the anchor node.
+--------------------+
| K(i) | (anchor)
+--/------------\----+
/ \
... ...
/ \
+-------------------------------+ +------------------------------+
| K(0) K(1) ... K(i-1) | | K(i+1) K(i+2) ... |
| / \ \ \ | | / \ \ |
| P(-1) P(0) P(1) ... P(i-1) | | P(i) P(i+1) P(i+2) ... |
| | | |
+-------------------------------+ +------------------------------+
left right
The merge process involves the following steps
1. Find the key K(i) in the anchor
2. Copy K(i), P(i) to the new position in left
3. Copy all entries from right to left
*/
void MergeInternalRightToLeft(InternalNode* left, InternalNode* right, InternalNode* anchor)
{
cxAssert(left);
cxAssert(right);
cxAssert(anchor);
ssize_t nL = left->m_numKeyEntrys;
cxAssert(0 <= nL && nL < MAX_NUM_INTERNAL_KEY_ENTRYS);
ssize_t nR = right->m_numKeyEntrys;
cxAssert(1 <= nR && nR < MAX_NUM_INTERNAL_KEY_ENTRYS);
ssize_t nTot = nL + nR + 1;
cxAssert(MIN_NUM_INTERNAL_KEY_ENTRYS <= nTot && nTot <= MAX_NUM_INTERNAL_KEY_ENTRYS);
#if CEDA_BPLUSTREE_TRACE_DELETE
Tracer() << "Merge right to left : nL= " << nL << " nR= " << nR << '\n';
TraceIndenter indent;
#endif
// Get index position of the key in the anchor. This is the largest key below right->GetEntry(0).m_key
ssize_t ai = anchor->FindChildIndexGivenKey(right->GetEntry(0).m_key);
cxAssert(0 <= ai && ai < anchor->m_numKeyEntrys);
const Key& anchorKey = anchor->GetEntry(ai).m_key;
if (nL > 0) cxAssert(less(left->GetEntry(nL-1).m_key, anchorKey));
cxAssert(less(anchorKey, right->GetEntry(0).m_key));
// Copy K(i), P(i) to the new position in left
CopyConstructKey(left->GetEntry(nL).m_key, anchorKey);
left->GetEntry(nL).m_ptr = right->m_ptr;
// Copy all entries from right to left
MoveInternalEntries(right,0, left,nL+1, nR);
left->m_numKeyEntrys = nTot;
#if CEDA_BPLUSTREE_TRACE_DELETE
Tracer() << "Merge right to left : Left now has " << nTot << '\n';
#endif
right->m_numKeyEntrys = 0;
delete right;
}
/*
left and right are consecutive internal nodes with the given anchor node.
A deletion from right has caused it to underflow. Therefore we want to rebalance by moving entries
from the left node to the right node. It is assumed that left has sufficient surplus keys.
+--------------------+
| K(i) | (anchor)
+--/------------\----+
/ \
... ...
/ \
+-------------------------------+ +------------------------------+
| K(0) K(1) ... K(i-1) | | K(i+1) K(i+2) ... |
| / \ \ \ | | / \ \ |
| P(-1) P(0) P(1) ... P(i-1) | | P(i) P(i+1) P(i+2) ... |
| | | |
+-------------------------------+ +------------------------------+
left right
(has a surplus) (has a deficit)
It is useful to conceptualise the merge of the left and right nodes. The above figure has been
labelled accordingly.
We see that the anchor contains the key K(i) associated with the left most sub-tree P(i) of the
right node. Therefore K(i) needs to be copied into right as entries are shifted from left to
right.
The shift process involves the following steps
1. Find the key K(i) in the anchor
2. Calculate the number of entries to be shifted from left to right
3. Shift the entries in right upwards
4. Move K(i), P(i) to the new position in right
5. Move entries from left to right
6. Set the new anchor key value and right->m_ptr.
*/
void BalanceInternalLeftToRight(InternalNode* left, InternalNode* right, InternalNode* anchor)
{
cxAssert(left);
cxAssert(right);
cxAssert(anchor);
// Left has a surplus
ssize_t nL = left->m_numKeyEntrys;
cxAssert(MIN_NUM_INTERNAL_KEY_ENTRYS < nL && nL <= MAX_NUM_INTERNAL_KEY_ENTRYS);
// Right has a deficit
ssize_t nR = right->m_numKeyEntrys;
cxAssert(1 <= nR && nR < MIN_NUM_INTERNAL_KEY_ENTRYS);
ssize_t newNumRight = (nL + nR)/2;
cxAssert(MIN_NUM_INTERNAL_KEY_ENTRYS <= newNumRight && newNumRight <= MAX_NUM_INTERNAL_KEY_ENTRYS);
ssize_t ns = newNumRight - nR;
cxAssert(ns > 0);
ssize_t newNumLeft = nL - ns;
cxAssert(MIN_NUM_INTERNAL_KEY_ENTRYS <= newNumLeft && newNumLeft <= MAX_NUM_INTERNAL_KEY_ENTRYS);
#if CEDA_BPLUSTREE_TRACE_DELETE
Tracer() << "BalanceInternalLeftToRight : nL=" << nL << " nR=" << nR << " new nR=" << newNumRight
<< " ns= " << ns << " new nL= " << newNumLeft << '\n';
TraceIndenter indent;
#endif
// Get index position of the key in the anchor. This is the largest key below right->GetEntry(0).m_key
ssize_t ai = anchor->FindChildIndexGivenKey(right->GetEntry(0).m_key);
cxAssert(0 <= ai && ai < anchor->m_numKeyEntrys);
Key& anchorKey = anchor->GetEntry(ai).m_key;
#if CEDA_BPLUSTREE_TRACE_DELETE
Tracer() << "ai = " << ai << '\n';
#endif
cxAssert(less(left->GetEntry(nL-1).m_key, anchorKey));
cxAssert(less(anchorKey, right->GetEntry(0).m_key));
/*
Note that we move the anchor key K(i) down into right, and move the new anchor
key K(newNumLeft) out of left, without needing any copy construction or copy assignment
operations in order to duplicate key values.
*/
// Shift all nR entries in right upwards from position 0 to position ns
ShiftInternalEntries(right,0,ns,nR);
// Move the anchor key downwards into right to position ns-1
MoveKey(right->GetEntry(ns-1).m_key, anchorKey);
right->GetEntry(ns-1).m_ptr = right->m_ptr;
// Move ns-1 entries from left to right
MoveInternalEntries(left,newNumLeft+1, right,0, ns-1);
// Move new anchor key upwards out of left
MoveKey(anchorKey, left->GetEntry(newNumLeft).m_key);
right->m_ptr = left->GetEntry(newNumLeft).m_ptr;
left->m_numKeyEntrys = newNumLeft;
right->m_numKeyEntrys = newNumRight;
}
/*
left and right are consecutive internal nodes with the given anchor node.
A deletion from left has caused it to underflow. Therefore we want to rebalance by moving entries
from the right node to the left node. It is assumed that right has sufficient surplus keys.
+--------------------+
| K(i) | (anchor)
+--/------------\----+
/ \
... ...
/ \
+-------------------------------+ +------------------------------+
| K(0) K(1) ... K(i-1) | | K(i+1) K(i+2) ... |
| / \ \ \ | | / \ \ |
| P(-1) P(0) P(1) ... P(i-1) | | P(i) P(i+1) P(i+2) ... |
| | | |
+-------------------------------+ +------------------------------+
left right
(has a deficit) (has a surplus)
It is useful to conceptualise the merge of the left and right nodes. The above figure has been
labelled accordingly.
We see that the anchor contains the key K(i) associated with the left most sub-tree P(i) of the
right node. Therefore K(i) needs to be copied into left as entries are shifted from right to
left.
The shift process involves the following steps
1. Calculate ns = number of entries to be shifted to left
2. Find the key K(i) in the anchor
3. Copy K(i), P(i) to the new position in left
4. Copy ns-1 entries from right to left
5. Set the new anchor key value and right->m_ptr.
6. Shift the entries in right downwards to position 0
*/
void BalanceInternalRightToLeft(InternalNode* left, InternalNode* right, InternalNode* anchor)
{
cxAssert(left);
cxAssert(right);
cxAssert(anchor);
// Left has a deficit
ssize_t nL = left->m_numKeyEntrys;
cxAssert(0 <= nL && nL < MIN_NUM_INTERNAL_KEY_ENTRYS);
// Right has a surplus
ssize_t nR = right->m_numKeyEntrys;
cxAssert(MIN_NUM_INTERNAL_KEY_ENTRYS < nR && nR <= MAX_NUM_INTERNAL_KEY_ENTRYS);
ssize_t newNumLeft = (nL + nR)/2;
cxAssert(MIN_NUM_INTERNAL_KEY_ENTRYS <= newNumLeft && newNumLeft <= MAX_NUM_INTERNAL_KEY_ENTRYS);
ssize_t ns = newNumLeft - nL;
cxAssert(ns > 0);
ssize_t newNumRight = nR - ns;
cxAssert(MIN_NUM_INTERNAL_KEY_ENTRYS <= newNumRight && newNumRight <= MAX_NUM_INTERNAL_KEY_ENTRYS);
// Get index position of the key in the anchor. This is the largest key below right->GetEntry(0).m_key
ssize_t ai = anchor->FindChildIndexGivenKey(right->GetEntry(0).m_key);
cxAssert(0 <= ai && ai < anchor->m_numKeyEntrys);
Key& anchorKey = anchor->GetEntry(ai).m_key;
if (nL > 0) cxAssert(less(left->GetEntry(nL-1).m_key, anchorKey));
cxAssert(less(anchorKey, right->GetEntry(0).m_key));
/*
Note that we move the anchor key K(i) down into left, and move the new anchor
key out of right, without needing any key assignment operations.
*/
// Copy K(i), P(i) to the new position in left
MoveKey(left->GetEntry(nL).m_key, anchorKey);
left->GetEntry(nL).m_ptr = right->m_ptr;
// Copy ns-1 entries from right to left
MoveInternalEntries(right,0, left,nL+1, ns-1);
// Set the new anchor key value and right->m_ptr.
MoveKey(anchorKey, right->GetEntry(ns-1).m_key);
right->m_ptr = right->GetEntry(ns-1).m_ptr;
// Shift the entries in right downwards to position 0
ShiftInternalEntries(right,ns,0,newNumRight);
left->m_numKeyEntrys = newNumLeft;
right->m_numKeyEntrys = newNumRight;
}
/*
height is the height of thisNode (so height = 0 indicates that thisNode is a leaf node).
lNode is the node at the same height to the left of thisNode.
lNode is nullptr if there is no node to the left of thisNode
rNode is the node at the same height to the right of thisNode.
rNode is nullptr if there is no node to the right of thisNode
lAnchor is the anchor node between lNode and thisNode. lAnchor=nullptr <=> lNode=nullptr
rAnchor is the anchor node between rNode and thisNode. rAnchor=nullptr <=> rNode=nullptr
*/
bool RecursiveDelete(
ssize_t height, Node* thisNode, Node* lNode, Node* rNode,
InternalNode* lAnchor, InternalNode* rAnchor,
InternalNode* parent,
Node*& deleteNode,
const Key& key)
{
cxAssert(height >= 0);
cxAssert(thisNode);
cxAssert( (lNode == nullptr) == (lAnchor == nullptr) );
cxAssert( (rNode == nullptr) == (rAnchor == nullptr) );
#if CEDA_BPLUSTREE_TRACE_DELETE
Tracer() << "RecursiveDelete. height = " << height << " key = " << key << '\n';
TraceIndenter indent(2);
#endif
if (height > 0)
{
Node* next_lNode;
Node* next_rNode;
InternalNode* next_lAnchor;
InternalNode* next_rAnchor;
InternalNode* iThisNode = AsInternalNode(thisNode);
std::pair<Node*,ssize_t> r = iThisNode->FindChildGivenKey2(key);
cxAssert(-1 <= r.second && r.second < iThisNode->m_numKeyEntrys);
Node* nextNode = r.first;
cxAssert(nextNode);
if (r.second == -1)
{
cxAssert(iThisNode->m_ptr == nextNode);
if (lNode)
{
//next_lNode = greatest entry ptr of lNode;
InternalNode* ilNode = AsInternalNode(lNode);
cxAssert(ilNode->m_numKeyEntrys > 0);
next_lNode = ilNode->GetEntry(ilNode->m_numKeyEntrys-1).m_ptr;
}
else
{
cxAssert(!lAnchor);
next_lNode = nullptr;
}
next_lAnchor = lAnchor;
}
else
{
cxAssert(iThisNode->m_ptr != nextNode);
cxAssert(iThisNode->GetEntry(r.second).m_ptr == nextNode);
// next_lNode = entry ptr that immediately precedes nextNode
next_lNode = (r.second == 0) ? iThisNode->m_ptr : iThisNode->GetEntry(r.second-1).m_ptr;
next_lAnchor = iThisNode;
}
if (r.second == iThisNode->m_numKeyEntrys-1)
{
if (rNode)
{
// next_rNode = least entry ptr of rNode
next_rNode = AsInternalNode(rNode)->m_ptr;
}
else
{
cxAssert(!rAnchor);
next_rNode = nullptr;
}
next_rAnchor = rAnchor;
}
else
{
// next_rNode = entry ptr that immediately follows nextNode
next_rNode = iThisNode->GetEntry(r.second+1).m_ptr;
next_rAnchor = iThisNode;
}
Node* childDeleteNode = nullptr;
if (!RecursiveDelete(height-1, nextNode, next_lNode, next_rNode, next_lAnchor, next_rAnchor, iThisNode, childDeleteNode, key))
{
// Deletion failed because entry not found
return false;
}
if (childDeleteNode)
{
// remove deleteNode from thisNode, for now ignoring underflow
ssize_t deletePos = (childDeleteNode == nextNode) ? r.second : r.second+1;
#if CEDA_BPLUSTREE_TRACE_DELETE
Tracer() << "Internal node deletePos = " << deletePos << '\n';
TraceIndenter indent;
#endif
// The left most sub-tree under thisNode (i.e. associated with thisNode->m_ptr) is never
// deleted, because merging of a node into its neighbour may only occur when they are
// consecutive nodes under the same parent, and the right node is always merged into
// the left rather than the reverse.
cxAssert(deletePos != -1);
cxAssert(0 <= deletePos && deletePos < iThisNode->m_numKeyEntrys);
// Erase entry from thisNode
DestructInternalEntry(iThisNode,deletePos);
ShiftInternalEntries(iThisNode,deletePos+1,deletePos,iThisNode->m_numKeyEntrys-1-deletePos);
--iThisNode->m_numKeyEntrys;
if (parent)
{
cxAssert(height < m_height);
if (iThisNode->m_numKeyEntrys < MIN_NUM_INTERNAL_KEY_ENTRYS)
{
// thisNode has underflowed
ssize_t s1 = (lNode) ? AsInternalNode(lNode)->m_numKeyEntrys : 0;
ssize_t s2 = (rNode) ? AsInternalNode(rNode)->m_numKeyEntrys : 0;
bool useLeft = s1 > s2;
ssize_t maxs = (useLeft) ? s1 : s2;
if (maxs > MIN_NUM_INTERNAL_KEY_ENTRYS)
{
if (useLeft)
{
// Move surplus entries from lNode to thisNode
#if CEDA_BPLUSTREE_TRACE_DELETE
Tracer() << "BalanceInternalLeftToRight\n";
TraceIndenter indent;
#endif
BalanceInternalLeftToRight(AsInternalNode(lNode), iThisNode, lAnchor);
}
else
{
// Move surplus entries from rNode to thisNode
#if CEDA_BPLUSTREE_TRACE_DELETE
Tracer() << "BalanceInternalRightToLeft\n";
TraceIndenter indent;
#endif
BalanceInternalRightToLeft(iThisNode, AsInternalNode(rNode), rAnchor);
}
// No more nodes need removal
deleteNode = nullptr;
}
else
{
if (parent == lAnchor)
{
#if CEDA_BPLUSTREE_TRACE_DELETE
Tracer() << "MergeInternalRightToLeft with lAnchor\n";
TraceIndenter indent;
#endif
MergeInternalRightToLeft(AsInternalNode(lNode), iThisNode, lAnchor);
// thisNode has been emptied and needs to be deleted from its parent node
deleteNode = iThisNode;
}
else
{
// At least one of the two anchors must be this node's parent
cxAssert(parent == rAnchor);
#if CEDA_BPLUSTREE_TRACE_DELETE
Tracer() << "MergeInternalRightToLeft with rAnchor\n";
TraceIndenter indent;
#endif
MergeInternalRightToLeft(iThisNode, AsInternalNode(rNode), rAnchor);
// rNode has been emptied and needs to be deleted from its parent node.
deleteNode = rNode;
}
}
}
}
else
{
cxAssert(height == m_height);
cxAssert(m_root == thisNode);
// Collapse root
if (iThisNode->m_numKeyEntrys == 0)
{
#if CEDA_BPLUSTREE_TRACE_DELETE
Tracer() << "Collapse root\n";
#endif
m_root = iThisNode->m_ptr;
delete iThisNode;
--m_height;
}
}
}
}
else
{
LeafNode* leaf = AsLeafNode(thisNode);
ssize_t nk = leaf->m_numKeyEntrys;
cxAssert(0 <= nk && nk <= MAX_NUM_LEAF_KEY_ENTRYS);
// Look for exact match to key in leaf node.
ssize_t deletePos = SearchLeaf(key, leaf);
#if CEDA_BPLUSTREE_TRACE_DELETE
Tracer() << "Leaf node deletePos = " << deletePos << '\n';
TraceIndenter indent;
#endif
if (deletePos == -1)
{
// Key not found so delete can't be performed.
return false;
}
cxAssert(0 <= deletePos && deletePos < leaf->m_numKeyEntrys);
// Erase entry from leaf node
DestructLeafEntry(leaf,deletePos);
ShiftLeafEntries(leaf,deletePos+1,deletePos,nk-1-deletePos);
--leaf->m_numKeyEntrys;
--m_size;
if (m_height == 0) cxAssert(!parent);
if (m_height > 0 && leaf->m_numKeyEntrys < MIN_NUM_LEAF_KEY_ENTRYS)
{
cxAssert(parent);
#if CEDA_BPLUSTREE_TRACE_DELETE
Tracer() << "Leaf has underflowed n=" << leaf->m_numKeyEntrys << '\n';
TraceIndenter indent;
#endif
// leaf has underflowed
ssize_t s1 = (lNode) ? AsLeafNode(lNode)->m_numKeyEntrys : 0;
ssize_t s2 = (rNode) ? AsLeafNode(rNode)->m_numKeyEntrys : 0;
bool useLeft = s1 > s2;
ssize_t maxs = (useLeft) ? s1 : s2;
#if CEDA_BPLUSTREE_TRACE_DELETE
Tracer() << "s1=" << s1 << " s2=" << s2 << " maxs=" << maxs << '\n';
#endif
if (maxs > MIN_NUM_LEAF_KEY_ENTRYS)
{
// Shift entries from the neighbourNode into thisNode. The separator in the
// anchorNode has to be adjusted as required.
if (useLeft)
{
// Assumes MIN_NUM_LEAF_KEY_ENTRYS > 1
cxAssert(leaf->m_numKeyEntrys > 0);
// Move surplus entries from lNode to leaf
BalanceLeafLeftToRight(AsLeafNode(lNode), leaf, lAnchor);
}
else
{
BalanceLeafRightToLeft(leaf, AsLeafNode(rNode), rAnchor);
}
// No more nodes need removal
deleteNode = nullptr;
}
else
{
if (parent == lAnchor)
{
#if CEDA_BPLUSTREE_TRACE_DELETE
Tracer() << "MergeLeafRightToLeft. Delete leaf\n";
TraceIndenter indent;
#endif
MergeLeafRightToLeft(AsLeafNode(lNode), leaf);
// leaf has been emptied and needs to be deleted from its parent node
deleteNode = leaf;
}
else
{
// At least one of the two anchors must be this node's parent
cxAssert(parent == rAnchor);
#if CEDA_BPLUSTREE_TRACE_DELETE
Tracer() << "MergeLeafRightToLeft. Delete rNode\n";
TraceIndenter indent;
#endif
MergeLeafRightToLeft(leaf, AsLeafNode(rNode));
// rNode has been emptied and needs to be deleted from its parent node.
deleteNode = rNode;
}
}
}
else
{
deleteNode = nullptr;
}
}
return true;
}
/////////////////////////////////// Debugging ///////////////////////////////////////
// Ensure that all keys are in the range [*k1,*k2).
// If k1 != nullptr then check that all keys in this node satisfy k >= *k1
// If k2 != nullptr then check that all keys in this node satisfy k < *k2
// Ensure that fanout is in appropriate range
void ValidateNode(ssize_t height, Node* node, bool isRoot, const Key* k1, const Key* k2) const
{
cxAssert(node);
cxAssert(height >= 0);
if (height > 0)
{
/*
As an example, for nk = 4 we have the following picture
Key K0 K1 K2 K3
\ \ \ \
Ptr P P0 P1 P2 P3
| | | | |
Range [-inf,K0) [K0,K1) [K1,K2) [K2,K3) [K3,+inf)
*/
InternalNode* inode = AsInternalNode(node);
ssize_t nk = inode->m_numKeyEntrys;
cxAssert(1 <= nk && nk <= MAX_NUM_INTERNAL_KEY_ENTRYS);
if (!isRoot) cxAssert(MIN_NUM_INTERNAL_KEY_ENTRYS <= nk);
// Check that keys strictly increase from left to right
for (ssize_t i=1 ; i < nk ; ++i)
{
cxAssert(less(inode->GetEntry(i-1).m_key, inode->GetEntry(i).m_key));
}
// Check that keys are in [*k1,*k2)
if (k1) cxAssert(!less(inode->GetEntry(0).m_key, *k1));
if (k2) cxAssert(less(inode->GetEntry(nk-1).m_key, *k2));
// Recurse into child nodes
ValidateNode(height-1,inode->m_ptr, false, nullptr, &inode->GetEntry(0).m_key);
for (ssize_t i=0 ; i < nk ; ++i)
{
ValidateNode(height-1,inode->GetEntry(i).m_ptr, false, &inode->GetEntry(i).m_key,
(i == nk-1) ? nullptr : &inode->GetEntry(i+1).m_key);
}
}
else
{
LeafNode* leaf = AsLeafNode(node);
ssize_t nk = leaf->m_numKeyEntrys;
// Note that if the root node is a leaf then it may contain zero entries.
cxAssert(0 <= nk && nk <= MAX_NUM_LEAF_KEY_ENTRYS);
if (!isRoot) cxAssert(MIN_NUM_LEAF_KEY_ENTRYS <= nk);
// Check that keys increase from left to right
for (ssize_t i=1 ; i < nk ; ++i)
{
cxAssert(less(leaf->GetEntry(i-1).first, leaf->GetEntry(i).first));
}
// Check that keys are in [*k1,*k2)
if (nk > 0)
{
if (k1) cxAssert(!less(leaf->GetEntry(0).first, *k1));
if (k2) cxAssert(less(leaf->GetEntry(nk-1).first, *k2));
}
}
}
ssize_t GetNumNodes(ssize_t height, Node* node) const
{
cxAssert(node);
if (height > 0)
{
InternalNode* inode = AsInternalNode(node);
ssize_t c = 1 + GetNumNodes(height-1,inode->m_ptr);
ssize_t nk = inode->m_numKeyEntrys;
cxAssert(1 <= nk && nk <= MAX_NUM_INTERNAL_KEY_ENTRYS);
for (ssize_t i=0 ; i < nk ; ++i)
{
c += GetNumNodes(height-1,inode->GetEntry(i).m_ptr);
}
return c;
}
else
{
return 1;
}
}
void WriteNode(xostream& os, ssize_t height, Node* node) const
{
cxAssert(node);
os << '{' << '\n';
{
Indenter indent(os,4);
if (height > 0)
{
InternalNode* inode = AsInternalNode(node);
WriteNode(os,height-1,inode->m_ptr);
ssize_t nk = inode->m_numKeyEntrys;
cxAssert(1 <= nk && nk <= MAX_NUM_INTERNAL_KEY_ENTRYS);
for (ssize_t i=0 ; i < nk ; ++i)
{
os << inode->GetEntry(i).m_key << '\n';
WriteNode(os,height-1,inode->GetEntry(i).m_ptr);
}
}
else
{
LeafNode* leaf = AsLeafNode(node);
ssize_t nk = leaf->m_numKeyEntrys;
cxAssert(0 <= nk && nk <= MAX_NUM_LEAF_KEY_ENTRYS);
for (ssize_t i=0 ; i < nk ; ++i)
{
os << leaf->GetEntry(i).first << " --> " << leaf->GetEntry(i).second << '\n';
}
}
}
os << '}' << '\n';
}
private:
ssize_t m_size; // Total number of (key,payload) entries stored in the B+Tree
LeafNode* m_firstLeaf; // pointer to first leaf node in a linked list. Never nullptr
LeafNode* m_lastLeaf; // pointer to last leaf node in a linked list. Never nullptr
Node* m_root; // pointer to root node. Could be leaf node or internal node
ssize_t m_height; // 0 if root node is a leaf node
};
} // namespace ceda
#endif // include guard