ForwardLinkedPagedBuffer.h

// ForwardLinkedPagedBuffer.h
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2011

#pragma once
#ifndef Ceda_cxUtils_ForwardLinkedPagedBuffer_H
#define Ceda_cxUtils_ForwardLinkedPagedBuffer_H

#include "cxUtils.h"
#include <mutex>
#include <string.h>

namespace ceda
{

///////////////////////////////////////////////////////////////////////////////////////////////////
// ForwardLinkedPagedBuffer

/*
An implementatation of an efficient FIFO queue for a byte stream, where Write() appends data 
to the end of the queue and Read() extracts data from the front of the queue.  The amount of 
data that can be buffered is only constrained by available memory.

The queue is implemented using a forward linked list of pages. 

A useful feature of the implementation is that as long there are appropriate memory ordering 
guarantees between a Write() and any /dependent/ Read() (meaning a Read() that reads any part
of what was written by the Write()), it is possible to support concurrent reading and writing 
for a single producer and single consumer. See SpscByteQueue for an example.
*/

template <ssize_t pageSize = 4096>
class ForwardLinkedPagedBuffer
{
    cxNotCloneable(ForwardLinkedPagedBuffer)
    
private:
    struct Page
    {
        Page() : next_(nullptr) {}
        
        octet_t buffer_[pageSize];
        Page* next_;       // Next page in the forward linked list
    };

    Page* first_;      // Ptr to first page in the linked list
    Page* last_;       // Ptr to last page in the linked list
    ssize_t wpos_;     // Write position on last_
    ssize_t rpos_;     // Read position on first_

public:
    ForwardLinkedPagedBuffer() : wpos_(0), rpos_(0) 
    {
        first_ = last_ = new Page();
    }
    ~ForwardLinkedPagedBuffer() 
    { 
        clear();
        delete first_; 
    }

    /*
    ForwardLinkedPagedBuffer(const ForwardLinkedPagedBuffer<pageSize>& other) : first_(nullptr), last_(nullptr)
    {
        push(other.begin(), other.end());    
    }

    ForwardLinkedPagedBuffer<pageSize>& operator=(const ForwardLinkedPagedBuffer<pageSize>& other)
    {
        if (this != &other)
        {
            clear();
            push(other.begin(), other.end());    
        }
        return *this;
    }

    void swap(ForwardLinkedPagedBuffer<pageSize>& rhs)
    {
        using std::swap;
        swap(first_, rhs.first_);
        swap(last_, rhs.last_);
        swap(wpos_, rhs.wpos_);
        swap(rpos_, rhs.rpos_);
    }
    */

    void clear()
    {
        cxAssert(first_);
        cxAssert(last_);
        cxAssert(last_->next_ == nullptr);

        // Delete all pages except the first page
        Page* p = first_->next_;
        while(p)
        {
            Page* n = p->next_;
            delete p;
            p = n;
        }
        first_->next_ = nullptr;
        
        last_ = first_;
        wpos_ = 0;
        rpos_ = 0;
    }

    // Write the given buffer to the end of this ForwardLinkedPagedBuffer
    void Write(const void* buffer, ssize_t numBytes)
    {
        cxAssert(numBytes >= 0);
        if (numBytes > 0)
        {
            cxAssert(buffer);
            for(;;)
            {
                cxAssert(last_);
                cxAssert(last_->next_ == nullptr);

                ssize_t numBytesAvail = pageSize - wpos_;
                cxAssert(0 <= numBytesAvail && numBytesAvail <= pageSize);
                
                if (numBytes <= numBytesAvail)
                {
                    // Will fit on last page
                    memcpy(last_->buffer_ + wpos_, buffer, numBytes);
                    wpos_ += numBytes;
                    break;
                }
                
                memcpy(last_->buffer_ + wpos_, buffer, numBytesAvail);
                (const octet_t*&) buffer += numBytesAvail;
                numBytes -= numBytesAvail;
                cxAssert(numBytes > 0);

                Page* p = new Page();
                last_->next_ = p;
                last_ = p;
                wpos_ = 0;
            }
        }
    }
    
    /*
    Read() must only be called when it is known there is sufficient data available (i.e.
    for the requested number of bytes).
    */
    void Read(void* buffer, ssize_t numBytes)
    {
        cxAssert(numBytes >= 0);
        if (numBytes > 0)
        {
            cxAssert(buffer);
            for(;;)
            {
                cxAssert(first_);

                // Num bytes available to read on the first page
                ssize_t numBytesAvail = pageSize - rpos_;
                cxAssert(0 <= numBytesAvail && numBytesAvail <= pageSize);
                
                if (numBytes <= numBytesAvail)
                {
                    // Can satisfy entire read from first_
                    memcpy(buffer, first_->buffer_ + rpos_, numBytes);
                    rpos_ += numBytes;
                    break;
                }
                
                memcpy(buffer, first_->buffer_ + rpos_, numBytesAvail);
                (octet_t*&) buffer += numBytesAvail;
                numBytes -= numBytesAvail;
                cxAssert(numBytes > 0);
                
                Page* p = first_;
                first_ = first_->next_;
                delete p;
                rpos_ = 0;
            }            
        }
    }
};


///////////////////////////////////////////////////////////////////////////////////////////////////
// SpscByteQueue

/*
Illustrates how ForwardLinkedPagedBuffer<> can support concurrent reading and writing for a
queue that supports a single producer and a single consumer.

A mutex is (only) locked while accessing size_, and this is sufficient to provide
the memory ordering guarantees between producer and consumer.
*/

class SpscByteQueue
{
public:
    SpscByteQueue() : size_(0) {}
    
    void clear()
    {
        size_ = 0;
        buffer_.clear();
    }
    
    void Write(const void* buffer, ssize_t numBytes)
    {
        cxAssert(numBytes >= 0);

        // For concurrency write actual buffer outside mutex
        buffer_.Write(buffer,numBytes);

        {
            std::lock_guard<std::mutex> lock(mutex_);
            size_ += numBytes;
        }
    }
    
    bool Read(void* buffer, ssize_t numBytes)
    {
        cxAssert(numBytes >= 0);
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (size_ < numBytes) return false;
            size_ -= numBytes;
        }
        
        // Write actual buffer without a mutex
        buffer_.Read(buffer,numBytes);
        
        return true;
    }

    ssize_t TryRead(void* buffer, ssize_t numBytes)
    {
        cxAssert(numBytes >= 0);
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (size_ < numBytes)
            {
                numBytes = size_;
                size_ = 0;
            } 
            else
            {
                size_ -= numBytes;
            }
        }
        
        // For concurrency write actual buffer without a mutex
        buffer_.Read(buffer,numBytes);
        
        return numBytes;
    }
    
    ssize_t size() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return size_;
    }
    
    bool empty() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return size_ == 0;
    }

private:
    ForwardLinkedPagedBuffer<> buffer_;
    ssize_t size_;
    mutable std::mutex mutex_;
};

} // namespace ceda

#endif // include guard