A MsgReadBuffer represents an SPSC (single producer and single consumer) transient message queue used to buffer received messages. The producer and consumer run in strict alternation.
It is assumed each message begins with a 4 octet header which is a 32 bit integer (in little endian order over the wire) giving the number of octets in the message payload.
It is assumed that messages are small enough that it is reasonable for them to be buffered entirely in a single range in memory. As such the buffer used internally by MsgReadBuffer is automatically resized to allow a large message to fit in memory. Copying the data is avoided for both the producer and consumer by having them write and read the data in place.
MsgReadBuffer is not thread-safe. By design there is no mutex protecting the MsgReadBuffer state. All calls on the public functions must be serialised. Furthermore concurrency between the producer and consumer is not permitted. The write buffer returned to the producer must not be written concurrently with the read buffer returned to the consumer.
It can be related to the message buffering used for a message loop:
while(1)
{
MessageBuffer message = get_next_message();
process_message(message);
}
See the wikipedia article on an Event loop [].
Consider a message loop which reads from a socket and processes each message. It can use a MsgReadBuffer as follows:
void ReadSocketMessageLoop(Socket socket)
{
MessageReaderSettings s;
MsgReadBuffer mr(s);
while(1)
{
// Read from the socket
MsgReadBuffer::Buffer socketBuffer = msgReader.BeginReceive();
int numBytesRead = socket.recv(socketBuffer.buffer, socketBuffer.len);
msgReader.EndReceive(numBytesRead);
// Process all the messages that were fully received
MsgReadBuffer::Buffer msgBuffer;
MsgReadBuffer::EReadMsg rm;
while((rm = msgReader.Read(msgBuffer)) == MsgReadBuffer::RM_OK)
{
// Handle received message in 'msgBuffer'
// (note this doesn't include the 4 byte header for the message size)
}
if (rm == MsgReadBuffer::RM_INVALID_SIZE) < handle error >
}
}
Note that a single thread alternates between reading from the socket into the MsgReadBuffer's buffer and reading the messages out of the MsgReadBuffer's buffer until they have all been read.
BeginReceive() returns a buffer which can be provided to either a synchronous or asynchronous read call on a socket. After the read on the socket has completed EndReceive() is called providing the actual number of bytes that were read from the socket.
After a call to EndReceive(), calls to Read() are made to read the next message which is available if any. Available means the message has been received in its entirety and exists in a single range in memory.
Read() must not be called between calls to BeginReceive() and EndReceive().
A message loop dedicates a thread which tends to block on I/O (in this case reading from the socket). This is not scalable for servers which support many thousands of concurrent socket connections. In that case asynchronous I/O (e.g. IO completion ports on a Windows platform) is preferable. The MsgReadBuffer is well suited to having a pending I/O and on completion processing all the messages.
The producer provides a raw sequence of octets. It appends data to the MsgReadBuffer with calls to BeginReceive() and EndReceive(). The producer writes data without any regard for the message boundaries.
The consumer reads one message at a time with calls to Read()
Represents a buffer returned by MsgReadBuffer::BeginReceive() which the producer can use to write data.
struct Buffer
{
octet_t* buffer;
ssize_t len;
};
enum EReadMsg
{
RM_OK,
RM_NONE,
RM_PARTIAL,
RM_INVALID_SIZE
};
Enum tag | Description |
---|---|
RM_OK | Message read successfully |
RM_NONE | No message available in the buffer |
RM_PARTIAL | Only a non-empty partial message available in the buffer |
RM_INVALID_SIZE | Message header has an invalid size |
class MsgReadBuffer
{
public:
explicit MsgReadBuffer(const MessageReaderSettings& settings);
bool HaveUnreadData() const;
Buffer BeginReceive();
void EndReceive(ssize_t len);
EReadMsg Read(Buffer& b);
private:
[ implementation ]
};
explicit MsgReadBuffer(const MessageReaderSettings& settings)
Constructor which is passed the MessageReaderSettings.
bool HaveUnreadData() const
Returns true if there is data which has been written by the producer but not yet read by the consumer.
Buffer BeginReceive()
Receiving of data from the transport layer to be written to a buffer in memory involves paired calls to BeginReceive() and EndReceive().
BeginReceive() returns a buffer to be written with data. If the returned buffer is empty then there is no space available.
It is allowable to write any prefix of the returned buffer (including none at all).
The buffer returned by BeginReceive() must not be accessed after a subsequent call to EndReceive() or Read().
void EndReceive(ssize_t len)
After the received data has been written to the buffer EndReceive() should be called with 'len' equal to the number of octets which were actually written. If no data was received then calling EndReceive(0) is optional.
EReadMsg Read(Buffer& b)
Try to read the next message from the buffer. If returns RM_OK then b provides the message that was read.