// TcpMsg.h
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2020

#ifndef Ceda_cxMessage_TcpMsg_H
#define Ceda_cxMessage_TcpMsg_H

#include "cxMessage.h"
#include "Ceda/cxUtils/BasicTypes.h"
#include "Ceda/cxUtils/CedaAssert.h"
#include "Ceda/cxMessage/cxMessage.h"
#include <system_error>

/*
[
This one self-contained header encompasses the relatively complicated and messy API from the old cxMessage such as
    CreateIocp/Close
    ITcpEndpoint
    ISocketConnectionAcceptor
    CreateTcpServer
    CreateTcpClient
    ITcpMsgHandler
    CreateTcpMsgConnection, StartReading, NotifyMoreMessagesToWrite
    EMEndPointFailure, IMEndPointHandler, CreateMEndPoint, Start, NotifyEnableMoreConnections, CreateMServer, CreateMClient,
    SessionCreator<Session>, BlockingSingleSessionCreator<Session>, DefaultSessionManager
]

For each of the three interfaces ITcpMsgSession, ITcpMsgSessionHandler and ITcpMsgEndPoint it is assumed
that its methods are never called concurrently.  For ITcpMsgSessionHandler and ITcpMsgEndPoint, Release()
is called last and exactly once.

Typically an application will create a single IoContextPool with as many threads as processors, and this is used by
all servers and clients.

It is required that the IoContextPool be closed after all the clients/servers that use it have been closed.

This header is platform independent (there is no dependency on Windows, Linux or POSIX APIs) and has no Boost
dependency.  We have avoided all mention of sockets in the API.

The cxMessage library implements the following classes and interfaces:
    IoContextPool
    TcpMsgServer
    TcpMsgClient
    ITcpMsgSession

Users of cxMessage must implement the following interfaces 
    ITcpMsgSessionHandler
    ITcpMsgEndPoint.

There is no concept of message ids and multiplexing in this layer.  That can be done in the implementation of
ITcpMsgSessionHandler.
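
As a hypothetical illustration of multiplexing inside a handler, the sketch below assumes a one-byte
channel id at the start of every payload.  That convention and the ChannelDispatcher class are not part
of cxMessage.  std::ptrdiff_t stands in for ceda's ssize_t and std::uint8_t for octet_t, and it is
assumed that returning a non-zero std::error_code from ReadMessage reports a failure on the session.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <system_error>
#include <utility>

// Hypothetical dispatcher: the first payload byte is treated as a channel id and
// the remaining bytes are passed to the callback registered for that id.
class ChannelDispatcher
{
public:
    using Callback = std::function<void(const std::uint8_t* body, std::ptrdiff_t size)>;

    void Register(std::uint8_t channel, Callback cb) { callbacks_[channel] = std::move(cb); }

    // Same shape as ITcpMsgSessionHandler::ReadMessage, with standard types.
    std::error_code ReadMessage(const std::uint8_t* payload, std::ptrdiff_t payloadSize)
    {
        if (payloadSize < 1)
            return std::make_error_code(std::errc::bad_message);     // no room for a channel id
        auto it = callbacks_.find(payload[0]);
        if (it == callbacks_.end())
            return std::make_error_code(std::errc::protocol_error);  // unknown channel
        it->second(payload + 1, payloadSize - 1);
        return {};                                                    // success
    }

private:
    std::map<std::uint8_t, Callback> callbacks_;
};
```

A real handler would forward each message to ChannelDispatcher::ReadMessage from its own ReadMessage
override and could prepend the channel id in WriteMessage.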

Every successful call to CreateIoContextPool() (i.e. that doesn't return nullptr) must be paired with a 
call to Close(IoContextPool*).  Similarly for CreateTcpMsgServer and CreateTcpMsgClient.
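
One way to enforce this pairing, together with the rule that an IoContextPool is closed only after the
servers and clients that use it, is to hold each handle in a std::unique_ptr whose deleter calls Close.
std::unique_ptr never invokes its deleter on a null pointer, which matches the rule that only non-null
results need a Close.  The sketch uses hypothetical stand-in types and Close overloads so that it compiles
on its own; with cxMessage the deleter would call ceda::Close instead.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-ins for the opaque cxMessage handle types.
struct IoContextPool {};
struct TcpMsgServer {};

std::vector<std::string> closeLog;  // records the order in which handles are closed

void Close(IoContextPool* p) { closeLog.push_back("pool"); delete p; }
void Close(TcpMsgServer* s) { closeLog.push_back("server"); delete s; }

// Deleter that forwards to the matching Close overload.
struct CloseDeleter
{
    template <typename T>
    void operator()(T* p) const { Close(p); }
};

template <typename T>
using Owned = std::unique_ptr<T, CloseDeleter>;

void RunServer()
{
    // Declared first, so destroyed last: the pool outlives the server.
    Owned<IoContextPool> pool(new IoContextPool);
    Owned<TcpMsgServer> server(new TcpMsgServer);
    // ... use the server ...
}
```

Because locals are destroyed in reverse order of declaration, declaring the pool before the servers and
clients gives the required closing order without any explicit Close calls.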

MessageWriterSettings
---------------------

These are settings that affect the way messages are written to a connected socket.
This structure has the following members:

  ssize_t lowerHysteresisSendSize;
    Default value: 128

    Sets the 'lower water mark' used for preparing messages in memory.  That is, send requests are
    issued until the amount of unsent data in the buffer falls below the lower water mark, at which
    point there is not enough remaining in the send buffer for it to be worth sending without first
    moving that section back to the start of the buffer and seeing whether more messages can be
    prepared in memory.
    
    This value is chosen as a compromise between the overhead of issuing a send with a very small
    buffer and the overhead of using memmove to copy the remaining unsent data back to the start
    of the send buffer.
    
  ssize_t upperHysteresisSendSize;
    Default value: 4096

    This is the most important parameter affecting performance.  It specifies the minimum size 
    for sends using the transport layer.

    Sets the 'upper water mark' used for preparing messages in memory; that is, messages are
    prepared until the fill position of the buffer exceeds this value.
    
    For very large messages this parameter has no impact on the performance.  When sending many
    small messages the value should be set in order to compromise as follows:
    
    -   Too large a value can increase latency because small messages tend to be buffered 
        according to this value.
    
    -   Too small a value increases the overhead of the send calls.
    
  ssize_t normalCommitBufferSize;
    Default value: 8192

    The initial size of the write buffer

  ssize_t sendSizeUntilBackToNormalBufferSize;
    Default value: 1,048,576  (i.e. 1MB)

    A very large message can cause a very large buffer to be allocated.  Since this may be an
    unusual event we want to reset the buffer back to a normal size at an appropriate time
    rather than use an excessive amount of memory for the life of the session.

  ssize_t maxSendSize;
    Default value: 65536

    Imposes an upper bound on the size of a send call.

    It is not clear whether this serves any useful purpose, so it should probably just be set to
    a very large value (AssertInvariant() requires it to be at least normalCommitBufferSize).

    If maxSendSize is too small then the overheads of memmoves can increase because larger messages 
    tend not to be sent in their entirety in a single send call, so ends of messages have to be 
    moved back to the start of the buffer more frequently.
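
To make the interplay of lowerHysteresisSendSize, upperHysteresisSendSize and maxSendSize concrete, here
is a simplified, single-threaded model of the behaviour described above.  It is an illustrative sketch
only, not the cxMessage implementation: it ignores the 4-byte size header, assumes every send completes
in full, assumes any remaining data is flushed once no more messages are available, and uses the
hypothetical names WriterParams and SimulateSends.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

struct WriterParams  // mirrors the relevant MessageWriterSettings fields
{
    std::ptrdiff_t lower = 128;         // lowerHysteresisSendSize
    std::ptrdiff_t upper = 4096;        // upperHysteresisSendSize
    std::ptrdiff_t maxSendSize = 65536;
};

// Returns the size of every send call issued for a queue of message sizes (bytes).
std::vector<std::ptrdiff_t> SimulateSends(std::deque<std::ptrdiff_t> msgs, const WriterParams& p)
{
    std::vector<std::ptrdiff_t> sends;
    std::ptrdiff_t begin = 0;  // start of unsent data in the buffer
    std::ptrdiff_t end = 0;    // fill position of the buffer
    while (!msgs.empty() || begin < end)
    {
        // Prepare messages until the fill position exceeds the upper water mark.
        while (!msgs.empty() && end <= p.upper)
        {
            end += msgs.front();
            msgs.pop_front();
        }
        // Issue sends (each at most maxSendSize) while at least 'lower' bytes remain;
        // once there are no more messages, flush whatever is left.
        while (begin < end && (end - begin >= p.lower || msgs.empty()))
        {
            std::ptrdiff_t n = std::min(end - begin, p.maxSendSize);
            sends.push_back(n);
            begin += n;
        }
        // Move the unsent tail back to the start of the buffer (the memmove).
        end -= begin;
        begin = 0;
    }
    return sends;
}
```

With the default values, 100 messages of 50 bytes go out in two sends (4100 and 900 bytes) rather than
one hundred.  With maxSendSize reduced to 4096, messages of 4090, 10 and 50 bytes produce sends of 4096
and 54 bytes: the 4-byte tail of the first send is below the lower water mark, so it is moved back and
combined with the next message.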

MessageReaderSettings
---------------------

These are settings that affect the way messages are read from a connected socket.
This structure has the following members:

  ssize_t normalCommitBufferSize;
    The initial size of the read buffer
    Default value: 16384 (i.e. 16 kB)

  ssize_t maxMsgSize;
    The upper bound on the size of the body of an individual message 
    (i.e. not including the 4 byte header giving the message size).
    Therefore it also represents the upper bound on the size of the receive buffer in memory.
    If the size of the body of a message exceeds this value then a fatal
    MakeCedaErrorCode(ECedaErrorCode::ReadInvalidMessageSize) error is raised on the connection.
    Making this value extremely large could make the machine vulnerable to running out of memory. 
    Default value: 67,108,864 (i.e. 64 MB)

  ssize_t maxRecvSize;
    The upper bound on the size of the buffer passed to each socket receive call.
    Default value: 262,144 (i.e. 256 kB)

TcpSettings
-----------

These are settings for the socket connections:

  ssize_t sendBufSize;
    Default value: -1
    SO_SNDBUF setting
    -1 means use the system default

  ssize_t recBufSize;
    Default value: -1
    SO_RCVBUF setting
    -1 means use the system default

  bool enableNagle;
    Default value: false
    If false then the TCP_NODELAY option is set (i.e. Nagle's algorithm is disabled)

  bool enableKeepAlive;
    Default value: false
    If true then SO_KEEPALIVE option is set

  int keepalivetime;          
    Default value: 7,200,000 (i.e. 2 hours)
    Idle time in milliseconds before the next keep-alive probe is sent

  int keepaliveinterval;      
    Default value: 1000 (i.e. 1 second)
    Retransmission timeout in milliseconds if an ACK is not received
    Typically after 5 failures an error will be reported.
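
A minimal sketch, assuming a POSIX system, of how these settings might map onto setsockopt calls.
TcpOptions and ApplyTcpOptions are hypothetical and this is not a description of cxMessage's internals.
TCP_KEEPIDLE and TCP_KEEPINTVL take seconds rather than milliseconds and are only applied where the
platform defines them (e.g. Linux); on Windows the equivalent values are the keepalivetime and
keepaliveinterval fields of the tcp_keepalive structure used with SIO_KEEPALIVE_VALS.

```cpp
#include <cassert>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

struct TcpOptions  // hypothetical mirror of ceda::TcpSettings
{
    long sendBufSize = -1;                // -1 means use the system default
    long recBufSize = -1;                 // -1 means use the system default
    bool enableNagle = false;
    bool enableKeepAlive = false;
    int keepalivetime = 2 * 3600 * 1000;  // milliseconds
    int keepaliveinterval = 1000;         // milliseconds
};

// Applies the options to a socket; stops and returns false at the first failing call.
bool ApplyTcpOptions(int fd, const TcpOptions& s)
{
    bool ok = true;
    auto set = [&](int level, int name, int value) {
        ok = ok && setsockopt(fd, level, name, &value, sizeof value) == 0;
    };
    if (s.sendBufSize >= 0) set(SOL_SOCKET, SO_SNDBUF, static_cast<int>(s.sendBufSize));
    if (s.recBufSize >= 0) set(SOL_SOCKET, SO_RCVBUF, static_cast<int>(s.recBufSize));
    if (!s.enableNagle) set(IPPROTO_TCP, TCP_NODELAY, 1);
    if (s.enableKeepAlive)
    {
        set(SOL_SOCKET, SO_KEEPALIVE, 1);
#if defined(TCP_KEEPIDLE) && defined(TCP_KEEPINTVL)
        // Round milliseconds up to whole seconds.
        set(IPPROTO_TCP, TCP_KEEPIDLE, (s.keepalivetime + 999) / 1000);
        set(IPPROTO_TCP, TCP_KEEPINTVL, (s.keepaliveinterval + 999) / 1000);
#endif
    }
    return ok;
}
```

With the default values the idle time becomes 7200 seconds and the probe interval 1 second.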

Error codes
-----------

Besides various system error codes, the following error codes may be generated:

    MakeCedaErrorCode(ECedaErrorCode::ReadValidEndOfStream)
        When the peer's ITcpMsgSessionHandler::WriteMessage returned a nextPayloadSize of
        WriteMessageResult::ShutdownSending (-2) indicating there are no more messages, causing
        EOF to be read on the socket

    MakeCedaErrorCode(ECedaErrorCode::ReadInvalidEndOfStream)
        When EOF was read on the socket part way through reading a message
        (this should never happen with a valid peer)

    MakeCedaErrorCode(ECedaErrorCode::ReadInvalidMessageSize)
        A message was received with an invalid size header (e.g. a body size exceeding
        MessageReaderSettings::maxMsgSize)
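
The following is a minimal sketch of how a reader could arrive at these three outcomes.  It assumes the
4-byte size header is little-endian (the byte order is not stated here), uses a hypothetical ReadStatus
enum in place of ECedaErrorCode, and processes a complete byte stream at once rather than incremental
receives of at most maxRecvSize bytes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the relevant ECedaErrorCode values.
enum class ReadStatus { ValidEndOfStream, InvalidEndOfStream, InvalidMessageSize };

// Splits a complete byte stream (everything read up to EOF) into message bodies.
// Each message is a 4-byte size header followed by that many body bytes.
ReadStatus ReadAll(const std::vector<std::uint8_t>& stream, std::size_t maxMsgSize,
                   std::vector<std::vector<std::uint8_t>>& bodies)
{
    std::size_t pos = 0;
    while (pos < stream.size())
    {
        if (stream.size() - pos < 4)
            return ReadStatus::InvalidEndOfStream;  // EOF inside a header
        std::uint32_t size = 0;
        for (int i = 3; i >= 0; --i)                // assumed little-endian header
            size = (size << 8) | stream[pos + i];
        pos += 4;
        if (size > maxMsgSize)
            return ReadStatus::InvalidMessageSize;  // rejected before any allocation
        if (stream.size() - pos < size)
            return ReadStatus::InvalidEndOfStream;  // EOF inside a body
        auto first = stream.begin() + static_cast<std::ptrdiff_t>(pos);
        bodies.emplace_back(first, first + static_cast<std::ptrdiff_t>(size));
        pos += size;
    }
    return ReadStatus::ValidEndOfStream;            // EOF fell exactly on a message boundary
}
```

Checking the size against maxMsgSize before reading the body is what bounds the memory a peer can force
the receiver to allocate.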
*/

namespace ceda
{
class IoContextPool;
class TcpMsgServer;
class TcpMsgClient;

struct MessageWriterSettings
{
    MessageWriterSettings()
    {
        lowerHysteresisSendSize = 128;
        upperHysteresisSendSize = 4096;
        normalCommitBufferSize = 8192;
        sendSizeUntilBackToNormalBufferSize = 1024*1024;
        maxSendSize = 65536;
    }

    void AssertInvariant() const
    {
        cxAssert(0 < lowerHysteresisSendSize);
        cxAssert(lowerHysteresisSendSize <= upperHysteresisSendSize);
        cxAssert(upperHysteresisSendSize <= normalCommitBufferSize);
        cxAssert(normalCommitBufferSize <= sendSizeUntilBackToNormalBufferSize);
        cxAssert(normalCommitBufferSize <= maxSendSize);
    }

    ssize_t lowerHysteresisSendSize;
    ssize_t upperHysteresisSendSize;
    ssize_t normalCommitBufferSize;
    ssize_t sendSizeUntilBackToNormalBufferSize;
    ssize_t maxSendSize;
};

struct MessageReaderSettings
{
    MessageReaderSettings()
    {
        normalCommitBufferSize = 16*1024;
        maxMsgSize = 64*1024*1024;
        maxRecvSize = 256*1024;
    }

    ssize_t normalCommitBufferSize;
    ssize_t maxMsgSize;
    ssize_t maxRecvSize;
};

enum class EProtocol
{
    TCP_IPv4,
    TCP_IPv6
};

struct TcpSettings
{
    TcpSettings() :
        sendBufSize(-1),
        recBufSize(-1),
        enableNagle(false),
        enableKeepAlive(false),
        keepalivetime(2*3600*1000),
        keepaliveinterval(1*1000)
    {
    }

    ssize_t sendBufSize;
    ssize_t recBufSize;
    bool enableNagle;
    bool enableKeepAlive;
    int keepalivetime;
    int keepaliveinterval;
};

struct TcpMsgSessionSettings
{
    TcpSettings tcpSettings;
    MessageReaderSettings msgReaderSettings;
    MessageWriterSettings msgWriterSettings;
};

enum class ESessionFailureType
{
    Session,
    SessionRead,
    SessionWrite
};

struct ITcpMsgSession
{
    virtual void RequestClose() = 0;
    virtual void OnMoreToSend() = 0;
};

struct WriteMessageResult
{
    enum
    {
        NoMessage = -1,
        ShutdownSending = -2
    };

    /*
        > 0                 payloadSize represents the size of the message just written by the call to 
                            WriteMessage(). This must not exceed payloadSizeAvailable.
    
        0                   No message was written - either because there is none currently 
                            available or because payloadSizeAvailable was too small.
    */
    ssize_t payloadSize;

    /*
        >= 0                Indicates a request for another call to WriteMessage with
                            payloadSizeAvailable (at least) as large as nextPayloadSize
                        
        NoMessage           Currently there are no more messages ready to be sent.
        (-1)
        
        ShutdownSending     There are no more messages to be sent, and there never will be.
        (-2)                (this causes a shutdown of sending on the associated socket so
                            that the other end will read end-of-stream)
    */
    ssize_t nextPayloadSize;
};

struct ITcpMsgSessionHandler
{
    virtual void Release() = 0;
    virtual void OnFailure(ESessionFailureType type, std::error_code ec) = 0;
    virtual std::error_code ReadMessage(const octet_t* payload, ssize_t payloadSize) = 0;

    /*
    Try to write a /single/ message to the given buffer which has size payloadSizeAvailable.

    The returned WriteMessageResult provides information about the message that was written
    and whether there are more messages to write.
        
    Must not throw an exception.
    */
    virtual WriteMessageResult WriteMessage(octet_t* payload, ssize_t payloadSizeAvailable) = 0;
};
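
/*
Example (a sketch, not part of the cxMessage API): a handler that drains a queue of pending messages
could implement WriteMessage as below.  WriteMessageResult is copied here with std::ptrdiff_t standing
in for ceda's ssize_t and std::uint8_t for octet_t, so the example compiles on its own; QueueWriter is
hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <deque>
#include <utility>
#include <vector>

struct WriteMessageResult  // copy of ceda::WriteMessageResult for a standalone build
{
    enum { NoMessage = -1, ShutdownSending = -2 };
    std::ptrdiff_t payloadSize;
    std::ptrdiff_t nextPayloadSize;
};

// Hypothetical: the WriteMessage part of an ITcpMsgSessionHandler.
// Assumes queued messages are non-empty, since payloadSize 0 means "nothing written".
class QueueWriter
{
public:
    void Push(std::vector<std::uint8_t> msg) { pending_.push_back(std::move(msg)); }
    void Finish() { finished_ = true; }  // no further messages will ever be pushed

    WriteMessageResult WriteMessage(std::uint8_t* payload, std::ptrdiff_t payloadSizeAvailable)
    {
        WriteMessageResult r{0, 0};
        if (!pending_.empty())
        {
            const std::vector<std::uint8_t>& front = pending_.front();
            const auto size = static_cast<std::ptrdiff_t>(front.size());
            if (size > payloadSizeAvailable)
            {
                r.nextPayloadSize = size;  // nothing written: ask for a larger buffer
                return r;
            }
            std::memcpy(payload, front.data(), front.size());
            r.payloadSize = size;
            pending_.pop_front();
        }
        if (!pending_.empty())
            r.nextPayloadSize = static_cast<std::ptrdiff_t>(pending_.front().size());
        else
            r.nextPayloadSize = finished_ ? WriteMessageResult::ShutdownSending
                                          : WriteMessageResult::NoMessage;
        return r;
    }

private:
    std::deque<std::vector<std::uint8_t>> pending_;
    bool finished_ = false;
};
```

After a call has returned NoMessage, the owner would presumably call ITcpMsgSession::OnMoreToSend() once
new messages are pushed, so that WriteMessage is called again; that reading of OnMoreToSend is an
assumption based on its name.
*/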

enum class EEndPointFailureType
{
    Open,
    Close,
    SetOption,
    Bind,
    Listen,
    Accept,
    Connect
};

struct ITcpMsgEndPoint
{
    virtual void Release() = 0;
    virtual void OnFailure(EEndPointFailureType type, std::error_code ec) = 0;
    virtual ITcpMsgSessionHandler* CreateSessionHandler(ITcpMsgSession&) = 0;

    // Called as the number of sessions managed by the end point increases or decreases
    virtual void UpdateSessionCount(ssize_t numSessions) {}
};

// If pool is null then use the default IoContextPool
cxMessage_API TcpMsgServer* CreateTcpMsgServer(
    EProtocol protocol,
    int port,
    bool reuse_addr, 
    ITcpMsgEndPoint& ep,
    const TcpMsgSessionSettings& sessionSettings,
    IoContextPool* pool = nullptr);

cxMessage_API void Close(TcpMsgServer*);

// If pool is null then use the default IoContextPool
// For the meaning of host and service, consult the Boost.Asio documentation for basic_resolver::resolve()
cxMessage_API TcpMsgClient* CreateTcpMsgClient(
    const char* host,
    const char* service,
    ITcpMsgEndPoint& ep,
    const TcpMsgSessionSettings& sessionSettings,
    IoContextPool* pool = nullptr);

cxMessage_API void Close(TcpMsgClient*);

} // namespace ceda

#endif // include guard