AsyncIOEx.cpp

// AsyncIOEx.cpp
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2011

@import "Ceda/cxObject/Object.h"
@import "Ceda/cxObject/DGNode.h"
@import "Ceda/cxObject/WCSpace.h"
@import "Ceda/cxPersistStore/IPersistStore.h"
#include "Ceda/cxUtils/TracerUtils.h"
#include "Ceda/cxUtils/Environ.h"
#include "Ceda/cxUtils/TestTimer.h"

namespace AsyncIO
{
// Persistable node used by the async I/O example.
// Its model holds an integer Value plus a vector of references to child X
// objects. Run() below stores each child's index in its Value, which is the
// invariant asserted by AsyncLoadChildren().
$struct X isa ceda::IPersistable :
    model
    {
        int32 Value;
        xvector<cref<X> > Children;
    } 
{
    // Calls AsyncGet() on every child reference and returns how many of those
    // calls produced a non-null pointer.
    // NOTE(review): presumably AsyncGet() yields null while the child is not
    // yet resident and schedules its load in the background - confirm against
    // the cref<> documentation.
    ceda::ssize_t AsyncLoadChildren() const
    {
        const ceda::xvector<ceda::cref<X> >& kids = Children.read();
        ceda::ssize_t available = 0;
        for (ceda::ssize_t index = 0 ; index < kids.size() ; ++index)
        {
            X* child = kids[index].AsyncGet();
            if (!child)
            {
                // Nothing to count for this child on this pass.
                continue;
            }

            // Each child is expected to carry its own position as its Value.
            cxAssert(child->Value == index);
            ++available;
        }
        return available;
    }
    
};

void Run()
{
    ceda::TraceGroup g("Async I/O example");

    ceda::xstring storePath = ceda::GetCedaTestPath("AsyncIOEx.ced");
    ceda::ConstStringZ pSpaceName = "P";
    ceda::ConstStringZ rootName = "R";

    int N = 1000;
    
    // Create store, PSpace, and some objects in the PSpace
    {
        Tracer() << "Creating new store " << storePath << '\n';
        ceda::close_ptr<ceda::PersistStore> pstore(ceda::OpenPersistStore(storePath.c_str(), ceda::OM_CREATE_ALWAYS));

        Tracer() << "Creating PSpace " << pSpaceName << '\n';
        ceda::WPSpace pspace(ceda::OpenPSpace(pstore.get(), pSpaceName));

        Tracer() << "Creating root " << rootName << '\n';
        X* root = ceda::BootstrapPSpaceRoot<X>(rootName);
        cxAssert(root);
        
        {
            ceda::CSpaceTxn txn;
            for (int i = 0 ; i < N ; ++i)
            {
                X* x = $new X;
                x->Value = i;
                root->Children.push_back(x);
            }
        }
    }
    
    // Reopen store, load some objects with async I/O
    {
        Tracer() << "\nOpen store " << storePath << '\n';
        ceda::close_ptr<ceda::PersistStore> pstore(ceda::OpenPersistStore(storePath.c_str(), ceda::OM_OPEN_EXISTING));

        Tracer() << "Open PSpace " << pSpaceName << '\n';
        ceda::WPSpace pspace(ceda::OpenPSpace(pstore.get(), pSpaceName));

        ceda::SetMaxDgsTotalByteSize(ceda::DgsEvictionQueueIndex_SystemMemory, N);

        Tracer() << "Open root " << rootName << '\n';
        X* root = ceda::BootstrapPSpaceRoot<X>(rootName);
        cxAssert(root);
        {
            ceda::RateTimer timer("Async load", N);
            while(1)
            {
                Sleep(10);
                ceda::CSpaceTxn txn;

                ceda::HPTimer timer;
                ceda::ssize_t numLoaded = root->AsyncLoadChildren();
                Tracer() << "Number of children loaded = " << numLoaded << '\n';
                timer.Done() << "AsyncLoadChildren()";
                if (numLoaded == N) break;
            }
        }        
    }
}

} // namespace AsyncIO