// TcpMsg.h
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2020
#ifndef Ceda_cxMessage_TcpMsg_H
#define Ceda_cxMessage_TcpMsg_H
#include "cxMessage.h"
#include "Ceda/cxUtils/BasicTypes.h"
#include "Ceda/cxUtils/CedaAssert.h"
#include "Ceda/cxMessage/cxMessage.h"
#include <system_error>
/*
[
This single self-contained header subsumes the relatively complicated and messy API from the old cxMessage, such as
CreateIocp/Close
ITcpEndpoint
ISocketConnectionAcceptor
CreateTcpServer
CreateTcpClient
ITcpMsgHandler
CreateTcpMsgConnection, StartReading, NotifyMoreMessagesToWrite
EMEndPointFailure, IMEndPointHandler, CreateMEndPoint, Start, NotifyEnableMoreConnections, CreateMServer, CreateMClient,
SessionCreator<Session>, BlockingSingleSessionCreator<Session>, DefaultSessionManager
]
In all three interfaces ITcpMsgSession, ITcpMsgSessionHandler and ITcpMsgEndPoint it is assumed that the methods
are never called concurrently, and that Release() is called last and exactly once.
Typically an application will create a single IoContextPool with as many threads as processors, and this is used by
all servers and clients.
It is required that the IoContextPool be closed after all the clients/servers that use it have been closed.
This header is platform independent (there is no dependency on Windows, Linux or POSIX APIs) and has no boost
dependency. We have avoided all mention of sockets in the API.
The cxMessage library implements the following classes or interfaces
IoContextPool
TcpMsgServer
TcpMsgClient
ITcpMsgSession.
Users of cxMessage must implement the following interfaces
ITcpMsgSessionHandler
ITcpMsgEndPoint.
There is no concept of message ids and multiplexing in this layer. That can be done in the implementation of
ITcpMsgSessionHandler.
Every successful call to CreateIoContextPool() (i.e. that doesn't return nullptr) must be paired with a
call to Close(IoContextPool*). Similarly for CreateTcpMsgServer and CreateTcpMsgClient.
MessageWriterSettings
---------------------
These are settings that affect the way messages are written to a connected socket.
This structure has the following members:
ssize_t lowerHysteresisSendSize
Default value: 128
Sets the 'lower water mark' used for preparing messages in memory: send requests are
issued until the lower water mark is reached (i.e. there is not enough data remaining
in the send buffer for it to be worth sending without first moving that data back to
the start of the buffer and checking whether more messages can be prepared in memory).
This value is a compromise between the overhead of issuing a send with a very small
buffer and the overhead of the memmove used to copy the remaining unsent data back to
the start of the send buffer.
ssize_t upperHysteresisSendSize;
Default value: 4096
This is the most important parameter affecting performance. It specifies the minimum size
for sends using the transport layer.
Sets the 'upper water mark' used for preparing messages in memory: messages are
prepared until the fill position of the buffer exceeds this value.
For very large messages this parameter has no impact on performance. When sending many
small messages the value should be chosen as a compromise:
- Too large a value can increase latency, because small messages tend to be buffered
up to this value.
- Too small a value increases the overhead of the send calls.
ssize_t normalCommitBufferSize;
Default value: 8192
The initial size of the write buffer
ssize_t sendSizeUntilBackToNormalBufferSize;
Default value: 1,048,576 (i.e. 1MB)
A very large message can cause a very large buffer to be allocated. Since this may be an
unusual event we want to reset the buffer back to a normal size at an appropriate time
rather than use an excessive amount of memory for the life of the session.
ssize_t maxSendSize;
Default value: 65536
Imposes an upper bound on the size of a send call.
It is not clear whether this serves any useful purpose, so it should probably just be set to
a very large value.
If maxSendSize is too small then the overheads of memmoves can increase because larger messages
tend not to be sent in their entirety in a single send call, so ends of messages have to be
moved back to the start of the buffer more frequently.
MessageReaderSettings
---------------------
These are settings that affect the way messages are read from a connected socket.
This structure has the following members:
ssize_t normalCommitBufferSize;
The initial size of the read buffer
Default value: 16384 (i.e. 16 kB)
ssize_t maxMsgSize;
The upper bound on the size of the body of an individual message
(i.e. not including the 4 byte header giving the message size).
Therefore it also represents the upper bound on the size of the receive buffer in memory.
If the size of the body of a message exceeds this value then a fatal
ReadInvalidMessageSize error is raised on the connection.
Making this value extremely large could make the machine vulnerable to running out of memory.
Default value: 67,108,864 (i.e. 64 MB)
ssize_t maxRecvSize;
The upper bound on the size of the receive buffer provided in the receive socket calls.
Default value: 262,144 (i.e. 256 kB)
TcpSettings
-----------
These are settings for the socket connections:
ssize_t sendBufSize;
Default value: -1
SO_SNDBUF setting
-1 means use the system default
ssize_t recBufSize;
Default value: -1
SO_RCVBUF setting
-1 means use the system default
bool enableNagle;
Default value: false
If false then TCP_NODELAY option is set
bool enableKeepAlive;
Default value: false
If true then SO_KEEPALIVE option is set
int keepalivetime;
Default value: 7,200,000 (i.e. 2 hours)
Time in milliseconds until the next keep-alive probe
int keepaliveinterval;
Default value: 1000 (i.e. 1 second)
Retransmission timeout in milliseconds if an ACK is not received
Typically an error is reported after 5 consecutive failures.
Error codes
-----------
Besides various system error codes, the following error codes may be generated:
MakeCedaErrorCode(ECedaErrorCode::ReadValidEndOfStream)
When the peer's implementation of WriteMessage returned nextPayloadSize =
ShutdownSending (-2), indicating there are no more messages, causing EOF to be read on the socket
MakeCedaErrorCode(ECedaErrorCode::ReadInvalidEndOfStream)
When EOF was read on the socket part way through reading a message
(this should never happen with a valid peer)
MakeCedaErrorCode(ECedaErrorCode::ReadInvalidMessageSize)
A message was received with an unexpected size
*/
namespace ceda
{
class IoContextPool;
class TcpMsgServer;
class TcpMsgClient;
struct MessageWriterSettings
{
MessageWriterSettings()
{
lowerHysteresisSendSize = 128;
upperHysteresisSendSize = 4096;
normalCommitBufferSize = 8192;
sendSizeUntilBackToNormalBufferSize = 1024*1024;
maxSendSize = 65536;
}
void AssertInvariant() const
{
cxAssert(0 < lowerHysteresisSendSize);
cxAssert(lowerHysteresisSendSize <= upperHysteresisSendSize);
cxAssert(upperHysteresisSendSize <= normalCommitBufferSize);
cxAssert(normalCommitBufferSize <= sendSizeUntilBackToNormalBufferSize);
cxAssert(normalCommitBufferSize <= maxSendSize);
}
ssize_t lowerHysteresisSendSize;
ssize_t upperHysteresisSendSize;
ssize_t normalCommitBufferSize;
ssize_t sendSizeUntilBackToNormalBufferSize;
ssize_t maxSendSize;
};
struct MessageReaderSettings
{
MessageReaderSettings()
{
normalCommitBufferSize = 16*1024;
maxMsgSize = 64*1024*1024;
maxRecvSize = 256*1024;
}
ssize_t normalCommitBufferSize;
ssize_t maxMsgSize;
ssize_t maxRecvSize;
};
enum class EProtocol
{
TCP_IPv4,
TCP_IPv6
};
struct TcpSettings
{
TcpSettings() :
sendBufSize(-1),
recBufSize(-1),
enableNagle(false),
enableKeepAlive(false),
keepalivetime(2*3600*1000),
keepaliveinterval(1*1000)
{
}
ssize_t sendBufSize;
ssize_t recBufSize;
bool enableNagle;
bool enableKeepAlive;
int keepalivetime;
int keepaliveinterval;
};
struct TcpMsgSessionSettings
{
TcpSettings tcpSettings;
MessageReaderSettings msgReaderSettings;
MessageWriterSettings msgWriterSettings;
};
enum class ESessionFailureType
{
Session,
SessionRead,
SessionWrite
};
struct ITcpMsgSession
{
virtual void RequestClose() = 0;
virtual void OnMoreToSend() = 0;
};
struct WriteMessageResult
{
enum
{
NoMessage = -1,
ShutdownSending = -2
};
/*
> 0 payloadSize represents the size of the message just written by the call to
WriteMessage(). This must not exceed payloadSizeAvailable.
0 No message was written - either because there is none currently
available or because payloadSizeAvailable was too small.
*/
ssize_t payloadSize;
/*
>= 0 Indicates a request for another call to WriteMessage with
payloadSizeAvailable (at least) as large as nextPayloadSize
NoMessage Currently there are no more messages ready to be sent.
(-1)
ShutdownSending There are no more messages to be sent, and there never will be.
(-2) (this causes a shutdown of sending on the associated socket so
that the other end will read end-of-stream)
*/
ssize_t nextPayloadSize;
};
struct ITcpMsgSessionHandler
{
virtual void Release() = 0;
virtual void OnFailure(ESessionFailureType type, std::error_code ec) = 0;
virtual std::error_code ReadMessage(const octet_t* payload, ssize_t payloadSize) = 0;
/*
Try to write a /single/ message to the given buffer which has size payloadSizeAvailable.
The returned WriteMessageResult provides information about the message that was written
and whether there are more messages to write.
Must not throw an exception.
*/
virtual WriteMessageResult WriteMessage(octet_t* payload, ssize_t payloadSizeAvailable) = 0;
};
enum class EEndPointFailureType
{
Open,
Close,
SetOption,
Bind,
Listen,
Accept,
Connect
};
struct ITcpMsgEndPoint
{
virtual void Release() = 0;
virtual void OnFailure(EEndPointFailureType type, std::error_code ec) = 0;
virtual ITcpMsgSessionHandler* CreateSessionHandler(ITcpMsgSession&) = 0;
// Called as the number of sessions managed by the end point increases or decreases
virtual void UpdateSessionCount(ssize_t numSessions) {}
};
// If pool is null then use the default IoContextPool
cxMessage_API TcpMsgServer* CreateTcpMsgServer(
EProtocol protocol,
int port,
bool reuse_addr,
ITcpMsgEndPoint& ep,
const TcpMsgSessionSettings& sessionSettings,
IoContextPool* pool = nullptr);
cxMessage_API void Close(TcpMsgServer*);
// If pool is null then use the default IoContextPool
// For the meaning of host and service, consult the boost documentation for the basic_resolver::resolve method
cxMessage_API TcpMsgClient* CreateTcpMsgClient(
const char* host,
const char* service,
ITcpMsgEndPoint& ep,
const TcpMsgSessionSettings& sessionSettings,
IoContextPool* pool = nullptr);
cxMessage_API void Close(TcpMsgClient*);
} // namespace ceda
#endif // include guard