RmiTwoClients2.cpp
// RmiTwoClients2.cpp
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2011
@import "Ceda/cxRmi/RmiConnection.h"
#include "Ceda/cxUtils/TracerUtils.h"
#include "Ceda/cxUtils/ThreadBlockerWhileUsed.h"
#include "Ceda/cxUtils/Tracer.h"
#include <algorithm>
#include <mutex>
@if (false)
{
/*
todo: this is a test. and belongs in txRmi
*/
///////////////////////////////////////////////////////////////////////////////////////////////////
// Request and response protocol
$interface rmi IRequest2
{
// Async request to calculate a+b
void Sum([in] ceda::int32 a,[in] ceda::int32 b);
};
$interface rmi IResponse2
{
// Async response to prior call to IRequest::Sum()
void SumResult([in] ceda::int32 c);
};
$struct ClientSession2;
namespace
{
std::mutex g_cs;
ceda::xvector<ClientSession2*> g_sessions;
ceda::ThreadBlockerWhileUsed g_callsWithPendingResponses(0);
}
///////////////////////////////////////////////////////////////////////////////////////////////////
// ClientSession
// Client session : caller for IRequest
// callee for IResponse
$struct ClientSession2 /*isa IResponse2*/ :
mixin
[
ceda::NoSessionCreationStateMixin2
ceda::RmiSessionMixin2<IRequest2,IResponse2,IClientSessionManager,false>
]
{
~$$()
{
Tracer() << "ClientSession2::~ClientSession2(), this = " << this
<< ", g_sessions.size() = " << g_sessions.size() << ceda::endl;
}
// Implementation of IResponse
void SumResult(int32 c)
{
Tracer() << " SumResult() called with c= " << c << '\n';
--g_callsWithPendingResponses;
}
void OnSessionStart()
{
std::lock_guard<std::mutex> lock(g_cs);
Tracer() << "Client " << this << " received OnSessionStart() notification, g_sessions.size = " << g_sessions.size() << "\n";
g_sessions.push_back(this);
}
void OnSessionEnd(ESocketCall sc, int windowsErrorCode)
{
std::lock_guard<std::mutex> lock(g_cs);
Tracer() << "ClientSession2::OnSessionEnd() client = " << this << ", removing from g_sessions ["
<< g_sessions << "]\n";
ceda::xvector<ClientSession2*>::iterator it = std::find(g_sessions.begin(),g_sessions.end(),this);
cxAssert(it != g_sessions.end());
g_sessions.erase(it);
Tracer() << "ClientSession2::OnSessionEnd(), after erase, g_sessions.size() = " << g_sessions.size() << ceda::endl;
}
};
///////////////////////////////////////////////////////////////////////////////////////////////////
// ServerSession
// Server session : callee for IRequest
// caller for IResponse
$struct ServerSession2 /*isa IRequest2*/ :
mixin
[
ceda::NoSessionCreationStateMixin2
ceda::RmiServerSessionMixin2<IResponse2,IRequest2>
]
{
$$()
{
}
// Implementation of IRequest
void Sum(ceda::int32 a, ceda::int32 b)
{
Tracer() << " Sum() called with a= " << a << " b= " << b << '\n';
GetStub()->SumResult(a+b);
Flush(); // Flush the message straight away
}
virtual void OnRelease()
{
Tracer() << "Would call StopInproc()\n";
BaseClass::OnRelease();
}
};
///////////////////////////////////////////////////////////////////////////////////////////////////
// RmiTwoClients
void RmiTwoClients2()
{
TraceGroup g("Rmi Two Clients 2 example");
ceda::TraceIndenter indent;
const int NUM_RUNS = 1000;
ceda::Iocp* iocp = ceda::CreateIocp();
for (int count = 0 ; count < NUM_RUNS ; ++count)
{
{
std::lock_guard<std::mutex> lock(g_cs);
cxAssert(g_sessions.size() == 0);
}
ceda::SOCKET_PORT port = 3000;
ceda::SingleClientSessionManager<ClientSession2>* csmClient1 = new ceda::SingleClientSessionManager<ClientSession2>();
ceda::SingleClientSessionManager<ClientSession2>* csmClient2 = new ceda::SingleClientSessionManager<ClientSession2>();
// This is necessary because we use the CRTP to call back into the MServer2 from the session manager
/*struct TheServer : public MServer2<
DefaultSessionManager<
FinalBaseMixin<TheServer>, ServerSession2 > >
{
};*/
typedef ceda::DefaultSessionManager< ServerSession2 > SessMgr;
ServerSession2::SessionCreationState scs;
ceda::MServer2<SessMgr>* server = ceda::CreateMServer2<SessMgr>(scs, iocp, port);
ceda::MClient2* client1 = ceda::CreateMClient2(iocp, csmClient1, "127.0.0.1", port);
ceda::MClient2* client2 = ceda::CreateMClient2(iocp, csmClient2, "127.0.0.1", port);
while(1)
{
{
std::lock_guard<std::mutex> lock(g_cs);
if (g_sessions.size() >= 2) { break; }
else
{
Tracer() << "############## g_sessions.size() = " << g_sessions.size() << ceda::endl;
}
}
Sleep(1);
}
// note 1 & 2 might be the reverse of what is expected. If this mattered, a map could be used
ceda::ClientSession2* cs1 = g_sessions[0];
ceda::ClientSession2* cs2 = g_sessions[1];
ceda::ptr<IRequest2> clientStub1 = cs1->GetStub();
ceda::ptr<IRequest2> clientStub2 = cs2->GetStub();
Tracer() << "Waited for connections\n";
for (int i=0 ; i < 2 ; ++i)
{
clientStub1->Sum(1000 + count,i);
++g_callsWithPendingResponses;
Tracer() << "Issue Sum(" << 1000+count << ',' << i << ")\n";
clientStub2->Sum(2000 + count,i);
++g_callsWithPendingResponses;
Tracer() << "Issue Sum(" << 2000+count << ',' << i << ")\n";
}
Tracer() << "Flush()\n";
cs1->Flush();
cs2->Flush();
// Wait for responses to be received by clients.
g_callsWithPendingResponses.Wait();
Close(client1);
Tracer() << "Client1 closed, closing client2 " << client2 << "\n";
Close(client2);
Tracer() << "Client2 closed\n";
Close(server);
Tracer() << "Server " << server << " closed\n";
}
Close(iocp);
Tracer() << "Test completed\n";
}
}