// WithoutSockets.cpp

// TwoWorkingSets.cpp
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2008

@import "Ceda/cxOperation/SetOfPrefs.h"
@import "Ceda/cxOperation/UTRoot.h"
@import "Ceda/cxOperation/IWorkingSetMachine.h"
@import "Ceda/cxPersistStore/IPersistStore.h"
@import "Ceda/cxObject/IObjectVisitor.h"
@import "Ceda/cxObject/PrintReflectedVariable.h"
@import "Ceda/cxObject/DGNode.h"
#include "Ceda/cxUtils/CedaAssert.h"
#include "Ceda/cxUtils/Tracer.h"
#include "Ceda/cxUtils/Environ.h"
#include "Ceda/cxUtils/PagedBufferAsArchive.h"
#include "Ceda/cxUtils/TestTimer.h"

namespace x
{
    // Recursive variant whose alternatives are void, bool, int32, float64,
    // ceda::xstring, a vector of Value (tagged `array`) and a string-keyed map of
    // Value (tagged `object`).  Nested arrays and objects are stored by value,
    // directly inside the containing Value.
    // NOTE(review): the set of alternatives looks JSON-like - presumably this is
    // meant to model JSON values; confirm.
    $variant+ Value
    {
        void;
        bool;
        int32;
        float64;
        ceda::xstring;
        ceda::xvector<Value> array;                 // ordered sequence of nested values
        ceda::xmap<ceda::xstring,Value> object;     // string-keyed nested values
    };

    // Builds a sample Value tagged `object` describing "Bill", whose "children"
    // member is an array holding one nested object describing "Sarah".
    Value CreateBill()
    {
        typedef ceda::xmap<ceda::xstring,Value> Members;
        typedef ceda::xvector<Value> Elements;

        // The only child
        Members childMembers;
        childMembers["name"] = ceda::xstring("Sarah");
        childMembers["likes_icecream"] = true;
        childMembers["age"] = 6;
        childMembers["skill"] = 0.95;

        // The list of children, containing just that one child
        Elements children;
        children.push_back( Value(childMembers) );

        // The parent, which embeds the list of children by value
        Members parentMembers;
        parentMembers["name"] = ceda::xstring("Bill");
        parentMembers["age"] = 27;
        parentMembers["skill"] = 0.85;
        parentMembers["children"] = Value(children);

        return Value::object(parentMembers);
    }
}

namespace y
{
    // Forward declarations: Value below refers to Array and Object through
    // ceda::cref before either struct is defined.
    $struct Object;
    $struct Array;

    // Variant with the same scalar alternatives as x::Value (void, bool, int32,
    // float64, ceda::xstring).  Unlike x::Value, the array and object
    // alternatives are ceda::cref references to separate Array / Object
    // instances rather than nested containers stored by value.
    $variant+ Value
    {
        void;
        bool;
        int32;
        float64;
        ceda::xstring;
        ceda::cref<Array>;      // reference to an Array instance
        ceda::cref<Object>;     // reference to an Object instance
    };

    // Array: declared isa ceda::IPersistable; its model is a single vector of
    // Value named `elements`.
    $struct+ Array isa ceda::IPersistable : model { ceda::xvector<Value> elements; } {};
    // Object: declared isa ceda::IPersistable; its model is a single map from
    // ceda::xstring to Value named `members`.
    $struct+ Object isa ceda::IPersistable : model { ceda::xmap<ceda::xstring,Value> members; } {};

    // Builds the same sample data as x::CreateBill(), but out of separately
    // allocated Object / Array instances linked through ceda::cref.
    // Returns the Object describing "Bill"; its "children" member references an
    // Array whose single element references the Object describing "Sarah".
    Object* CreateBill()
    {
        // The only child
        Object* child = $new Object;
        child->members["name"] = ceda::xstring("Sarah");
        child->members["likes_icecream"] = true;
        child->members["age"] = 6;
        child->members["skill"] = 0.95;

        // The list of children, referencing just that one child
        Array* children = $new Array;
        children->elements.push_back( ceda::cref<Object>(child) );

        // The parent, which references the list of children
        Object* parent = $new Object;
        parent->members["name"] = ceda::xstring("Bill");
        parent->members["age"] = 27;
        parent->members["skill"] = 0.85;
        parent->members["children"] = ceda::cref<Array>(children);

        return parent;
    }

