// MsgReadBuffer.h
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2013

#ifndef Ceda_cxMessage_MsgReadBuffer_H
#define Ceda_cxMessage_MsgReadBuffer_H

#include "cxMessage.h"
#include "TcpMsg.h"
#include "VirtualAllocBuffer.h"
#include "Ceda/cxUtils/CedaAssert.h"

#ifdef _MSC_VER
    // struct 'X' needs to have dll-interface to be used by clients of class 'Y'
    #pragma warning(disable:4251)
#endif

namespace ceda
{
///////////////////////////////////////////////////////////////////////////////////////////////////
// MsgReadBuffer

/*
MsgReadBuffer provides a receive buffer for a socket connection, suitable for reading an octet 
stream broken up into messages.

                    BeginReceive()                        
                     EndReceive()                         Read() 
  +--------------+               +-------------------+               +--------------+
  |   producer   |------->-------|   MsgReadBuffer   |------->-------|   consumer   |
  +--------------+               +-------------------+               +--------------+

It is assumed each message begins with a 4-octet header holding a 32-bit integer (in little-endian
order over the wire) which gives the number of octets in the message payload.

It is assumed that messages are small enough that it is reasonable for them to be buffered entirely
in a single range in memory.  As such the buffer used internally by MsgReadBuffer is automatically 
resized to allow a large message to fit in memory.  

MsgReadBuffer is not thread-safe.

It is assumed MsgReadBuffer is used in something like the following manner:

    MessageReaderSettings settings;
    MsgReadBuffer mrb(settings);
    
    while(1)
    {
        // Read from the socket
        MsgReadBuffer::Buffer socketBuffer = mrb.BeginReceive();
        int numBytesRead = socket.recv(socketBuffer.buffer, socketBuffer.len);
        mrb.EndReceive(numBytesRead);

        // Process all the messages that were fully received
        MsgReadBuffer::Buffer msgBuffer;
        MsgReadBuffer::EReadMsg rm;
        while((rm = mrb.Read(msgBuffer)) == MsgReadBuffer::RM_OK)
        {
            // Handle received message in 'msgBuffer'
            // (note this doesn't include the 4 byte header for the message size)
        }

        if (rm == MsgReadBuffer::RM_INVALID_SIZE) < handle error >
    }

Note that a single thread alternates between reading from the socket into the MsgReadBuffer's buffer 
and reading the messages out of the MsgReadBuffer's buffer until they have all been read.

BeginReceive() returns a buffer which can be provided to either a synchronous or asynchronous
read call on a socket.  After the read on the socket has completed EndReceive() is called
providing the actual number of bytes that were read from the socket.

After a call to EndReceive(), Read() is called repeatedly to retrieve each message that is 
available, if any.  A message is available once it has been received in its entirety and occupies
a single range in memory.

Read() must not be called between calls to BeginReceive() and EndReceive().
*/

class cxMessage_API MsgReadBuffer
{
    cxNotCloneable(MsgReadBuffer)

public:
    explicit MsgReadBuffer(const MessageReaderSettings& settings);

    struct Buffer
    {
        octet_t* buffer;
        ssize_t len;
    };

    bool HaveUnreadData() const
    {
        return messagePos_ != receivedPos_;
    }
    
    /*
    Receiving of data from the transport layer to be written to a buffer in memory involves paired 
    calls to BeginReceive() and EndReceive().

    BeginReceive() returns a buffer to be written with data.  If the returned buffer is empty then 
    there is no space available.

    It is allowable to write any prefix of the returned buffer (including none at all). 
    
    After the received data has been written to the buffer EndReceive() should be called with 'len' 
    equal to the number of octets which were actually written.  If no data was received then calling 
    EndReceive(0) is optional.

    The buffer returned by BeginReceive() must not be accessed after a subsequent call to EndReceive() or
    Read().
    */
    Buffer BeginReceive();
    void EndReceive(ssize_t len);

    enum EReadMsg
    {
        RM_OK,              // message read successfully
        RM_NONE,            // no message available in the buffer
        RM_PARTIAL,         // only a non-empty partial message available in the buffer    
        RM_INVALID_SIZE     // Message header has an invalid size
                            // A valid size is in the range [0,settings_.maxMsgSize]
    };
    
    // Try to read the next message from the buffer.  If returns RM_OK then b provides the message 
    // that was read.
    EReadMsg Read(Buffer& b);

private:
    void AssertInvariant() const
    {
        cxAssert(!buffer_.empty());
        cxAssert(0 <= messagePos_);
        cxAssert(messagePos_ <= receivedPos_);
        cxAssert(receivedPos_ <= buffer_.size());
    }

private:
    MessageReaderSettings settings_;

    /*
    There is a single buffer used.  At any time a prefix of the buffer contains data that has
    already been received.  A single pending I/O request is normally made for the suffix 
    corresponding to the part that hasn't been received yet.  There is however possibly a 
    constraint on the maximum size of any I/O request.
    
    Within the buffer there is a section corresponding to the current message we are reading.
    If enough data has been received we can read the message header which specifies the size
    of the message body.  On that basis we can increase the size of the buffer if necessary
    to allow the entire message to fit in the buffer.
    
    The implementation treats the section [0,messagePos_) as already processed, and therefore
    no longer required.  Therefore where appropriate it may move the portion of the current 
    message that has been received back to position 0.
    
    The implementation tries to avoid unnecessary increases in the buffer size, and memmove 
    operations.
    
    
                                      buffer_
        [---------------------------------------------------------------------)
        
                       received 
        [------------------------------------------)
        
                                           current message
                                      [---------------------...
    
        |                             |             |                          |
        0                     messagePos_      receivedPos_             buffer_.size()
        
        
    Invariant:
    
        0 <= messagePos_ <= receivedPos_ <= buffer_.size()    
    */
    
    // buffer implemented in terms of VirtualAlloc
    VirtualAllocBuffer buffer_;

    ssize_t receivedPos_;        // Marks portion of buffer_ that has been received (i.e. filled
                                 // with data read from the socket)
    
    // Information about the current message we are reading
    ssize_t messagePos_;         // Index position in buffer_ of the start (i.e. header)
                                 // of the current message being read.  
                                 // Always well defined.
                                            
    ssize_t bodySize_;           // -1 if not known (i.e. haven't yet read the header which
                                 // records the message size)
};

} // namespace ceda

#endif // include guard