MsgReadBuffer.h
// MsgReadBuffer.h
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2013
#ifndef Ceda_cxMessage_MsgReadBuffer_H
#define Ceda_cxMessage_MsgReadBuffer_H
#include "cxMessage.h"
#include "TcpMsg.h"
#include "VirtualAllocBuffer.h"
#include "Ceda/cxUtils/CedaAssert.h"
#ifdef _MSC_VER
// struct 'X' needs to have dll-interface to be used by clients of class 'Y'
#pragma warning(disable:4251)
#endif
namespace ceda
{
///////////////////////////////////////////////////////////////////////////////////////////////////
// MsgReadBuffer
/*
MsgReadBuffer provides a receive buffer for a socket connection, suitable for reading an octet
stream broken up into messages.
BeginReceive()
EndReceive() Read()
+--------------+ +-------------------+ +--------------+
| producer |------->-------| MsgReadBuffer |------->-------| consumer |
+--------------+ +-------------------+ +--------------+
It is assumed each message begins with a 4 octet header which is a 32 bit integer (in little
endian order over the wire) giving the number of octets in the message payload.
It is assumed that messages are small enough that it is reasonable for them to be buffered entirely
in a single range in memory. As such the buffer used internally by MsgReadBuffer is automatically
resized to allow a large message to fit in memory.
MsgReadBuffer is not thread-safe.
It is assumed MsgReadBuffer is used in something like the following manner:
MessageReaderSettings settings;
MsgReadBuffer mrb(settings);
while(1)
{
// Read from the socket
MsgReadBuffer::Buffer socketBuffer = mrb.BeginReceive();
int numBytesRead = socket.recv(socketBuffer.buffer, socketBuffer.len);
mrb.EndReceive(numBytesRead);
// Process all the messages that were fully received
MsgReadBuffer::Buffer msgBuffer;
MsgReadBuffer::EReadMsg rm;
while((rm = mrb.Read(msgBuffer)) == MsgReadBuffer::RM_OK)
{
// Handle received message in 'msgBuffer'
// (note this doesn't include the 4 byte header for the message size)
}
if (rm == MsgReadBuffer::RM_INVALID_SIZE) < handle error >
}
Note that a single thread alternates between reading from the socket into the MsgReadBuffer's buffer
and reading the messages out of the MsgReadBuffer's buffer until they have all been read.
BeginReceive() returns a buffer which can be provided to either a synchronous or asynchronous
read call on a socket. After the read on the socket has completed EndReceive() is called
providing the actual number of bytes that were read from the socket.
After a call to EndReceive(), calls to Read() are made to read the next message which is available
if any. Available means the message has been received in its entirety and exists in a single range in
memory.
Read() must not be called between calls to BeginReceive() and EndReceive().
*/
class cxMessage_API MsgReadBuffer
{
cxNotCloneable(MsgReadBuffer)
public:
explicit MsgReadBuffer(const MessageReaderSettings& settings);
struct Buffer
{
octet_t* buffer;
ssize_t len;
};
bool HaveUnreadData() const
{
return messagePos_ != receivedPos_;
}
/*
Receiving of data from the transport layer to be written to a buffer in memory involves paired
calls to BeginReceive() and EndReceive().
BeginReceive() returns a buffer to be written with data. If the returned buffer is empty then
there is no space available.
It is allowable to write any prefix of the returned buffer (including none at all).
After the received data has been written to the buffer EndReceive() should be called with 'len'
equal to the number of octets which were actually written. If no data was received then calling
EndReceive(0) is optional.
The buffer returned by BeginReceive() must not be accessed after a subsequent call to EndReceive() or
Read().
*/
Buffer BeginReceive();
void EndReceive(ssize_t len);
enum EReadMsg
{
RM_OK, // message read successfully
RM_NONE, // no message available in the buffer
RM_PARTIAL, // only a non-empty partial message available in the buffer
RM_INVALID_SIZE // Message header has an invalid size
// A valid size is in the range [0,settings_.maxMsgSize]
};
// Try to read the next message from the buffer. If returns RM_OK then b provides the message
// that was read.
EReadMsg Read(Buffer& b);
private:
void AssertInvariant() const
{
cxAssert(!buffer_.empty());
cxAssert(0 <= messagePos_);
cxAssert(messagePos_ <= receivedPos_);
cxAssert(receivedPos_ <= buffer_.size());
}
private:
MessageReaderSettings settings_;
/*
There is a single buffer used. At any time a prefix of the buffer contains data that has
already been received. A single pending I/O request is normally made for the suffix
corresponding to the part that hasn't been received yet. There is however possibly a
constraint on the maximum size of any I/O request.
Within the buffer there is a section corresponding to the current message we are reading.
If enough data has been received we can read the message header which specifies the size
of the message body. On that basis we can increase the size of the buffer if necessary
to allow the entire message to fit in the buffer.
The implementation treats the section [0,messagePos_) as already processed, and therefore
no longer required. Therefore where appropriate it may move the portion of the current
message that has been received back to position 0.
The implementation tries to avoid unnecessary increases in the buffer size, and memmove
operations.
buffer_
[---------------------------------------------------------------------)
received
[------------------------------------------)
current message
[---------------------...
| | | |
0 messagePos_ receivedPos_ buffer_.size()
Invariant:
0 <= messagePos_ <= receivedPos_ <= buffer_.size()
*/
// buffer implemented in terms of VirtualAlloc
VirtualAllocBuffer buffer_;
ssize_t receivedPos_; // Marks portion of buffer_ that has been received (i.e. filled
// with data read from the socket)
// Information about the current message we are reading
ssize_t messagePos_; // Index position in buffer_ of the start (i.e. header)
// of the current message being read.
// Always well defined.
ssize_t bodySize_; // -1 if not known (i.e. haven't yet read the header which
// records the message size)
};
} // namespace ceda
#endif // include guard