    // Returns the value stored under the "name" key of o's members, read as a
    // ceda::xstring.
    // NOTE(review): behaviour when the key is absent, or when the value is not
    // a string, depends on xmap / Value semantics not visible here - confirm.
    ceda::xstring GetName(const Object* o)
    {
        // auto&& keeps exactly the constness of whatever read() yields, and
        // keeps it alive for the lookup if it happens to be a temporary.
        auto&& members = o->members.read();
        return members["name"].as<ceda::xstring>();
    }
}


namespace WithoutSocketsEx
{

@if (false)
{
    ///////////////////////////////////////////////////////////////////////////////////////////////////
    /*
        A simple test that 

        1.  Creates a new store
        2.  Creates a PSpace
        3.  Creates two independent working sets named WS1 and WS2 in the same PSpace
        4.  On one working set
                -   a datasource is added as the 0th child under the UT root
                -   operations are performed
        5.  Send a delta
        6.  Print the datasource in the second working set
        7.  Close the two working sets, the PSpace and the PersistStore.
    */
    
    // Mixin that supplies Run() on top of a base which must provide:
    //   - a TestDS type (the datasource placed under the UT root),
    //   - GetTitle(), whose result is streamed to the trace as the test title,
    //   - GenerateOps(ws), which generates operations on the given working set.
    $mixin TestMixin
    {
        using typename BaseClass::TestDS;

        // Creates the store, a PSpace and two working sets, bootstraps a TestDS
        // on the first working set, lets the base generate operations,
        // synchronises first -> second, prints the second working set's TestDS
        // and then closes everything.
        void Run()
        {
            Tracer() << BaseClass::GetTitle() << " example\n";
            ceda::TraceIndenter traceIndent;

            const ceda::xstring rootKey("MyApp");
            ceda::xstring storePath = ceda::GetCedaTestPath("TestInteract.ced");
            ceda::PersistStore* store = ceda::OpenPersistStore(storePath.c_str(), ceda::OM_CREATE_ALWAYS);
            {
                // The PSpace lives for the duration of this scope
                ceda::WPSpace space(ceda::OpenPSpace(store, "MyPSpace"));

                // The two working sets: operations originate on `source` and
                // are replayed on `target`
                ceda::WorkingSetMachine* source = nullptr;
                ceda::WorkingSetMachine* target = nullptr;
                {
                    ceda::CSpaceTxn txn;
                    source = ceda::OpenWorkingSetMachine("WS1",true);
                    target = ceda::OpenWorkingSetMachine("WS2",false);
                }

                // Insert a fresh TestDS at position 0 of the UT root entry
                {
                    ceda::CSpaceTxn txn;
                    GetUTRoot(source).Map[rootKey].insert(0,$new TestDS);
                }

                BaseClass::GenerateOps(source);

                // Bring the target up to date using deltas
                HalfDuplexSync(source, target);

                Tracer() << "Showing result on ws2\n";
                {
                    ceda::CSpaceTxn txn;
                    TestDS* received = ceda::GetUTRootEntry<TestDS>(target,rootKey);
                    ceda::PrintInstance(Tracer(), received);
                }

                // Close both working sets within a single transaction
                {
                    ceda::CSpaceTxn txn;
                    Close(source);
                    Close(target);
                }
            }
            Close(store);
        }
    };

    ///////////////////////////////////////////////////////////////////////////////////////////////////
    // Test1

    // Datasource for Test1: declared isa ceda::IPersistable with a model holding
    // a single int32 field, which Test1::GenerateOps assigns to repeatedly.
    // NOTE(review): the effect of the <<-os>> annotation is not visible in this
    // file - confirm against the xcpp documentation.
    $struct DS1 <<-os>> isa ceda::IPersistable :
        model
        {
            int32 x32;
        }
    {
    };
    
    // Test1: TestMixin layered over an anonymous base that uses DS1 as the
    // datasource and generates assignment operations on its x32 field.
    $struct Test1 : mixin
        [
            {
                typedef DS1 TestDS;

                // Title streamed to the trace by TestMixin::Run()
                const char* GetTitle()
                {
                    return "Test1";
                }

                // Assigns 0,1,...,9 to x32 of the DS1 found under "MyApp" on
                // ws, using a separate transaction for each assignment.
                void GenerateOps(ceda::WorkingSetMachine* ws)
                {
                    const int numAssignments = 10;
                    for (int value = 0 ; value < numAssignments ; ++value)
                    {
                        ceda::CSpaceTxn txn;
                        DS1* ds = ceda::GetUTRootEntry<DS1>(ws,"MyApp");
                        ds->x32 = value;
                    }
                }
            }
            TestMixin
        ]
    {
    };

    ///////////////////////////////////////////////////////////////////////////////////////////////////
    // Test2

    // Datasource for Test2: declared isa ceda::IPersistable.  Its model holds a
    // vector L of movable cref<DS2> (Test2::GenerateOps inserts child DS2
    // objects into it) and an int16 field s.
    // NOTE(review): the effect of the <<-os>> annotation is not visible in this
    // file - confirm against the xcpp documentation.
    $struct DS2 <<-os>> isa ceda::IPersistable :
        model
        {
            xvector<movable<cref<DS2> > > L;
            int16 s;
        }
    {
    };
    
    // Test2: TestMixin layered over an anonymous base that uses DS2 as the
    // datasource and generates insertions into its vector L.
    $struct Test2 : mixin
        [
            {
                typedef DS2 TestDS;

                // Title streamed to the trace by TestMixin::Run()
                const char* GetTitle()
                {
                    return "Test2";
                }

                // In each of three transactions: creates a DS2 whose s is the
                // transaction number (0,1,2) and inserts it at position 0 of L
                // in the DS2 found under "MyApp" on ws.
                void GenerateOps(ceda::WorkingSetMachine* ws)
                {
                    const int numInsertions = 3;
                    for (int n = 0 ; n < numInsertions ; ++n)
                    {
                        ceda::CSpaceTxn txn;
                        DS2* parent = ceda::GetUTRootEntry<DS2>(ws,"MyApp");

                        DS2* child = $new DS2;
                        child->s = n;
                        parent->L.insert(0,child);
                    }
                }
            }
            TestMixin
        ]
    {
    };

    ///////////////////////////////////////////////////////////////////////////////////////////////////
    // Test3

    // Datasource for Test3: declared isa ceda::IPersistable.  Its model holds a
    // set S of movable cref<DS3> (Test3::GenerateOps inserts child DS3 objects
    // into it) and a float64 field v.
    // NOTE(review): the effect of the <<-os>> annotation is not visible in this
    // file - confirm against the xcpp documentation.
    $struct DS3 <<-os>> isa ceda::IPersistable :
        model
        {
            xset<movable<cref<DS3> > > S;
            float64 v;
        }
    {
    };
    
    // Test3: TestMixin layered over an anonymous base that uses DS3 as the
    // datasource and generates insertions into its set S.
    $struct Test3 : mixin
        [
            {
                typedef DS3 TestDS;

                // Title streamed to the trace by TestMixin::Run()
                const char* GetTitle()
                {
                    return "Test3";
                }

                // In each of four transactions: creates a DS3 whose v is ten
                // times the transaction number (0, 10, 20, 30) and inserts it
                // into S of the DS3 found under "MyApp" on ws.
                void GenerateOps(ceda::WorkingSetMachine* ws)
                {
                    const int numInsertions = 4;
                    for (int n = 0 ; n < numInsertions ; ++n)
                    {
                        ceda::CSpaceTxn txn;
                        DS3* parent = ceda::GetUTRootEntry<DS3>(ws,"MyApp");

                        DS3* child = $new DS3;
                        child->v = n*10.0;
                        parent->S.Insert(child);
                    }
                }
            }
            TestMixin
        ]
    {
    };

    ///////////////////////////////////////////////////////////////////////////////////////////////////
    // Run the tests
    
    void Run()
    {
        { Test1 t; t.Run(); }
        { Test2 t; t.Run(); }
        { Test3 t; t.Run(); }
    }

} // @if (false)
    
//@if (false)
//{
    // Forward declarations of the two tests defined further down in this namespace.
    void RunVectorPushBackTest();
    void RunXmapInsertAndMutateTest();

