MsgWriter

A MsgWriter represents an SPSC (single producer and single consumer) transient message queue used to buffer messages to be sent. The producer and consumer run in strict alternation.

In more detail, MsgWriter manages a send buffer in user space. The buffer is filled with variable-length messages, and occasionally the filled portion is sent by a stream-oriented transport layer such as TCP. MsgWriter itself neither fills the buffer with messages nor sends the filled portions. It is passive, and is only concerned with the policy that decides when it's time to send. The main reason for this design is that most I/O functions provided by an OS have a large per-call overhead (often because each call enters kernel mode), so sending many small buffers is much less efficient than sending a smaller number of large ones.

The following code illustrates usage:


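#include <cassert>
#include <sys/types.h>   // ssize_t

// Socket, have_messages_to_write(), size_of_next_message(), write_messages_to_buffer()
// and write_buffer_to_socket() stand for application code.
void WriteMessagesToSocket(Socket socket)
{
    MessageWriterSettings s;
    MsgWriter mw(s);

    for (;;)
    {
        // producer:  write messages in place into the buffer in memory until
        // the send policy says it's time to send
        while (have_messages_to_write() && !mw.ReadyToSend())
        {
            ssize_t bufferSizeRequested = size_of_next_message();
            Buffer b = mw.BeginWrite(bufferSizeRequested);
            assert(bufferSizeRequested <= b.len);

            // write some prefix of b (one or more complete messages)
            ssize_t numOctetsWritten = write_messages_to_buffer(b);

            assert(0 <= numOctetsWritten && numOctetsWritten <= b.len);
            mw.EndWrite(numOctetsWritten);
        }

        // consumer:  send using asynchronous or synchronous I/O; when there are
        // no more messages to write, also flush whatever is still buffered
        while (mw.ReadyToSend() || !have_messages_to_write())
        {
            Buffer b = mw.BeginSend();
            if (b.len == 0)
                break;   // nothing buffered
            ssize_t numOctetsSent = write_buffer_to_socket(socket, b);   // error handling omitted
            assert(0 <= numOctetsSent && numOctetsSent <= b.len);
            mw.EndSend(numOctetsSent);
        }
    }
}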

This function dedicates a thread to writing to a socket, which doesn't scale to servers that support many thousands of concurrent socket connections. In that case asynchronous I/O (e.g. I/O completion ports on Windows) is preferable. MsgWriter is well suited to this style: start an asynchronous send of the buffered data and, when it completes, write more messages to the buffer.
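
The completion-driven style can be sketched as follows, under several assumptions. The MsgWriter below is a toy stand-in whose ReadyToSend() compares the pending octet count with a hypothetical `sendThreshold` setting, the 4-byte header is assumed to be little-endian, and `Connection`, `Post()` and `OnSendComplete()` are illustrative names, not part of any documented API. Because the OS may still be reading the BeginSend() buffer while a send is pending, new messages wait in an application-side queue and are written into the MsgWriter only after the send completes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <deque>
#include <string>
#include <utility>
#include <vector>
#include <sys/types.h>  // ssize_t (POSIX)

using octet_t = std::uint8_t;                                    // assumed octet type
struct Buffer { octet_t* buffer; ssize_t len; };
struct MessageWriterSettings { ssize_t sendThreshold = 4096; };  // hypothetical field

// Toy stand-in for MsgWriter: one growable buffer, unsent octets in [r_, w_).
// It never reclaims space until fully drained; a real implementation would.
class MsgWriter
{
public:
    explicit MsgWriter(const MessageWriterSettings& s) : s_(s) {}
    bool ReadyToSend() const { return w_ - r_ >= s_.sendThreshold; }
    Buffer BeginWrite(ssize_t n)
    {
        if (r_ == w_) r_ = w_ = 0;  // everything sent: start again at the front
        if (Free() < n) buf_.resize(static_cast<std::size_t>(w_ + n));
        return { buf_.data() + w_, Free() };
    }
    void EndWrite(ssize_t len) { assert(0 <= len && len <= Free()); w_ += len; }
    Buffer BeginSend() const { return { const_cast<octet_t*>(buf_.data()) + r_, w_ - r_ }; }
    void EndSend(ssize_t len) { assert(0 <= len && len <= w_ - r_); r_ += len; }
private:
    ssize_t Free() const { return static_cast<ssize_t>(buf_.size()) - w_; }
    MessageWriterSettings s_;
    std::vector<octet_t> buf_;
    ssize_t w_ = 0, r_ = 0;
};

// Hypothetical completion-driven connection.
class Connection
{
public:
    explicit Connection(const MessageWriterSettings& s) : mw_(s) {}

    // Queue a message; it is written into the MsgWriter only when no send is in flight.
    void Post(std::string msg)
    {
        queue_.push_back(std::move(msg));
        if (!inFlight_) Pump();
    }

    // Called from the I/O completion handler with the number of octets transferred.
    void OnSendComplete(ssize_t numOctetsSent)
    {
        mw_.EndSend(numOctetsSent);
        inFlight_ = false;
        Pump();
    }

    bool InFlight() const { return inFlight_; }
    Buffer InFlightBuffer() const { return inFlightBuf_; }

private:
    void Pump()
    {
        // Producer: write queued messages, each a 4-byte little-endian size (assumed) + body.
        while (!queue_.empty() && !mw_.ReadyToSend()) {
            const std::string& m = queue_.front();
            const std::uint32_t n = static_cast<std::uint32_t>(m.size());
            Buffer b = mw_.BeginWrite(static_cast<ssize_t>(4 + m.size()));
            for (int i = 0; i < 4; ++i) b.buffer[i] = static_cast<octet_t>(n >> (8 * i));
            if (!m.empty()) std::memcpy(b.buffer + 4, m.data(), m.size());
            mw_.EndWrite(static_cast<ssize_t>(4 + m.size()));
            queue_.pop_front();
        }
        // Consumer: start a send if the policy says so, or if nothing else is waiting.
        Buffer b = mw_.BeginSend();
        if (b.len > 0 && (mw_.ReadyToSend() || queue_.empty())) {
            inFlightBuf_ = b;  // a real server would start an overlapped send of b here
            inFlight_ = true;
        }
    }

    MsgWriter mw_;
    std::deque<std::string> queue_;
    bool inFlight_ = false;
    Buffer inFlightBuf_{nullptr, 0};
};
```

Starting a send as soon as the connection is idle, even below the threshold, gives up some batching in exchange for lower latency; messages posted while a send is in flight are naturally batched into the next one.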

It is assumed that messages are not too large, so that each message fits in a contiguous buffer in memory. Copying is avoided for both the producer and the consumer by having them write and read the data in place.

By design, there is no mutex protecting the MsgWriter state, so all calls to its public functions must be serialised. Furthermore, the producer and consumer must not run concurrently: the producer must not write to the buffer returned by BeginWrite() while the consumer is reading the buffer returned by BeginSend().
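
Since nothing inside MsgWriter detects overlapping calls, one option in debug builds is a wrapper whose forwarding functions each open a scope on a checker. The hypothetical `SerialisationChecker` below (not part of MsgWriter) counts calls that overlap in time. It only catches overlaps that actually occur at run time, and it cannot detect a producer that keeps touching its buffer after BeginSend() has been called.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical debug aid. Construct a CallScope at the start of every forwarded
// MsgWriter call; a call that starts while another is still running is a violation.
class SerialisationChecker
{
public:
    class CallScope
    {
    public:
        explicit CallScope(SerialisationChecker& checker)
            : checker_(checker), entered_(!checker.busy_.exchange(true))
        {
            if (!entered_)
                checker_.violations_.fetch_add(1);  // overlapping call detected
        }
        ~CallScope()
        {
            if (entered_)
                checker_.busy_.store(false);  // only the scope that entered releases
        }
        CallScope(const CallScope&) = delete;
        CallScope& operator=(const CallScope&) = delete;

    private:
        SerialisationChecker& checker_;
        bool entered_;
    };

    int Violations() const { return violations_.load(); }

private:
    std::atomic<bool> busy_{false};
    std::atomic<int> violations_{0};
};
```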

The consumer is given a raw sequence of octets to write to the socket, without any regard for message boundaries.
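
Because message boundaries are not preserved by the send path, the peer reading from the socket has to re-split the stream using the 4-byte size header that each message begins with. A receiver-side sketch follows; `Deframer` is a hypothetical name, and the header is assumed to be little-endian since the byte order is not specified here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Reassembles messages from a byte stream that may be split at arbitrary points.
class Deframer
{
public:
    // Feed the next chunk read from the socket; returns the bodies of any
    // messages that are now complete, in order.
    std::vector<std::string> Feed(const std::uint8_t* data, std::size_t len)
    {
        pending_.append(reinterpret_cast<const char*>(data), len);
        std::vector<std::string> bodies;
        std::size_t pos = 0;
        while (pending_.size() - pos >= 4) {
            const auto* h = reinterpret_cast<const unsigned char*>(pending_.data() + pos);
            const std::uint32_t bodyLen = std::uint32_t(h[0]) | (std::uint32_t(h[1]) << 8) |
                                          (std::uint32_t(h[2]) << 16) | (std::uint32_t(h[3]) << 24);
            if (pending_.size() - pos - 4 < bodyLen)
                break;  // body has not fully arrived yet
            bodies.emplace_back(pending_, pos + 4, bodyLen);
            pos += 4 + bodyLen;
        }
        pending_.erase(0, pos);  // keep only the incomplete tail
        return bodies;
    }

private:
    std::string pending_;  // octets received but not yet returned as messages
};
```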

The producer writes messages in place with paired calls to BeginWrite() and EndWrite().

MsgWriter::Buffer

Represents a buffer returned by MsgWriter::BeginWrite() which the producer can use to write data in-place, or a buffer returned by MsgWriter::BeginSend() which the consumer can use to read data in-place.


struct Buffer
{
    octet_t* buffer;   // first octet of the region
    ssize_t len;       // length of the region in octets
};

MsgWriter


class MsgWriter
{
public:
    explicit MsgWriter(const MessageWriterSettings& settings);
    bool ReadyToSend() const;
    Buffer BeginWrite(ssize_t bufferSizeRequested);
    void EndWrite(ssize_t len);
    Buffer BeginSend() const;
    void EndSend(ssize_t len);
private:
    // implementation details omitted
};
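
To make the contract concrete, here is a toy implementation sketch backed by a single growable contiguous buffer. It is not the actual implementation, which is not described here; `octet_t`, the `sendThreshold` field of MessageWriterSettings and the compaction strategy are all assumptions. Octets in `[sendPos_, writePos_)` have been written but not yet sent: BeginWrite() hands out the free tail after them, and BeginSend() hands out the unsent region itself.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>
#include <sys/types.h>  // ssize_t (POSIX)

using octet_t = std::uint8_t;  // assumed octet type

struct Buffer
{
    octet_t* buffer;
    ssize_t len;
};

// Hypothetical settings; the real fields of MessageWriterSettings are not documented.
struct MessageWriterSettings
{
    ssize_t sendThreshold = 4096;  // octets pending before ReadyToSend() returns true
};

// Toy MsgWriter backed by one growable contiguous buffer.
class MsgWriter
{
public:
    explicit MsgWriter(const MessageWriterSettings& settings) : settings_(settings) {}

    bool ReadyToSend() const { return writePos_ - sendPos_ >= settings_.sendThreshold; }

    Buffer BeginWrite(ssize_t bufferSizeRequested)
    {
        if (sendPos_ == writePos_) {
            sendPos_ = writePos_ = 0;  // everything sent: reuse the buffer from the start
        } else if (Free() < bufferSizeRequested && sendPos_ > 0) {
            // Slide unsent octets to the front. This invalidates any BeginSend()
            // buffer, which the contract permits once BeginWrite() is called.
            std::memmove(buf_.data(), buf_.data() + sendPos_,
                         static_cast<std::size_t>(writePos_ - sendPos_));
            writePos_ -= sendPos_;
            sendPos_ = 0;
        }
        if (Free() < bufferSizeRequested)
            buf_.resize(static_cast<std::size_t>(writePos_ + bufferSizeRequested));
        return { buf_.data() + writePos_, Free() };  // at least bufferSizeRequested octets
    }

    void EndWrite(ssize_t len)
    {
        assert(0 <= len && len <= Free());
        writePos_ += len;
    }

    Buffer BeginSend() const
    {
        // Buffer holds a non-const pointer; the consumer only reads through it.
        return { const_cast<octet_t*>(buf_.data()) + sendPos_, writePos_ - sendPos_ };
    }

    void EndSend(ssize_t len)
    {
        assert(0 <= len && len <= writePos_ - sendPos_);
        sendPos_ += len;
    }

private:
    ssize_t Free() const { return static_cast<ssize_t>(buf_.size()) - writePos_; }

    MessageWriterSettings settings_;
    std::vector<octet_t> buf_;
    ssize_t writePos_ = 0;
    ssize_t sendPos_ = 0;
};
```

Compacting only when the free tail is too small keeps internal copying rare. A fixed-capacity ring buffer would avoid growth, but its free space can wrap around the end, which conflicts with BeginWrite() returning one contiguous buffer.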


explicit MsgWriter(const MessageWriterSettings& settings)

Constructor which is passed the MessageWriterSettings.


bool ReadyToSend() const

Returns true if the buffer is so full that it is appropriate to send what's already there rather than write more messages.


Buffer BeginWrite(ssize_t bufferSizeRequested)

Writing of messages to the buffer in memory involves paired calls to BeginWrite() and EndWrite().

BeginWrite() returns the next buffer to be written (i.e. to be filled) with data. The size of the returned buffer will be at least bufferSizeRequested (but may well be somewhat larger).

The producer may write any prefix of the returned buffer (including none at all), and that prefix may be longer than 'bufferSizeRequested'.

Multiple messages can be written to the buffer; indeed, writing as many messages as will fit into the returned buffer may improve performance.

Each message must begin with a 4-byte header that holds the size in bytes of the message body (i.e. excluding the 4-byte header itself).
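
A sketch of a producer-side helper that fills one BeginWrite() buffer with as many complete messages as fit, each prefixed by its 4-byte size. The byte order of the header is not specified here, so little-endian is an assumption, and `AppendMessage()` and `WriteMessagesToBuffer()` are hypothetical names. The returned octet count is what would then be passed to EndWrite().

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>
#include <sys/types.h>  // ssize_t (POSIX)

using octet_t = std::uint8_t;  // assumed octet type
struct Buffer { octet_t* buffer; ssize_t len; };

// Append one message (4-byte little-endian size + body) at `offset` in `b`.
// Returns the offset just past the message, or -1 if it does not fit.
ssize_t AppendMessage(Buffer b, ssize_t offset, const std::string& body)
{
    const ssize_t total = 4 + static_cast<ssize_t>(body.size());
    if (b.len - offset < total)
        return -1;
    const std::uint32_t n = static_cast<std::uint32_t>(body.size());
    octet_t* p = b.buffer + offset;
    p[0] = static_cast<octet_t>(n);
    p[1] = static_cast<octet_t>(n >> 8);
    p[2] = static_cast<octet_t>(n >> 16);
    p[3] = static_cast<octet_t>(n >> 24);
    if (!body.empty())
        std::memcpy(p + 4, body.data(), body.size());
    return offset + total;
}

// Write messages starting at msgs[next] until one does not fit; advances `next`
// and returns the number of octets written (the value to pass to EndWrite()).
ssize_t WriteMessagesToBuffer(Buffer b, const std::vector<std::string>& msgs, std::size_t& next)
{
    ssize_t offset = 0;
    while (next < msgs.size()) {
        const ssize_t end = AppendMessage(b, offset, msgs[next]);
        if (end < 0)
            break;  // the next message waits for another BeginWrite()
        offset = end;
        ++next;
    }
    return offset;
}
```

Any message left over goes into the next BeginWrite() call, which can request at least that message's size plus 4.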

The buffer returned by BeginWrite() must not be accessed after a subsequent call to EndWrite() or BeginSend().


void EndWrite(ssize_t len)

After the data has been written, call EndWrite() with 'len' equal to the number of octets actually written. If no data was written, calling EndWrite(0) is optional.


Buffer BeginSend() const

Sending of data involves paired calls to BeginSend() and EndSend().

BeginSend() returns the next buffer of data that has been written and is ready to be sent. If the returned buffer is empty (its len is 0), there is no data available to send.

The consumer may send any prefix of the returned buffer (including none at all).

The buffer returned by BeginSend() must not be accessed after a subsequent call to EndSend() or BeginWrite().


void EndSend(ssize_t len)

After the data has been sent, call EndSend() with 'len' equal to the number of octets actually sent. If no data was sent, calling EndSend(0) is optional.
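
On a POSIX system, the `write_buffer_to_socket()` helper used in the usage example might look like the sketch below. This body is an assumption: it takes a plain file descriptor instead of the `Socket` type, retries when a signal interrupts the call before any data is written, and reports a would-block condition on a non-blocking descriptor as 0 octets sent. A non-negative result can be passed straight to EndSend(), since a short write simply leaves the remainder for the next BeginSend().

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <sys/types.h>
#include <unistd.h>

using octet_t = std::uint8_t;  // assumed octet type
struct Buffer { octet_t* buffer; ssize_t len; };

// Hypothetical POSIX version of write_buffer_to_socket(). Returns the number of
// octets the kernel accepted (possibly fewer than b.len, or 0 if it would block),
// or -1 on error with errno set.
ssize_t write_buffer_to_socket(int fd, Buffer b)
{
    for (;;) {
        const ssize_t n = ::write(fd, b.buffer, static_cast<std::size_t>(b.len));
        if (n >= 0)
            return n;  // may be a short write
        if (errno == EINTR)
            continue;  // interrupted before any data was written: retry
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            return 0;  // would block: nothing sent this time
        return -1;     // real error; the caller must not pass this to EndSend()
    }
}
```

For sockets specifically, send() with the Linux MSG_NOSIGNAL flag avoids SIGPIPE on a closed connection; write() keeps the sketch usable with any descriptor.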