RmiTwoClients2.cpp

// RmiTwoClients2.cpp
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2011

@import "Ceda/cxRmi/RmiConnection.h"
#include "Ceda/cxUtils/TracerUtils.h"
#include "Ceda/cxUtils/ThreadBlockerWhileUsed.h"
#include "Ceda/cxUtils/Tracer.h"
#include <algorithm>
#include <mutex>

@if (false)
{

/*
TODO: this is a test and belongs in txRmi.
*/

///////////////////////////////////////////////////////////////////////////////////////////////////
// Request and response protocol

// Request half of the protocol: called by the client sessions in this file and
// implemented by ServerSession2.
$interface rmi IRequest2
{
    // Async request to calculate a+b
    // The result is not returned here; it is delivered through IResponse2::SumResult().
    void Sum([in] ceda::int32 a,[in] ceda::int32 b);
};

// Response half of the protocol: called by ServerSession2 and implemented by
// ClientSession2.
$interface rmi IResponse2
{
    // Async response to prior call to IRequest2::Sum()
    // c is the sum a+b computed by the server for that request.
    void SumResult([in] ceda::int32 c);
};

// Forward declaration so that g_sessions can hold ClientSession2 pointers before the
// struct itself is defined.
$struct ClientSession2;

namespace 
{
// Mutex taken around accesses to g_sessions by the session callbacks and the test loop.
std::mutex g_cs;

// Client sessions that are currently running: a session adds itself in OnSessionStart()
// and removes itself in OnSessionEnd().
ceda::xvector<ClientSession2*> g_sessions;

// Tracks Sum() calls whose SumResult() has not yet been received: incremented by the
// test for each call issued, decremented in ClientSession2::SumResult(), and waited on
// by the test before it closes the connections.
ceda::ThreadBlockerWhileUsed g_callsWithPendingResponses(0);
}


///////////////////////////////////////////////////////////////////////////////////////////////////
// ClientSession

// Client session : caller for IRequest2
//                  callee for IResponse2
//
// Each session registers itself in g_sessions when it starts and unregisters when it
// ends, so that the test driver can find the sessions and issue calls through them.
// Every access to g_sessions made by this struct is performed with g_cs held.
$struct ClientSession2 /*isa IResponse2*/ : 
    mixin 
    [
        ceda::NoSessionCreationStateMixin2
        ceda::RmiSessionMixin2<IRequest2,IResponse2,IClientSessionManager,false>
    ]
{
    // Traces destruction of the session together with the current number of
    // registered sessions.
    ~$$()
    {
        // g_sessions may be modified concurrently by another session's
        // OnSessionStart()/OnSessionEnd(), so it is only read with g_cs held.
        std::lock_guard<std::mutex> lock(g_cs);
        Tracer() << "ClientSession2::~ClientSession2(), this = " << this 
                 << ", g_sessions.size() = " << g_sessions.size() << ceda::endl;
    }

    // Implementation of IResponse2
    // Receives the result c of an earlier Sum() request and marks one pending call
    // as answered.
    void SumResult(ceda::int32 c)
    {
        Tracer() << "        SumResult() called with c= " << c << '\n';
        --g_callsWithPendingResponses;
    }
    
    // Session start notification: registers this session in g_sessions.
    void OnSessionStart()
    {
        std::lock_guard<std::mutex> lock(g_cs);
        Tracer() << "Client " << this << " received OnSessionStart() notification, g_sessions.size = " << g_sessions.size() << "\n";
        g_sessions.push_back(this);
    }
    
    // Session end notification: removes this session from g_sessions.
    // The sc and windowsErrorCode arguments are not used by this test.
    void OnSessionEnd(ESocketCall sc, int windowsErrorCode)
    {
        std::lock_guard<std::mutex> lock(g_cs);
        Tracer() << "ClientSession2::OnSessionEnd() client = " << this << ", removing from g_sessions ["
                 << g_sessions << "]\n";
        ceda::xvector<ClientSession2*>::iterator it = std::find(g_sessions.begin(),g_sessions.end(),this);
        cxAssert(it != g_sessions.end());
        // Erasing end() is undefined behaviour, so guard the erase in case the
        // assertion above is compiled out and the session was never registered.
        if (it != g_sessions.end())
        {
            g_sessions.erase(it);
        }
        Tracer() << "ClientSession2::OnSessionEnd(), after erase, g_sessions.size() = " << g_sessions.size() << ceda::endl;
    }
};

///////////////////////////////////////////////////////////////////////////////////////////////////
// ServerSession

// Server session : receives IRequest2 calls from a client
//                  replies by calling SumResult() on its stub
$struct ServerSession2 /*isa IRequest2*/ :
    mixin 
    [
        ceda::NoSessionCreationStateMixin2
        ceda::RmiServerSessionMixin2<IResponse2,IRequest2>
    ]
{
    // Nothing to initialise.
    $$() {}

    // Implementation of IRequest2
    // Traces the operands, sends a+b back via SumResult() and flushes immediately
    // so that the reply is not left waiting in a buffer.
    void Sum(ceda::int32 a, ceda::int32 b)
    {
        Tracer() << "    Sum() called with a= " << a << "  b= " << b << '\n';

        const ceda::int32 total = a + b;
        GetStub()->SumResult(total);

        Flush();
    }

    // Traces the release, then defers to the base class implementation.
    virtual void OnRelease()
    {
        Tracer() << "Would call StopInproc()\n";

        BaseClass::OnRelease();
    }
};


///////////////////////////////////////////////////////////////////////////////////////////////////
// RmiTwoClients

void RmiTwoClients2()
{
    TraceGroup g("Rmi Two Clients 2 example");
    ceda::TraceIndenter indent;
    const int NUM_RUNS = 1000;

    ceda::Iocp* iocp = ceda::CreateIocp();
    for (int count = 0 ; count < NUM_RUNS ; ++count)
    {
        {
            std::lock_guard<std::mutex> lock(g_cs);
            cxAssert(g_sessions.size() == 0);
        }
        ceda::SOCKET_PORT port = 3000;
        
        ceda::SingleClientSessionManager<ClientSession2>* csmClient1 = new ceda::SingleClientSessionManager<ClientSession2>();
        ceda::SingleClientSessionManager<ClientSession2>* csmClient2 = new ceda::SingleClientSessionManager<ClientSession2>();
        
        // This is necessary because we use the CRTP to call back into the MServer2 from the session manager
        /*struct TheServer : public MServer2<
                                    DefaultSessionManager<
                                        FinalBaseMixin<TheServer>, ServerSession2 > >
        {
        };*/
        typedef ceda::DefaultSessionManager< ServerSession2 > SessMgr;
        
        ServerSession2::SessionCreationState scs;
        ceda::MServer2<SessMgr>* server = ceda::CreateMServer2<SessMgr>(scs, iocp, port);
        ceda::MClient2* client1 = ceda::CreateMClient2(iocp, csmClient1, "127.0.0.1", port);
        ceda::MClient2* client2 = ceda::CreateMClient2(iocp, csmClient2, "127.0.0.1", port);
        
        while(1)
        {
            {
                std::lock_guard<std::mutex> lock(g_cs);
                if (g_sessions.size() >= 2) { break; }
                else
                {
                    Tracer() << "##############  g_sessions.size() = " << g_sessions.size() << ceda::endl;
                }
            }
            Sleep(1);
        }
        
        // note 1 & 2 might be the reverse of what is expected.  If this mattered, a map could be used
        ceda::ClientSession2* cs1 = g_sessions[0];
        ceda::ClientSession2* cs2 = g_sessions[1];
        
        ceda::ptr<IRequest2> clientStub1 = cs1->GetStub();
        ceda::ptr<IRequest2> clientStub2 = cs2->GetStub();

        Tracer() << "Waited for connections\n";

        for (int i=0 ; i < 2 ; ++i)
        {
            clientStub1->Sum(1000 + count,i);
            ++g_callsWithPendingResponses;
            
            Tracer() << "Issue Sum(" << 1000+count << ',' << i << ")\n";
            
            clientStub2->Sum(2000 + count,i);
            ++g_callsWithPendingResponses;
            
            Tracer() << "Issue Sum(" << 2000+count << ',' << i << ")\n";
        }
        
        Tracer() << "Flush()\n";
        cs1->Flush();
        cs2->Flush();
        
        // Wait for responses to be received by clients.
        g_callsWithPendingResponses.Wait();
        
        Close(client1);
        Tracer() << "Client1 closed, closing client2 " << client2 << "\n";
        Close(client2);
        Tracer() << "Client2 closed\n";

        Close(server);
        Tracer() << "Server " << server << " closed\n";
    }
    Close(iocp);
    
    Tracer() << "Test completed\n";
}

}