A MsgReader represents an SPSC (single producer and single consumer) transient message queue used to buffer received messages. The producer and consumer run in strict alternation.
It is analogous to the message buffering used by a message loop:
    while(1)
    {
        MessageBuffer message = get_next_message();
        process_message(message);
    }
See the Wikipedia article on the event loop.
Consider a message loop which reads from a socket and processes each message. It can use a MsgReader as follows:
    void ReadSocketMessageLoop(Socket socket)
    {
        MessageReaderSettings s;
        MsgReader mr(s);
        while(1)
        {
            // Producer: read from socket and write to the MsgReader
            MsgReader::Buffer buffer = mr.BeginReceive();
            int len = read_from_socket_into_buffer(socket,buffer);
            mr.EndReceive(len);
            // Consumer: read messages from the MsgReader
            while(1)
            {
                // 'buffer' is reused here to receive the message that was read
                EReadMsg result = mr.Read(buffer);
                if (result != RM_OK) break;
                process_message(buffer);
            }
        }
    }
A message loop dedicates a thread, which tends to block on I/O (in this case reading from the socket). This does not scale for servers which support many thousands of concurrent socket connections. In that case asynchronous I/O (e.g. I/O completion ports on a Windows platform) is preferable. The MsgReader is well suited to this model: a receive is left pending on the buffer returned by BeginReceive(), and when it completes all the buffered messages are processed.
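The pending-receive cycle described above can be sketched as follows. This is only an illustration under stated assumptions: ReceiveDriver, its two callbacks, and OctetReader are hypothetical names that are not part of MsgReader, the post callback stands in for whatever asynchronous read the platform provides, and a completion with zero or fewer octets is assumed to mean the connection closed or failed.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>

// Stand-ins for the types in this document (std::ptrdiff_t replaces ssize_t).
typedef unsigned char octet_t;
struct Buffer { octet_t* buffer; std::ptrdiff_t len; };
enum EReadMsg { RM_OK, RM_NONE, RM_PARTIAL, RM_INVALID_SIZE };

// Hypothetical per-connection driver. Reader is any type offering
// BeginReceive(), EndReceive() and Read() as documented for MsgReader.
template <class Reader>
class ReceiveDriver
{
public:
    typedef std::function<void(Buffer)> PostReceive;      // starts one async read
    typedef std::function<void(const Buffer&)> OnMessage; // handles one message

    ReceiveDriver(Reader& reader, PostReceive post, OnMessage onMessage)
        : reader_(reader), post_(post), onMessage_(onMessage) {}

    // Start (or restart) the single pending receive.
    void Start() { post_(reader_.BeginReceive()); }

    // Call from the completion callback with the number of octets received.
    // Returns false when no further receive was posted.
    bool OnCompleted(std::ptrdiff_t received)
    {
        if (received <= 0) return false;      // assumed: closed or failed
        reader_.EndReceive(received);
        Buffer msg = { nullptr, 0 };
        EReadMsg result;
        while ((result = reader_.Read(msg)) == RM_OK)
            onMessage_(msg);                  // process every complete message
        if (result == RM_INVALID_SIZE) return false;
        Start();                              // exactly one receive pending again
        return true;
    }

private:
    Reader& reader_;
    PostReceive post_;
    OnMessage onMessage_;
};

// Fake reader used only to exercise the driver: every octet is one message.
struct OctetReader
{
    octet_t store[8];
    std::ptrdiff_t count, pos;
    OctetReader() : count(0), pos(0) {}
    Buffer BeginReceive() { count = pos = 0; Buffer b = { store, 8 }; return b; }
    void EndReceive(std::ptrdiff_t len) { assert(len >= 0 && len <= 8); count = len; }
    EReadMsg Read(Buffer& b)
    {
        if (pos == count) return RM_NONE;
        b.buffer = store + pos++;
        b.len = 1;
        return RM_OK;
    }
};
```

Because the driver never calls the reader from more than one place at a time, the serialisation requirement is met as long as the platform delivers at most one completion per connection at a time.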
It is assumed that messages are not too large, so that each message fits in a contiguous buffer in memory. Copying is avoided for both the producer and the consumer by having them write and read the data in place.
By design there is no mutex protecting the MsgReader state. All calls to the public functions must be serialised. Furthermore, concurrency between the producer and consumer is not permitted: the write buffer returned to the producer must not be written while the consumer is accessing the read buffer returned to it.
The producer provides a raw sequence of octets. It appends data to the MsgReader with calls to BeginReceive() and EndReceive(). The producer writes data without any regard for the message boundaries.
The consumer reads one message at a time with calls to Read().
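To make the producer and consumer roles concrete, here is a toy reader with the same three calls. It is a sketch, not the MsgReader implementation: SketchReader is a hypothetical name, and the wire format (a 2-octet big-endian payload length followed by the payload) is an assumption, since the real header layout is not described here. In this sketch a message returned by Read() stays valid only until the next BeginReceive().

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Stand-ins for the types in this document (std::ptrdiff_t replaces ssize_t).
typedef unsigned char octet_t;
struct Buffer { octet_t* buffer; std::ptrdiff_t len; };
enum EReadMsg { RM_OK, RM_NONE, RM_PARTIAL, RM_INVALID_SIZE };

// Toy reader. ASSUMED format: 2-octet big-endian length, then the payload.
class SketchReader
{
public:
    SketchReader(std::size_t capacity, std::size_t maxPayload)
        : store_(capacity), maxPayload_(maxPayload), readPos_(0), writePos_(0)
    {
        assert(kHeader + maxPayload <= capacity); // a full message must fit
    }

    bool HaveUnreadData() const { return readPos_ != writePos_; }

    // Producer: hand out the free tail of the store, to be written in place.
    Buffer BeginReceive()
    {
        Compact();
        Buffer b = { store_.data() + writePos_,
                     static_cast<std::ptrdiff_t>(store_.size() - writePos_) };
        return b;
    }

    // Producer: commit the octets that were actually written.
    void EndReceive(std::ptrdiff_t len)
    {
        assert(len >= 0 && writePos_ + static_cast<std::size_t>(len) <= store_.size());
        writePos_ += static_cast<std::size_t>(len);
    }

    // Consumer: expose the next complete payload in place (no copy).
    EReadMsg Read(Buffer& b)
    {
        const std::size_t avail = writePos_ - readPos_;
        if (avail == 0) return RM_NONE;
        if (avail < kHeader) return RM_PARTIAL;
        const octet_t* h = store_.data() + readPos_;
        const std::size_t size = (static_cast<std::size_t>(h[0]) << 8) | h[1];
        if (size > maxPayload_) return RM_INVALID_SIZE;
        if (avail < kHeader + size) return RM_PARTIAL;
        b.buffer = store_.data() + readPos_ + kHeader;
        b.len = static_cast<std::ptrdiff_t>(size);
        readPos_ += kHeader + size;
        return RM_OK;
    }

private:
    static constexpr std::size_t kHeader = 2;

    // Move unread octets to the front so the free tail is contiguous.
    void Compact()
    {
        if (readPos_ == 0) return;
        std::memmove(store_.data(), store_.data() + readPos_, writePos_ - readPos_);
        writePos_ -= readPos_;
        readPos_ = 0;
    }

    std::vector<octet_t> store_;
    std::size_t maxPayload_;
    std::size_t readPos_;
    std::size_t writePos_;
};
```

The producer never looks at the header and the consumer never sees a partial message, which is the separation of concerns described above. A real implementation may manage its storage quite differently.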
Represents a buffer returned by MsgReader::BeginReceive() which the producer can use to write data.
    struct Buffer
    {
        octet_t* buffer;
        ssize_t len;
    };
    enum EReadMsg
    {
        RM_OK,
        RM_NONE,
        RM_PARTIAL,
        RM_INVALID_SIZE
    };
Enum tag | Description |
---|---|
RM_OK | Message read successfully |
RM_NONE | No message available in the buffer |
RM_PARTIAL | Only a non-empty partial message available in the buffer |
RM_INVALID_SIZE | Message header has an invalid size |
    class MsgReader
    {
    public:
        explicit MsgReader(const MessageReaderSettings& settings);
        bool HaveUnreadData() const;
        Buffer BeginReceive();
        void EndReceive(ssize_t len);
        EReadMsg Read(Buffer& b);
    private:
        [ implementation ]
    };
explicit MsgReader(const MessageReaderSettings& settings)
Constructor which is passed the MessageReaderSettings.
bool HaveUnreadData() const
Returns true if there is data which has been written by the producer but not yet read by the consumer.
Buffer BeginReceive()
Receiving data from the transport layer into a buffer in memory involves paired calls to BeginReceive() and EndReceive().
BeginReceive() returns a buffer to be written with data. If the returned buffer is empty then there is no space available.
The producer may write any prefix of the returned buffer (including nothing at all).
The buffer returned by BeginReceive() must not be accessed after a subsequent call to EndReceive() or Read().
void EndReceive(ssize_t len)
After the received data has been written to the buffer, EndReceive() should be called with 'len' equal to the number of octets which were actually written. If no data was received then calling EndReceive(0) is optional.
EReadMsg Read(Buffer& b)
Try to read the next message from the buffer. If it returns RM_OK then b provides the message that was read.
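A consumer typically calls Read() in a loop and then decides what to do based on the value that ended the loop. The sketch below shows one way to structure this. DrainMessages, AfterDrain, NextStep and ScriptedReader are hypothetical names, and dropping the connection on RM_INVALID_SIZE is just one possible policy, not something MsgReader prescribes.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-ins for the types in this document (std::ptrdiff_t replaces ssize_t).
typedef unsigned char octet_t;
struct Buffer { octet_t* buffer; std::ptrdiff_t len; };
enum EReadMsg { RM_OK, RM_NONE, RM_PARTIAL, RM_INVALID_SIZE };

// Pass every complete message to 'handle' and return the result that
// ended the loop (never RM_OK).
template <class Reader, class Handler>
EReadMsg DrainMessages(Reader& reader, Handler handle)
{
    for (;;)
    {
        Buffer msg = { nullptr, 0 };
        const EReadMsg result = reader.Read(msg);
        if (result != RM_OK) return result;
        handle(msg);
    }
}

// Hypothetical policy for what to do once the loop has stopped.
enum NextStep { RECEIVE_MORE, DROP_CONNECTION };

inline NextStep AfterDrain(EReadMsg stop)
{
    assert(stop != RM_OK); // DrainMessages never stops on RM_OK
    return stop == RM_INVALID_SIZE ? DROP_CONNECTION : RECEIVE_MORE;
}

// Fake reader that replays a fixed list of results, for illustration only.
struct ScriptedReader
{
    std::vector<EReadMsg> script;
    std::size_t next;
    octet_t dummy;

    explicit ScriptedReader(const std::vector<EReadMsg>& s)
        : script(s), next(0), dummy(0) {}

    EReadMsg Read(Buffer& b)
    {
        if (next >= script.size()) return RM_NONE;
        const EReadMsg r = script[next++];
        if (r == RM_OK) { b.buffer = &dummy; b.len = 1; }
        return r;
    }
};
```

Writing the loop against a template parameter rather than MsgReader itself keeps the handling logic testable with a fake such as ScriptedReader.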