    // Entry point of this example: runs the vector push_back test only.
    // The xmap insert/mutate test is currently commented out.
    void Run()
    {
        RunVectorPushBackTest();
        //RunXmapInsertAndMutateTest();
    }

    // Number of elements in each of TestObject's arrays; also the bound of the
    // inner loop in RunVectorPushBackTest().
    @def ArraySize = 10
   
    // Datasource for RunVectorPushBackTest(): declared isa ceda::IPersistable.
    // Its model holds two arrays of ArraySize elements each: m1 of int64 and m2
    // of xvector<int64>.  The test currently only pushes onto the m2 vectors;
    // the assignment to m1 is commented out there.
    // NOTE(review): the effect of the <<-os>> annotation is not visible in this
    // file - confirm against the xcpp documentation.
    $struct TestObject <<-os>> isa ceda::IPersistable :
        model
        {
            int64 m1[ArraySize];
            xvector<int64> m2[ArraySize];
        }
    {
    };

    void RunVectorPushBackTest()
    {
        Tracer() << "TwoWorkingSets without sockets example\n";
        ceda::TraceIndenter indent;

        const ceda::xstring UtEntryName("MyApp");
        ceda::xstring path = ceda::GetCedaTestPath("TestInteract.ced");
        ceda::PersistStore* pstore = ceda::OpenPersistStore(path.c_str(), ceda::OM_CREATE_ALWAYS);
        {
            // Open or create a PSpace
            ceda::WPSpace pspace(ceda::OpenPSpace(pstore, "MyPSpace"));
            auto cspace = ceda::GetCSpace(pspace);
            ceda::SetMaxDgsTotalByteSize(cspace, ceda::DgsEvictionQueueIndex_SystemMemory, 1024*1024*1024);
            ceda::SetMilliSecsPerGc(cspace, 50000);
            ceda::SetEvictionThreshold(1024*1024*1024);
            auto dosPeriod = ceda::GetDosPeriod(pstore);
            ceda::SetDosPeriod(pstore, dosPeriod * 100);

            // Open or create two working sets in the PSpace
            ceda::WorkingSetMachine* ws1 = nullptr;
            ceda::WorkingSetMachine* ws2 = nullptr;
            {
                ceda::CSpaceTxn txn;
                ws1 = ceda::OpenWorkingSetMachine("WS1",true);
                ws2 = ceda::OpenWorkingSetMachine("WS2",false);
            }
            
            // Bootstrap TestObject under the UT root
            {
                ceda::CSpaceTxn txn;
                GetUTRoot(ws1).Map[UtEntryName].insert(0,$new TestObject);
            }

            ceda::EmptyMoreToSendHandler m;
            ceda::DeltaRW* drwSrc;
            ceda::DeltaRW* drwDst;
            {
                ceda::CSpaceTxn txn;
                drwSrc = CreateDeltaRW(ws1, &m);
                drwDst = CreateDeltaRW(ws2, &m);
            }

            ceda::ExchangeVectorTimes(ws1, ws2, drwSrc, drwDst);
  
            // Q9400 machine, x64 release.
            //     assignment:           150 kHz
            //     push_back on vector : 100 kHz
            {
                int N = 10;
                int NUM_REPEAT = 100;
                ceda::RateTimer timer("Generate operation calculate delta and apply delta", (double) N*ArraySize*NUM_REPEAT);
                for (int i=0 ; i < N ; ++i)
                {
                    ceda::HPTimer hpt;
                    // Generate some operations
                    {
                        ceda::CSpaceTxn txn;
                        TestObject* t = ceda::GetUTRootEntry<TestObject>(ws1,UtEntryName);
                        for (int k=0 ; k < NUM_REPEAT ; ++k)
                        {
                            for (int j=0 ; j < ArraySize ; ++j)
                            {
                                //t->m1[j] = i+j;
                                t->m2[j].push_back(i+j);
                            }
                        }
                    }
                
                    // Synchronise using deltas
                    ceda::SendAndReceiveDelta(drwSrc, drwDst);
                    Tracer() << i << ", " << hpt.GetElapsedTimeInSeconds() << "\n";
                }
                //HalfDuplexSync(ws1, ws2);
            }
            /*
            Tracer() << "Showing result on ws2\n";
            {
                ceda::CSpaceTxn txn;
                TestObject* t = ceda::GetUTRootEntry<TestObject>(ws2,UtEntryName);
                ceda::PrintInstance(Tracer(), t);
            }
            */

            {
                ceda::CSpaceTxn txn;
                ceda::Close(drwSrc);
                ceda::Close(drwDst);
                Close(ws1);
                Close(ws2);
            }
        }
        Close(pstore);
    }

