// ForwardLinkedPagedBuffer.h
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2011
#pragma once
#ifndef Ceda_cxUtils_ForwardLinkedPagedBuffer_H
#define Ceda_cxUtils_ForwardLinkedPagedBuffer_H
#include "cxUtils.h"
#include <mutex>
#include <string.h>
namespace ceda
{
///////////////////////////////////////////////////////////////////////////////////////////////////
// ForwardLinkedPagedBuffer
/*
An implementation of an efficient FIFO queue for a byte stream, where Write() appends data
to the end of the queue and Read() extracts data from the front of the queue. The amount of
data that can be buffered is only constrained by available memory.
The queue is implemented using a forward linked list of pages.
A useful feature of the implementation is that, as long as there are appropriate memory ordering
guarantees between a Write() and any /dependent/ Read() (meaning a Read() that reads any part
of what was written by the Write()), it is possible to support concurrent reading and writing
for a single producer and single consumer. See SpscByteQueue for an example.
*/
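/*
A minimal usage sketch (illustrative only, not compiled as part of this header). It uses a
deliberately small page size so that a single Write() spans several pages. Note that the
buffer does not record how many bytes it holds; the caller must know how much data is
available before calling Read().

    void Example()
    {
        ceda::ForwardLinkedPagedBuffer<8> buf;    // 8 byte pages make the page spanning visible
        const char msg[] = "hello paged buffer";  // 19 bytes including the NUL -> spans 3 pages
        buf.Write(msg, sizeof(msg));

        char out[sizeof(msg)];
        buf.Read(out, sizeof(out));               // ok: the caller knows 19 bytes were written
    }
*/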
template <ssize_t pageSize = 4096>
class ForwardLinkedPagedBuffer
{
cxNotCloneable(ForwardLinkedPagedBuffer)
private:
struct Page
{
Page() : next_(nullptr) {}
octet_t buffer_[pageSize];
Page* next_; // Next page in the forward linked list
};
Page* first_; // Ptr to first page in the linked list
Page* last_; // Ptr to last page in the linked list
ssize_t wpos_; // Write position on last_
ssize_t rpos_; // Read position on first_
public:
ForwardLinkedPagedBuffer() : wpos_(0), rpos_(0)
{
first_ = last_ = new Page();
}
~ForwardLinkedPagedBuffer()
{
clear();
delete first_;
}
/*
ForwardLinkedPagedBuffer(const ForwardLinkedPagedBuffer<pageSize>& other) : first_(nullptr), last_(nullptr)
{
push(other.begin(), other.end());
}
ForwardLinkedPagedBuffer<pageSize>& operator=(const ForwardLinkedPagedBuffer<pageSize>& other)
{
if (this != &other)
{
clear();
push(other.begin(), other.end());
}
return *this;
}
void swap(ForwardLinkedPagedBuffer<pageSize>& rhs)
{
using std::swap;
swap(first_, rhs.first_);
swap(last_, rhs.last_);
swap(wpos_, rhs.wpos_);
swap(rpos_, rhs.rpos_);
}
*/
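// Discard all buffered data. Every page except the first is freed; the first page is retained
// for reuse.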
void clear()
{
cxAssert(first_);
cxAssert(last_);
cxAssert(last_->next_ == nullptr);
// Delete all pages except the first page
Page* p = first_->next_;
while(p)
{
Page* n = p->next_;
delete p;
p = n;
}
first_->next_ = nullptr;
last_ = first_;
wpos_ = 0;
rpos_ = 0;
}
// Write the given buffer to the end of this ForwardLinkedPagedBuffer
void Write(const void* buffer, ssize_t numBytes)
{
cxAssert(numBytes >= 0);
if (numBytes > 0)
{
cxAssert(buffer);
for(;;)
{
cxAssert(last_);
cxAssert(last_->next_ == nullptr);
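// Free space remaining on the last page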
ssize_t numBytesAvail = pageSize - wpos_;
cxAssert(0 <= numBytesAvail && numBytesAvail <= pageSize);
if (numBytes <= numBytesAvail)
{
// Will fit on last page
memcpy(last_->buffer_ + wpos_, buffer, numBytes);
wpos_ += numBytes;
break;
}
memcpy(last_->buffer_ + wpos_, buffer, numBytesAvail);
buffer = static_cast<const octet_t*>(buffer) + numBytesAvail;
numBytes -= numBytesAvail;
cxAssert(numBytes > 0);
Page* p = new Page();
last_->next_ = p;
last_ = p;
wpos_ = 0;
}
}
}
/*
Read() must only be called when it is known that at least numBytes of data are available.
The buffer itself does not track how much unread data it holds, so that bookkeeping is the
caller's responsibility (see SpscByteQueue below for an example).
*/
void Read(void* buffer, ssize_t numBytes)
{
cxAssert(numBytes >= 0);
if (numBytes > 0)
{
cxAssert(buffer);
for(;;)
{
cxAssert(first_);
// Number of bytes from rpos_ to the end of the first page (an upper bound on what can be read from it)
ssize_t numBytesAvail = pageSize - rpos_;
cxAssert(0 <= numBytesAvail && numBytesAvail <= pageSize);
if (numBytes <= numBytesAvail)
{
// Can satisfy entire read from first_
memcpy(buffer, first_->buffer_ + rpos_, numBytes);
rpos_ += numBytes;
break;
}
memcpy(buffer, first_->buffer_ + rpos_, numBytesAvail);
buffer = static_cast<octet_t*>(buffer) + numBytesAvail;
numBytes -= numBytesAvail;
cxAssert(numBytes > 0);
Page* p = first_;
first_ = first_->next_;
delete p;
rpos_ = 0;
}
}
}
};
///////////////////////////////////////////////////////////////////////////////////////////////////
// SpscByteQueue
/*
Illustrates how ForwardLinkedPagedBuffer<> can support concurrent reading and writing in a
queue with a single producer and a single consumer.
A mutex is (only) locked while accessing size_, and this is sufficient to provide
the memory ordering guarantees between producer and consumer.
*/
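/*
A minimal usage sketch (illustrative only, not compiled as part of this header): one producer
thread calls Write() while one consumer thread polls TryRead() until all of the data has
arrived. The std::thread plumbing and the busy-wait loop are assumptions about how a caller
might drive the queue; they are not part of this header.

    #include <thread>

    void Example()
    {
        ceda::SpscByteQueue q;

        std::thread producer([&q]
        {
            for (char c = 'a'; c <= 'z'; ++c)
                q.Write(&c, 1);                   // the producer thread is the only writer
        });

        std::thread consumer([&q]
        {
            char buf[26];
            int received = 0;
            while (received < 26)                 // the consumer thread is the only reader
                received += static_cast<int>(q.TryRead(buf + received, 26 - received));
        });

        producer.join();
        consumer.join();
    }
*/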
class SpscByteQueue
{
public:
SpscByteQueue() : size_(0) {}
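// Reset the queue. Note that size_ is modified here without taking mutex_, so clear() must not
// be called while another thread is concurrently calling Write(), Read() or TryRead().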
void clear()
{
size_ = 0;
buffer_.clear();
}
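// Append numBytes from buffer to the back of the queue. Intended to be called only by the
// (single) producer thread.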
void Write(const void* buffer, ssize_t numBytes)
{
cxAssert(numBytes >= 0);
// For concurrency, write to the underlying paged buffer outside the mutex
buffer_.Write(buffer,numBytes);
{
std::lock_guard<std::mutex> lock(mutex_);
size_ += numBytes;
}
}
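// Read exactly numBytes into buffer. Returns false (and consumes nothing) if fewer than
// numBytes are currently available. Intended to be called only by the (single) consumer thread.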
bool Read(void* buffer, ssize_t numBytes)
{
cxAssert(numBytes >= 0);
{
std::lock_guard<std::mutex> lock(mutex_);
if (size_ < numBytes) return false;
size_ -= numBytes;
}
// Read from the underlying paged buffer outside the mutex
buffer_.Read(buffer,numBytes);
return true;
}
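// Read up to numBytes into buffer, returning the number of bytes actually read (which may be
// zero). Intended to be called only by the (single) consumer thread.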
ssize_t TryRead(void* buffer, ssize_t numBytes)
{
cxAssert(numBytes >= 0);
{
std::lock_guard<std::mutex> lock(mutex_);
if (size_ < numBytes)
{
numBytes = size_;
size_ = 0;
}
else
{
size_ -= numBytes;
}
}
// For concurrency, read from the underlying paged buffer outside the mutex
buffer_.Read(buffer,numBytes);
return numBytes;
}
ssize_t size() const
{
std::lock_guard<std::mutex> lock(mutex_);
return size_;
}
bool empty() const
{
std::lock_guard<std::mutex> lock(mutex_);
return size_ == 0;
}
private:
ForwardLinkedPagedBuffer<> buffer_;
ssize_t size_;
mutable std::mutex mutex_;
};
} // namespace ceda
#endif // include guard