// IMsgOrientedInputStream.h
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2011
#pragma once
#ifndef Ceda_cxUtils_IMsgOrientedInputStream_H
#define Ceda_cxUtils_IMsgOrientedInputStream_H
#include "cxUtils.h"
/*
This interface was developed at the time we were investigating the idea of input buffering on an
Archive.
An InputArchive associated with a contiguous block of memory makes this whole idea unnecessary.
*/
namespace ceda
{
///////////////////////////////////////////////////////////////////////////////////////////////////
// IMsgOrientedInputStream
/*
An input byte stream.
ReadStream() is passed a range describing the number of bytes we would like to read from the
device. This allows it to represent pure blocking and pure non-blocking behaviour and anywhere in
between in the one function call. For example ReadStream(buf,n,n) represents a pure blocking
call, whereas ReadStream(buf,0,n) represents a pure non-blocking call.
Allowing for input buffering
----------------------------
Consider that the following function is used to read from a stream:
ssize_t BlockingReadStream(void* buffer, ssize_t numBytesRequested)
This function tries to read numBytesRequested bytes from the stream. It returns the actual number of
bytes read, which may be less than numBytesRequested only when the end of the stream is reached. So,
ignoring end of stream, streams representing I/O (e.g. sockets) must block indefinitely until
numBytesRequested bytes are available.
The problem is that an attempt to fill an input buffer in a client using an IInputStream by reading
ahead on a socket may block indefinitely on I/O. So a client thread is blocked even though it may
actually have some buffered data available to be processed right away. This is particularly evil
if the stream is broken up into messages sent intermittently and the client thread is stalled even
though there are completed messages which could be deserialised and processed right away.
What we actually need is a version of ReadStream that can return what is already available for
reading (i.e. already buffered by the device), without unnecessarily blocking on I/O. How do we
define whether blocking on I/O is necessary? Consider that we pass a range describing the number
of bytes we would like to read from the device.
ssize_t ReadStream(void* buffer, ssize_t minBytesRequested, ssize_t maxBytesRequested)
In this case, ReadStream() is required to block on I/O indefinitely until minBytesRequested bytes
have been read. Once minBytesRequested bytes have been read, further blocking on I/O is not permitted.
ReadStream() reads any extra bytes available up to maxBytesRequested, so long as doing so consumes
only CPU resources (i.e. it never waits on I/O). The reason for copying as many bytes as possible in
one call is that it minimises the per-call overhead, which is incurred each time the client's input
buffer is exhausted and a call to ReadStream() is required.
*/
struct IMsgOrientedInputStream
{
virtual ~IMsgOrientedInputStream() {}
/*
precondition:
0 <= minBytesRequested <= maxBytesRequested
maxBytesRequested > 0 => buffer != nullptr
'buffer' points to a buffer in memory of at least 'maxBytesRequested' bytes.
This one call supports blocking and/or non-blocking behaviours:
blocking:
Try to read at least minBytesRequested bytes from the stream. Streams representing I/O
(e.g. sockets) must block indefinitely until minBytesRequested bytes are available.
non-blocking:
Try to read at most maxBytesRequested bytes from the stream.
The call is pure blocking if minBytesRequested = maxBytesRequested
The call is pure non-blocking if minBytesRequested = 0
Returns the actual number of bytes read which must be between 0 and maxBytesRequested inclusive.
Note therefore that returning a negative value is not permitted. The return value is less than
minBytesRequested if and only if the request for minBytesRequested couldn't be satisfied because
end-of-stream was reached. If minBytesRequested is zero then ReadStream() isn't able to
indicate whether end-of-stream has been reached.
maxBytesRequested - minBytesRequested represents the number of bytes to be read in non-blocking
mode. That is, this part of the request is only satisfied (or partially satisfied) from what has
already been read by the underlying device, and which can therefore be copied into the buffer
without blocking on I/O.
Exceptions are thrown to indicate failure of the stream, never for end-of-stream.
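
Illustration
------------
The contract above can be illustrated with a minimal sketch. It is not part of cxUtils and does
not derive from this interface: SimulatedStream, its 'ready' and 'pending' members and the ssize
alias (standing in for ssize_t) are hypothetical names invented for this example. Taking a chunk
from 'pending' stands in for blocking on I/O.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <deque>
#include <string>

using ssize = std::ptrdiff_t;   // assumption: stand-in for ssize_t

// Hypothetical in-memory device.
// 'ready' holds bytes already buffered by the device.
// 'pending' holds chunks that arrive later; taking one simulates blocking on I/O.
struct SimulatedStream
{
    std::string ready;
    std::deque<std::string> pending;

    ssize ReadStream(void* buffer, ssize minBytesRequested, ssize maxBytesRequested)
    {
        assert(0 <= minBytesRequested && minBytesRequested <= maxBytesRequested);
        assert(maxBytesRequested == 0 || buffer != nullptr);

        // Block (simulated) only while fewer than minBytesRequested bytes are ready.
        // An empty 'pending' queue represents end-of-stream.
        while (static_cast<ssize>(ready.size()) < minBytesRequested && !pending.empty())
        {
            ready += pending.front();
            pending.pop_front();
        }

        // Copy what is ready, up to maxBytesRequested, without any further blocking.
        const ssize n = std::min<ssize>(static_cast<ssize>(ready.size()), maxBytesRequested);
        if (n > 0) std::memcpy(buffer, ready.data(), static_cast<std::size_t>(n));
        ready.erase(0, static_cast<std::size_t>(n));
        return n;   // less than minBytesRequested only at end-of-stream
    }
};
```

With ready = "abc" and pending = {"defg","hi"}, ReadStream(buf,0,8) returns 3 without blocking,
and a following ReadStream(buf,2,8) waits for one chunk and returns 4.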
*/
virtual ssize_t ReadStream(void* buffer, ssize_t minBytesRequested, ssize_t maxBytesRequested) = 0;
// Pure blocking read: returns fewer than numBytesRequested bytes only at end-of-stream.
ssize_t BlockingReadStream(void* buffer, ssize_t numBytesRequested)
{
return ReadStream(buffer,numBytesRequested,numBytesRequested);
}
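// Pure non-blocking read: returns only bytes already available, so it cannot signal end-of-stream.
ssize_t NonblockingReadStream(void* buffer, ssize_t numBytesRequested)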
{
return ReadStream(buffer,0,numBytesRequested);
}
};
} // namespace ceda
#endif // include guard