MsgWriteBuffer.h

// MsgWriteBuffer.h
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2013

#ifndef Ceda_cxMessage_MsgWriteBuffer_H
#define Ceda_cxMessage_MsgWriteBuffer_H

#include "cxMessage.h"
#include "TcpMsg.h"
#include "VirtualAllocBuffer.h"
#include "Ceda/cxUtils/CedaAssert.h"

#ifdef _MSC_VER
    // struct 'X' needs to have dll-interface to be used by clients of class 'Y'
    #pragma warning(disable:4251)
#endif

namespace ceda
{

///////////////////////////////////////////////////////////////////////////////////////////////////
/*
A MsgWriteBuffer represents an SPSC (single producer and single consumer) transient message queue 
used to buffer messages to be sent. The producer and consumer run in strict alternation.

                    BeginWrite()                        BeginSend()
                     EndWrite()                          EndSend()
  +--------------+               +-------------------+               +--------------+
  |   producer   |------->-------|   MsgWriteBuffer  |------->-------|   consumer   |
  +--------------+               +-------------------+               +--------------+

MsgWriteBuffer is not concerned with the content of messages, including any kind of message header.

MsgWriteBuffer is responsible for managing a send buffer in user space which is filled with variable 
length messages and occassionally the filled portion is sent by a stream oriented transport layer
such as TCP.  Neither the filling of the buffer with messages nor the sending of filled buffers 
is performed by MsgWriteBuffer itself. Rather, the MsgWriteBuffer is passive in nature and is only concerned 
with managing the policy for when it's time to send filled portions - the main reason being that 
most I/O functions provided by an OS have large overheads (perhaps because they go into kernel 
mode) and are very inefficient when sending many small buffers rather than a smaller number of 
large buffers.

MsgWriteBuffer has the following public member functions:

    ReadyToSend()
    BeginWrite()
    EndWrite()
    BeginSend()
    EndSend()

MsgWriteBuffer is not thread-safe - i.e. it is not allowable for more than one thread to call the public
functions of MsgWriteBuffer concurrently.

The following code illustrates usage:

    MessageWriterSettings settings;
    MsgWriteBuffer mwb(settings);

    while(1)
    {
        // Write messages to buffer in memory
        while( <have messages to be written> && !mwb.ReadyToSend())
        {
            ssize_t bufferSizeRequested = < size of next message >
            Buffer b = mwb.BeginWrite(bufferSizeRequested);
            assert(bufferSizeRequested <= b.len);

            < write some prefix of b >
            ssize_t numOctetsWritten = ...

            assert(numOctetsWritten <= b.len);
            mwb.EndWrite(numOctetsWritten);
        }

        // Send messages using asynchronous or synchronous I/O
        while(mwb.ReadyToSend())
        {
            Buffer b = mwb.BeginSend();

            < send some prefix of b >
            ssize_t numOctetsSent = socket.write(b.buffer, b.len);

            assert(numOctetsSent <= b.len);
            mwb.EndSend(numOctetsSent);
        }
    }
*/

class cxMessage_API MsgWriteBuffer
{
    cxNotCloneable(MsgWriteBuffer)

public:
    explicit MsgWriteBuffer(const MessageWriterSettings& settings);

    struct Buffer
    {
        octet_t* buffer;
        ssize_t len;
    };

    /*
    Returns true if the buffer is so full that it is appropriate to send what's already there
    rather than write more messages.
    */
    bool ReadyToSend() const
    {
         return filledPos_ >= settings_.upperHysteresisSendSize;
    }

    /*
    Writing of data to the buffer in memory involves paired calls to BeginWrite() and EndWrite().

    BeginWrite() returns the next buffer to be written (i.e. to be filled) with data.  The size of 
    the returned buffer will be at least bufferSizeRequested (but may well be somewhat larger).
    
    It is allowable to write any prefix of the returned buffer (including none at all) and this may 
    exceed what was requested with 'bufferSizeRequested'.

    Multiple messages can be written to the buffer - indeed writing as many messages as will fit 
    into the returned buffer may improve performance.
    
    After the data has been written EndWrite() should be called with 'len' equal to the number of 
    octets which were actually written.  If no data was written then calling EndWrite(0) is 
    optional.

    The buffer returned by BeginWrite() must not be accessed after a subsequent call to EndWrite() or
    BeginSend().
    */
    Buffer BeginWrite(ssize_t bufferSizeRequested);
    void EndWrite(ssize_t len);

    /*
    Sending of data involves paired calls to BeginSend() and EndSend().

    BeginSend() returns the next buffer of data that's been written with data and is ready to be 
    sent.  If the returned buffer is empty then there is no data available to be sent.

    It is allowable to send any prefix of the returned buffer (including none at all). 
    
    After the data has been sent EndSend() should be called with 'len' equal to the number of 
    octets which were actually sent.  If no data was sent then calling EndSend(0) is optional.

    The buffer returned by BeginSend() must not be accessed after a subsequent call to EndSend() or
    BeginWrite().
    */
    Buffer BeginSend() const;
    void EndSend(ssize_t len);

private:
    void AssertInvariant() const
    {
        cxAssert(0 <= completedPos_);
        cxAssert(completedPos_ <= filledPos_);
        cxAssert(filledPos_ <= buffer_.size());
    }

private:
    MessageWriterSettings settings_;

    /*
        <--- sent ---> <-- filled and waiting to be sent --> <-- free (not filled with data) -->
        [-------------|-------------------------------------|----------------------------------)
        0             |                                     |                                  |
                 completedPos_                           filledPos_                  buffer_.size()
    
    Invariant:
    
    0 <= completedPos_ <= filledPos_ <= buffer_.size()
    */

    // buffer implemented in terms of VirtualAlloc
    VirtualAllocBuffer buffer_;

    // The amount of the buffer for which send requests have successfully been completed
    ssize_t completedPos_;

    // The amount of the buffer that is filled with data (i.e. messages that have been prepared
    // in memory).
    ssize_t filledPos_;

    ssize_t sendUntilResetBufferSize_;
};

} // namespace ceda

#endif // include guard