    // Value type stored (by cref) in TestMapHolder's map: declared isa
    // ceda::IPersistable with a model holding an int64 m1 and a vector of
    // ceda::xstring m2.  RunXmapInsertAndMutateTest() currently only assigns to
    // m1; the insertion into m2 is commented out there.
    // NOTE(review): the effect of the <<-os>> annotation is not visible in this
    // file - confirm against the xcpp documentation.
    $struct TestMapValue <<-os>> isa ceda::IPersistable :
        model
        {
            int64 m1;
            xvector<ceda::xstring> m2;
        }
    {
    };

    // Datasource for RunXmapInsertAndMutateTest(): declared isa
    // ceda::IPersistable with a model holding one map from ceda::xstring keys to
    // cref<TestMapValue>.
    // NOTE(review): unlike the other datasource structs in this file, this one
    // carries no <<-os>> annotation - confirm whether that is intentional.
    $struct TestMapHolder isa ceda::IPersistable :
        model
        {
            xmap<ceda::xstring, cref<TestMapValue> > map;
        }
    {
    };

    void RunXmapInsertAndMutateTest()
    {
        Tracer() << "TwoWorkingSets without sockets xmap insert and mutate test\n";
        ceda::TraceIndenter indent;

        const ceda::xstring UtEntryName("MyApp");
        ceda::xstring path = ceda::GetCedaTestPath("TestInteract.ced");
        ceda::PersistStore* pstore = ceda::OpenPersistStore(path.c_str(), ceda::OM_CREATE_ALWAYS);
        {
            // Open or create a PSpace
            ceda::WPSpace pspace(ceda::OpenPSpace(pstore, "MyPSpace"));
            auto cspace = ceda::GetCSpace(pspace);
            ceda::SetMaxDgsTotalByteSize(cspace, ceda::DgsEvictionQueueIndex_SystemMemory, 1024*1024*1024);
            ceda::SetMilliSecsPerGc(cspace, 50000);
            ceda::SetEvictionThreshold(1024*1024*1024);
            auto dosPeriod = ceda::GetDosPeriod(pstore);
            ceda::SetDosPeriod(pstore, dosPeriod * 100);

            // Open or create two working sets in the PSpace
            ceda::WorkingSetMachine* ws1 = nullptr;
            ceda::WorkingSetMachine* ws2 = nullptr;
            {
                ceda::CSpaceTxn txn;
                ws1 = ceda::OpenWorkingSetMachine("WS1",true);
                ws2 = ceda::OpenWorkingSetMachine("WS2",false);
            }
            
            // Bootstrap TestObject under the UT root
            {
                ceda::CSpaceTxn txn;
                GetUTRoot(ws1).Map[UtEntryName].insert(0,$new TestMapHolder);
            }

            // Q9400 machine, x64 release.
            //     insert:           25 kHz
            //     mutate:           100 kHz
            int NUM_TXNS = 10000;
            int NUM_OPS_PER_TXN = 100;
            ceda::xvector<ceda::xstring> keys(NUM_TXNS * NUM_OPS_PER_TXN);
            for (ceda::ssize_t pos = 0 ; pos < NUM_TXNS * NUM_OPS_PER_TXN ; ++pos)
            {
                keys[pos] = ceda::GuidToString(ceda::CreateGuid());
            }

                
            ceda::EmptyMoreToSendHandler m;
            ceda::DeltaRW* drwSrc;
            ceda::DeltaRW* drwDst;
            {
                ceda::CSpaceTxn txn;
                drwSrc = CreateDeltaRW(ws1, &m);
                drwDst = CreateDeltaRW(ws2, &m);
            }
  
            ceda::ExchangeVectorTimes(ws1,ws2,drwSrc,drwDst);

            {
                ceda::RateTimer timer("Insert into xmap and apply deltas", (double) NUM_TXNS*NUM_OPS_PER_TXN);

                for (int i=0 ; i < NUM_TXNS ; ++i)
                {
                    // Generate some map insertion operations
                    /*hpt.BeginSkip();
                    timer.BeginSkip();
                    for (int k=0 ; k < NUM_OPS_PER_TXN ; ++k)
                    {
                        keys[i*NUM_OPS_PER_TXN + k] = ceda::GuidToString(ceda::CreateGuid());
                    }
                    timer.EndSkip();
                    hpt.EndSkip();*/
                    ceda::HPTimer hpt;

                    {
                        ceda::CSpaceTxn _;
                        auto t = ceda::GetUTRootEntry<TestMapHolder>(ws1,UtEntryName);
                        for (int k=0 ; k < NUM_OPS_PER_TXN ; ++k)
                        {
                            t->map[keys[i*NUM_OPS_PER_TXN + k]] = $new TestMapValue;
                        }
                    }
                
                    // Synchronise using deltas
                    ceda::SendAndReceiveDelta(drwSrc, drwDst);
                    Tracer() << i << ", " << hpt.GetElapsedTimeInSeconds() << "\n";
                }
            }

            {
                ceda::RateTimer timer("Generate mutative ops", (double) NUM_TXNS*NUM_OPS_PER_TXN);

                for (int i=0 ; i < NUM_TXNS ; ++i)
                {
                    // Generate some map insertion operations
                    ceda::HPTimer hpt;
                    {
                        ceda::CSpaceTxn _;
                        auto t = ceda::GetUTRootEntry<TestMapHolder>(ws1,UtEntryName);
                        for (int k=0 ; k < NUM_OPS_PER_TXN ; ++k)
                        {
                            t->map[keys[i*NUM_OPS_PER_TXN + k]]->m1 = k % 3;
                            //ceda::xstring val("foobar");
                            //t->map[keys[k]]->m2.insert(0,&val,1);
                        }
                    }
                
                    // Synchronise using deltas
                    ceda::SendAndReceiveDelta(drwSrc, drwDst);
                    Tracer() << i << ", " << hpt.GetElapsedTimeInSeconds() << "\n";
                }
            }

            {
                ceda::CSpaceTxn _;
                ceda::Close(drwSrc);
                ceda::Close(drwDst);
                Close(ws1);
                Close(ws2);
            }
        }
        Close(pstore);
    }
    
    //} // @if(false)
}