RmiTwoClients.cpp
// RmiTwoClients.cpp
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2011
@import "Ceda/cxRmi/RmiConnection.h"
#include "Ceda/cxUtils/HPTime.h"
#include "Ceda/cxUtils/TracerUtils.h"
#include "Ceda/cxUtils/Tracer.h"
/*
The example below uses TEST_EARLY_CLOSE_OF_SERVER to enable a case where the server is closed
while the clients are sending messages through the client side RMI stubs.
This requires the AddRef()/DecRef() calls on the client sessions to keep them alive even though
- the underlying sockets have encountered errors
- the client session mgr has called RequestClose()
- the sockets have actually been closed.
Without the AddRef()/DecRef() calls there is an access violation (with loop variable i = 1001)
because objects have been deleted.
Auto-shutdown of a session
--------------------------
The following steps occur when a write error occurs on the socket connection
1. There is a write error on the socket connection
2. TcpMsgWriter calls OnWriteFailure() on its handler:
void MultiplexedMsgConnection::OnWriteFailure(TcpMsgConnection* c, ESocketCall sc, int windowsErrorCode)
{
m_handler->OnWriteFailure(this,sc,windowsErrorCode);
}
3. void MEndPoint::OnWriteFailure(MultiplexedMsgConnection* c, ESocketCall sc, int windowsErrorCode)
{
m_handler->OnFailure(this, c, MCSF_Write, sc, windowsErrorCode);
}
4. void BlockingSingleSessionCreator<Session>::OnFailure(MEndPoint* endpoint, MultiplexedMsgConnection* c, EMEndPointFailure mcsf, ESocketCall sc, int windowsErrorCode)
{
if (sc != SC_WriteTemporarilyStopped)
{
if (sc != SC_ReadValidEndOfStream)
{
Tracer() << "Failure : mcsf = " << mcsf << " sc = " << sc << " windowsErrorCode = " << windowsErrorCode << '\n';
}
if (c)
{
RequestClose(c);
}
}
}
5. void MultiplexedMsgConnection::RequestClose()
{
m_handler->OnRequestClose(this);
}
6. void MEndPoint::OnRequestClose(MultiplexedMsgConnection* c)
{
CloseConnection(c);
}
7. void MEndPoint::CloseConnection(MultiplexedMsgConnection* c)
{
bool needClose = false;
{
std::lock_guard<std::mutex> lock(mutex_);
if (m_ep)
{
ssize_t numDeleted = m_connections.FindAndErase(c);
needClose = (numDeleted == 1);
if (needClose)
if (!m_connecting) TryConnect();
}
}
if (needClose) ceda::Close(c);
}
8. void MultiplexedMsgConnection::Close()
{
if (m_c) ceda::Close(m_c);
else delete this;
}
9. void TcpMsgConnection::Close()
{
m_protSocket.Close();
Release();
}
10. void TcpMsgConnection::Release()
{
if (--m_refCount == 0) delete this;
}
11. TcpMsgConnection::~TcpMsgConnection()
{
m_iocp->DecIocpRef();
m_handler->OnRelease(this);
}
12. void MultiplexedMsgConnection::OnRelease(TcpMsgConnection* c)
{
for (ssize_t i = 0 ; i < m_writers.size() ; ++i)
{
m_writers[i].m_writer->ReleaseWriter();
}
m_handler->OnRelease(this);
delete this;
}
13. void MEndPoint::OnRelease(MultiplexedMsgConnection* c)
{
{
std::lock_guard<std::mutex> lock(mutex_); // Is this needed?
m_handler->EndConnection(this,c);
}
--m_blockWhileUsed;
}
Lifetime management of the readers and writers
----------------------------------------------
The MultiplexedMsgConnection holds ref counts on its readers and writers.
struct IMessageWriter
{
virtual void ReleaseWriter() = 0;
virtual ssize_t GetNextMessage(MessageId& id) = 0;
virtual void WritePayload(octet_t* payload, ssize_t payloadSize) = 0;
};
struct IMessageReader
{
virtual void ReleaseReader() = 0;
virtual bool ReadMessage(const octet_t* payload, ssize_t payloadSize, MessageId id) = 0;
};
RmiCallee and RmiCaller each represent both a reader and a writer. They use a threadsafe ref count
ThreadSafeCount m_refCount;
This is initialised to 2 in the constructor.
void RmiCallee::Release()
{
if (--m_refCount == 0)
{
delete this;
}
}
void RmiCallee::ReleaseWriter()
{
Release();
}
void RmiCallee::ReleaseReader()
{
Release();
}
void RmiCaller::Release()
{
if (--m_refCount == 0)
{
delete this;
}
}
void RmiCaller::ReleaseWriter()
{
Release();
}
void RmiCaller::ReleaseReader()
{
Release();
}
So we see that rather conveniently, every RmiCallee/RmiCaller added to a MultiplexedMsgConnection
is automatically deleted by the MultiplexedMsgConnection.
The readers and writers are only released when MultiplexedMsgConnection destructs, ensuring the
readers/writers live long enough to allow the MultiplexedMsgConnection to work correctly while it
is running.
A useful principle
------------------
We often create a framework object and plug a handler into it. The handler is released when the
framework object destructs. This neatly ensures the framework object calls release after every
other method. It also means the framework object owns the handler.
It's appropriate to ref count the handler if and only if it can be plugged multiple times into
framework objects (indeed this happens for the RmiCaller and RmiCallee which are plugged into the
MultiplexedMsgConnection 2x).
A thread-safe ref count (using Interlocked API) which is used to 'delete this' when counter falls
to 0 is a very common and valid approach.
It's useful to draw an object tree diagram with edges according to strong references. E.g.
TcpMsgConnection
|
|
| ITcpMsgHandler
+-------1-1--->----+ MultiplexedMsgConnection
|
| IMessageWriter
| & IMessageReader
+-------1-*---->---- RmiCaller
| |
| +-------1-1---->---- Stub
| |
| |
| | IRmiCallerHandler
| +-------1-1---->---- RmiSessionMixin<ISend,IReceive>
|
| IMessageWriter
| & IMessageReader
+-------1-*---->---- RmiCallee
| |
| +-------1-1---->---- Skeleton
|
|
| APPLICATION
| |
| -->--|
| |
| IMultiplexedMsgHandler |
+---------*-1------>----- MEndPoint
/ |
/ |
TcpClient/TcpServer / |
| / |
| ISocketConnectionAcceptor / | IMEndPointHandler
------------1-1----->------/ +---------1-1--->--- MySessionMgr
Notes:
- When TcpMsgConnection ref count falls to 0 it performs 'delete this'
- When MultiplexedMsgConnection is released it performs 'delete this'
- When RmiCaller is released 2x it performs 'delete this'
- When RmiCallee is released 2x it performs 'delete this'
- When RmiSessionMixin<ISend,IReceive> is released it performs 'delete this'
- When MySessionMgr is released it performs 'delete this'
- When TcpMsgConnection destructs it releases its MultiplexedMsgConnection
- When MultiplexedMsgConnection destructs it releases all readers, writers and its handler (MEndPoint)
- When MEndPoint destructs it releases its handler (MySessionMgr)
- When RmiCaller destructs it deletes the stub and releases its handler (RmiSessionMixin<ISend,IReceive>)
- When RmiCallee destructs it deletes the skeleton
- MEndPoint must be closed. Close() blocks until
- no more connections from ISocketConnectionAcceptor
- has been released by all of its MultiplexedMsgConnections
Conclusion: This ref-count ownership diagram is accurate. Note that TcpMsgConnections own
their MEndPoint, not vice versa!!!! MEndPoint only *manages* its TcpMsgConnections, but it
doesn't own them in the sense of ref counts.
Actually this is not quite true. The MEndPoint is responsible for calling Close() on each
of its MultiplexedMsgConnection. This forwards on Close() to the TcpMsgConnection. Now
Close() is associated with a ref count.
MultiplexedMsgConnection hides the TcpMsgConnection from clients, and yet it is owned by the
TcpMsgConnection! MultiplexedMsgConnection::Close() calls TcpMsgConnection::Close() which
eventually results in ~TcpMsgConnection() which in turn causes ~MultiplexedMsgConnection().
IDEA: Make the application, which is making calls using the stub (managed by an RmiCaller),
hold a strong ref on the TcpMsgConnection. This prevents everything from being destroyed while
the application is making calls through the RmiCaller.
Socket error handling
---------------------
- Socket related errors originating from TcpClient/TcpServer are passed on to the
IMEndPointHandler.
MEndPoint processes OnConnectFailure() messages, typically causing new connections
to be attempted.
- Socket related errors originating from TcpMsgConnection are passed onto the handler
MultiplexedMsgConnection, then onto the handler MEndPoint then onto the handler
MySessionMgr.
These errors are passed through without taking any action, to allow MySessionMgr to have
proper control over the policy.
ISocketConnectionAcceptor has no release method
-----------------------------------------------
TcpClient/TcpServer
|
|
| ISocketConnectionAcceptor
+--------------------------- MEndPoint
ISocketConnectionAcceptor has no release method! So it's not following our "framework objects
own their handlers" pattern at all.
struct ISocketConnectionAcceptor
{
virtual void OnConnect(ITcpEndpoint* ep, SocketHandle socket) = 0;
virtual void OnConnectFailure(ITcpEndpoint* ep,ESocketCall sc, int windowsErrorCode, bool closed) = 0;
};
Working set synchronisation
---------------------------
Consider the TwoWorkingSets() example in exOperation. This creates MClient and MServer like this:
// Create server and client
ceda::MServer* server;
ceda::MClient* client;
{
ceda::CSpaceTxn txn;
server = CreateWsipcServer(iocp, ws1, protocolId, 3000);
client = CreateWsipcClient(iocp, ws2, protocolId, "127.0.0.1", 3000);
}
< generate operations on the two working sets, and find that they synchronise automatically >
Close(server);
Close(client);
Note that this provides no access to the sessions and there is a built-in, simplistic session
manager which is used.
This doesn't make use of the RMI layer. So there is no stub. Instead WsipcSession directly
implements IMessageWriter and IMessageReader in order to be added directly as both a reader and
a writer of a given MultiplexedMsgConnection.
class WsipcSession : public IMessageWriter, public IMessageReader, private IMoreToSend
{
...
}
Message writing is performed by an IOCP thread when more data needs to be sent. There is no
need for an application thread to write outgoing messages. It follows that we don't have to
worry about buffering of messages or stopping an application writer thread during system shutdown.
Alternative to blocking until a connection is established
---------------------------------------------------------
The examples in exRmi typically block the main thread until a connection is established. This
raises questions about how to abort the thread if the user wants to shut down the application.
Furthermore we would rather not make application programmers dedicate threads because of the
blocking calls.
The proper approach is instead to have main() block on some condition which indicates that the
main thread is ready to exit. Then we use the IOCP threads to do the real work.
Consider a very simple example: we create one client, one server. After a connection is
established the client sends a message Sum(10,20). When it gets the response it shows the result.
We need a Session object. The current approach works well -
RmiSessionMixin<class ISend, class IReceive> takes care of setting up the RmiCaller and RmiCallee,
providing access to the stub etc.
We need to use mixins to implement the SessionMgr, allowing for the application specific
customisation.
$mixin DefaultSessionMgrMixin<class _Session> : public IMEndPointHandler
{
typedef _Session Session;
virtual bool EnableMoreConnections(MEndPoint* endpoint, ssize_t numCurrentConnections) const
{
return true;
}
virtual MultiplexedMsgConnection* CreateConnection(
MEndPoint* endpoint,
Iocp* iocp,
SocketHandle socket)
{
MultiplexedMsgConnection* c = CreateMultiplexedMsgConnection();
Session* s = CreateSession(c);
OpenConnection(
c,
iocp,
socket,
m_readerSettings,
m_writerSettings);
return c;
}
virtual void BeginConnection(MEndPoint* endpoint, MultiplexedMsgConnection* c)
{
}
virtual void EndConnection(MEndPoint* endpoint, MultiplexedMsgConnection* c)
{
}
virtual void OnFailure(MEndPoint* endpoint, MultiplexedMsgConnection* c, EMEndPointFailure mcsf, ESocketCall sc, int windowsErrorCode)
{
if (sc != SC_WriteTemporarilyStopped)
{
if (sc != SC_ReadValidEndOfStream)
{
Tracer() << "Failure : mcsf = " << mcsf << " sc = " << sc << " windowsErrorCode = " << windowsErrorCode << '\n';
}
if (c)
{
RequestClose(c);
}
}
}
virtual void OnRelease(MEndPoint* endpoint)
{
delete this;
}
MessageReaderSettings m_readerSettings;
MessageWriterSettings m_writerSettings;
};
$mixin SingleSessionMgrMixin : public IMEndPointHandler
{
$$() : m_session(NULL) {}
virtual bool EnableMoreConnections(MEndPoint* endpoint, ssize_t numCurrentConnections) const
{
return numCurrentConnections < 1;
}
Session* CreateSession(MultiplexedMsgConnection* c)
{
m_session = new Session;
m_session->Init(c,false);
return m_session;
}
Session* m_session;
};
$struct MyClientSessionMgr :
mixin
[
ceda::DefaultSessionMgrMixin<MyClientSession>
ceda::SingleSessionMgrMixin
]
{
virtual void BeginConnection(MEndPoint* endpoint, MultiplexedMsgConnection* c)
{
ptr<Ix> stub = m_session->GetStub();
for (int i=0 ; i < 5 ; ++i)
{
stub->PrintInt(i);
}
Tracer() << "PrintInt() issued for 0,1,2,3,4\n";
Tracer() << "Flush\n";
m_session->Flush();
float32 c1 = 0;
float32 c2 = 0;
float32 c3 = 0;
stub->Sum(10,20,c1);
stub->Sum(1,2,c2);
stub->Sum(4,7,c3);
Tracer() << "Sum(10,20), Sum(1,2), Sum(4,7) issued\n";
m_session->MarkAndWait();
Tracer() << "c1 = " << c1 << '\n';
Tracer() << "c2 = " << c2 << '\n';
Tracer() << "c3 = " << c3 << '\n';
m_finished = true;
}
// Allow main thread to poll until we have finished, or else user has hit a key, or else timeout.
bool m_finished;
};
In this case we had no need to record an association between Session object and the
MultiplexedMsgConnection. But what happens with a server?
*/
/*
The $interface declarations in this file (IRequest and IResponse) produce the warnings listed
here; the $warning directive that follows this comment disables warning numbers 1003 and 1013.
warning W1003 : $interface IResponse doesn't inherit from IObject
warning W1013 : $interface IResponse not reflected
warning W1003 : $interface IRequest doesn't inherit from IObject
warning W1013 : $interface IRequest not reflected
*/
$warning(disable: 1003 1013)
///////////////////////////////////////////////////////////////////////////////////////////////////
// Request and response protocol
// Request half of the protocol. In this file ClientSession is the caller of this interface and
// ServerSession is the callee (it implements Sum()).
$interface rmi IRequest
{
// Async request to calculate a+b
// Returns nothing directly; ServerSession answers by calling IResponse::SumResult().
void Sum([in] ceda::int32 a,[in] ceda::int32 b);
};
// Response half of the protocol. In this file ServerSession is the caller of this interface and
// ClientSession is the callee (it implements SumResult()).
$interface rmi IResponse
{
// Async response to prior call to IRequest::Sum()
// c carries the computed sum (ServerSession passes a+b).
void SumResult([in] ceda::int32 c);
};
///////////////////////////////////////////////////////////////////////////////////////////////////
// ClientSession
// Client session : caller for IRequest
// callee for IResponse
// Client end of a connection: issues IRequest calls through the stub exposed by the session mixin
// and receives IResponse calls from the server.
// NOTE(review): the mixin arguments are presumably in (send interface, receive interface) order,
// as with RmiSessionMixin<ISend,IReceive> - confirm against RmiSessionMixin2.
$struct ClientSession isa IResponse :
mixin
[
ceda::RmiSessionMixin2<IRequest,IResponse>
]
{
// NOTE(review): presumably marks the type as non-cloneable - confirm against the macro definition.
cxNotCloneable(ClientSession)
public:
ClientSession() {}
// Implementation of IResponse
// Only traces the value received; the result is not stored anywhere.
void SumResult(ceda::int32 c)
{
Tracer() << " SumResult() called with c= " << c << '\n';
}
};
///////////////////////////////////////////////////////////////////////////////////////////////////
// ServerSession
// Server session : callee for IRequest
// caller for IResponse
// Server end of a connection: receives IRequest calls from a client and replies by issuing
// IResponse calls through the stub exposed by the session mixin.
// NOTE(review): the mixin arguments are presumably in (send interface, receive interface) order,
// as with RmiSessionMixin<ISend,IReceive> - confirm against RmiSessionMixin2.
$struct ServerSession isa IRequest :
mixin
[
ceda::RmiSessionMixin2<IResponse,IRequest>
]
{
// NOTE(review): presumably marks the type as non-cloneable - confirm against the macro definition.
cxNotCloneable(ServerSession)
public:
ServerSession() {}
// Implementation of IRequest
// Traces the operands, sends a+b back via IResponse::SumResult() and then flushes so that the
// reply is not left waiting in the session.
void Sum(ceda::int32 a,ceda::int32 b)
{
Tracer() << " Sum() called with a= " << a << " b= " << b << '\n';
GetStub()->SumResult(a+b);
Flush(); // Flush the message straight away
}
};
///////////////////////////////////////////////////////////////////////////////////////////////////
// RmiTwoClients
/*
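Typical output is as follows. Note that this sample comes from an earlier revision of the test:
the "Issue Sum(...)" traces are now commented out, and the "Flush()" trace is only emitted when
TEST_EARLY_CLOSE_OF_SERVER is 0.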
Rmi Two Clients
Waited for connections
Issue Sum(1000,0)
Issue Sum(2000,0)
Issue Sum(1000,1)
Issue Sum(2000,1)
Flush()
Sum() called with a= 1000 b= 0
Sum() called with a= 2000 b= 0
Sum() called with a= 1000 b= 1
SumResult() called with c= 1000
Sum() called with a= 2000 b= 1
SumResult() called with c= 2000
SumResult() called with c= 1001
SumResult() called with c= 2001
Test completed
*/
#define TEST_EARLY_CLOSE_OF_SERVER 1
void RmiTwoClients()
{
Tracer() << "Rmi Two Clients\n{\n";
{
ceda::TraceIndenter indent;
ceda::Iocp* iocp = ceda::CreateIocp();
for (int count = 0 ; count < 5 ; ++count)
{
ceda::SOCKET_PORT port = 3001;
typedef ceda::BlockingSingleSessionCreator<ClientSession> ClientEndPoint;
ClientEndPoint* ccClient1 = new ClientEndPoint();
ClientEndPoint* ccClient2 = new ClientEndPoint();
// Create server and two clients
ceda::SessionCreator<ServerSession>* sc = new ceda::SessionCreator<ServerSession>();
ceda::MServer* server = ceda::CreateMServer(iocp, sc, port);
ceda::MClient* client1 = ceda::CreateMClient(iocp, ccClient1, "127.0.0.1", port);
ceda::MClient* client2 = ceda::CreateMClient(iocp, ccClient2, "127.0.0.1", port);
Tracer() << "Begin wait for connections\n";
// Wait until client side sessions established
ClientSession* cs1 = ccClient1->GetSession();
ClientSession* cs2 = ccClient2->GetSession();
ceda::ptr<IRequest> clientStub1 = cs1->GetStub();
ceda::ptr<IRequest> clientStub2 = cs2->GetStub();
Tracer() << "Waited for connections\n";
#if TEST_EARLY_CLOSE_OF_SERVER
//cs1->AddRef();
//cs2->AddRef();
for (int i=0 ; i < 3 ; ++i)
{
clientStub1->Sum(1000,i);
//Tracer() << "Issue Sum(1000," << i << ")\n";
if (i == 1) Close(server);
clientStub2->Sum(2000,i);
//Tracer() << "Issue Sum(2000," << i << ")\n";
}
//Tracer() << "Before flush1\n";
cs1->Flush();
//Tracer() << "Before flush2\n";
cs2->Flush();
//Tracer() << "After flush2\n";
// Wait long enough for responses to be received by clients.
Sleep(100);
//cs1->DecRef();
//cs2->DecRef();
//Tracer() << "Before close 1\n";
Close(client1);
//Tracer() << "Before close 2\n";
Close(client2);
//Tracer() << "After close 2\n";
#else
for (int i=0 ; i < 10 ; ++i)
{
clientStub1->Sum(1000,i);
//Tracer() << "Issue Sum(1000," << i << ")\n";
clientStub2->Sum(2000,i);
//Tracer() << "Issue Sum(2000," << i << ")\n";
}
Tracer() << "Flush()\n";
cs1->Flush();
cs2->Flush();
// Wait long enough for responses to be received by clients.
Sleep(100);
Close(client1);
Close(client2);
Close(server);
#endif
}
Close(iocp);
}
Tracer() << "}\n";
}