WithoutSockets.cpp
// TwoWorkingSets.cpp
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2008
@import "Ceda/cxOperation/SetOfPrefs.h"
@import "Ceda/cxOperation/UTRoot.h"
@import "Ceda/cxOperation/IWorkingSetMachine.h"
@import "Ceda/cxPersistStore/IPersistStore.h"
@import "Ceda/cxObject/IObjectVisitor.h"
@import "Ceda/cxObject/PrintReflectedVariable.h"
@import "Ceda/cxObject/DGNode.h"
#include "Ceda/cxUtils/CedaAssert.h"
#include "Ceda/cxUtils/Tracer.h"
#include "Ceda/cxUtils/Environ.h"
#include "Ceda/cxUtils/PagedBufferAsArchive.h"
#include "Ceda/cxUtils/TestTimer.h"
namespace x
{
// Recursive variant shaped like a JSON value. Its alternatives are void,
// bool, int32, float64 and ceda::xstring, plus two tagged alternatives:
// `array` (an xvector of Value) and `object` (an xmap from xstring to Value).
// Nested values are stored directly inside those containers.
$variant+ Value
{
void;
bool;
int32;
float64;
ceda::xstring;
ceda::xvector<Value> array;
ceda::xmap<ceda::xstring,Value> object;
};
// Builds a sample value tree: an object describing "Bill" whose "children"
// member is an array containing a single object describing "Sarah".
Value CreateBill()
{
    // The one child record.
    ceda::xmap<ceda::xstring,Value> child;
    child["name"] = ceda::xstring("Sarah");
    child["likes_icecream"] = true;
    child["age"] = 6;
    child["skill"] = 0.95;

    // The array that becomes the parent's "children" member.
    ceda::xvector<Value> offspring;
    offspring.push_back( Value(child) );

    // The parent record, which embeds the array built above.
    ceda::xmap<ceda::xstring,Value> parent;
    parent["name"] = ceda::xstring("Bill");
    parent["age"] = 27;
    parent["skill"] = 0.85;
    parent["children"] = Value(offspring);

    return Value::object(parent);
}
}
namespace y
{
// Forward declarations: Value below refers to Object and Array through
// ceda::cref<> before either struct is defined.
$struct Object;
$struct Array;
// Variant with the alternatives void, bool, int32, float64 and ceda::xstring,
// plus references (ceda::cref<>) to separately declared Array and Object
// structs. In contrast to x::Value, the nested containers are not stored
// inside the variant itself.
$variant+ Value
{
void;
bool;
int32;
float64;
ceda::xstring;
ceda::cref<Array>;
ceda::cref<Object>;
};
// Array: a ceda::IPersistable struct whose model is a single xvector of Value
// named `elements`.
$struct+ Array isa ceda::IPersistable : model { ceda::xvector<Value> elements; } {};
// Object: a ceda::IPersistable struct whose model is a single xmap from
// xstring to Value named `members`.
$struct+ Object isa ceda::IPersistable : model { ceda::xmap<ceda::xstring,Value> members; } {};
// Builds the same sample data as x::CreateBill(), but out of $new-allocated
// Object/Array instances linked through ceda::cref<>. Returns the Object
// describing "Bill"; its "children" member references an Array holding the
// Object describing "Sarah".
Object* CreateBill()
{
    // The one child record.
    Object* child = $new Object;
    child->members["name"] = ceda::xstring("Sarah");
    child->members["likes_icecream"] = true;
    child->members["age"] = 6;
    child->members["skill"] = 0.95;

    // The array that becomes the parent's "children" member.
    Array* offspring = $new Array;
    offspring->elements.push_back( ceda::cref<Object>(child) );

    // The parent record, which references the array built above.
    Object* parent = $new Object;
    parent->members["name"] = ceda::xstring("Bill");
    parent->members["age"] = 27;
    parent->members["skill"] = 0.85;
    parent->members["children"] = ceda::cref<Array>(offspring);

    return parent;
}
// Returns the value stored under the "name" key of o's members, read as a
// ceda::xstring.
// NOTE(review): assumes o is non-null and that "name" is present and holds an
// xstring; what read(), operator[] and as<>() do otherwise is not visible
// here -- confirm before calling on arbitrary objects.
ceda::xstring GetName(const Object* o)
{
return o->members.read()["name"].as<ceda::xstring>();
}
}
namespace WithoutSocketsEx
{
@if (false)
{
///////////////////////////////////////////////////////////////////////////////////////////////////
/*
A simple test that
1. Creates a new store
2. Creates a PSpace
3. Creates two independent working sets named WS1 and WS2 in the same PSpace
4. On one working set
- a datasource is added as the 0th child under the UT root
- operations are performed
5. Send a delta
6. Print the datasource in the second working set
7. Close the two working sets, the PSpace and the PersistStore.
*/
// Mixin that contributes Run() to a test struct. The struct it is mixed into
// (seen here as BaseClass) must supply:
//   - a TestDS type, which is instantiated with $new and placed under the UT root,
//   - GetTitle(), whose result is traced as the heading of the test,
//   - GenerateOps(ws), which is called with the first working set.
$mixin TestMixin
{
using typename BaseClass::TestDS;
// Opens the store, a PSpace and two working sets, has the base class generate
// operations on ws1, synchronises ws1 to ws2 with HalfDuplexSync() and prints
// the TestDS instance found in ws2, then closes everything again.
void Run()
{
Tracer() << BaseClass::GetTitle() << " example\n";
ceda::TraceIndenter indent;
// Key under which the TestDS instance is registered in the UT root map.
const ceda::xstring UtEntryName("MyApp");
ceda::xstring path = ceda::GetCedaTestPath("TestInteract.ced");
ceda::PersistStore* pstore = ceda::OpenPersistStore(path.c_str(), ceda::OM_CREATE_ALWAYS);
{
// Open or create a PSpace
ceda::WPSpace pspace(ceda::OpenPSpace(pstore, "MyPSpace"));
// Open or create two working sets in the PSpace
ceda::WorkingSetMachine* ws1 = nullptr;
ceda::WorkingSetMachine* ws2 = nullptr;
{
ceda::CSpaceTxn txn;
// NOTE(review): the second argument differs between WS1 (true) and WS2
// (false); its meaning is not visible in this file -- confirm.
ws1 = ceda::OpenWorkingSetMachine("WS1",true);
ws2 = ceda::OpenWorkingSetMachine("WS2",false);
}
{
// Bootstrap TestDS under the UT root
{
ceda::CSpaceTxn txn;
GetUTRoot(ws1).Map[UtEntryName].insert(0,$new TestDS);
}
// Let the concrete test produce its operations on the first working set.
BaseClass::GenerateOps(ws1);
// Synchronise using deltas
HalfDuplexSync(ws1, ws2);
Tracer() << "Showing result on ws2\n";
{
ceda::CSpaceTxn txn;
TestDS* t = ceda::GetUTRootEntry<TestDS>(ws2,UtEntryName);
ceda::PrintInstance(Tracer(), t);
}
}
// Close both working sets inside a transaction, before pspace leaves scope.
{
ceda::CSpaceTxn txn;
Close(ws1);
Close(ws2);
}
}
// The store is closed last, after the block that owns pspace has ended.
Close(pstore);
}
};
///////////////////////////////////////////////////////////////////////////////////////////////////
// Test1
// Data source used by Test1: a ceda::IPersistable struct whose model holds a
// single int32 member, x32.
// NOTE(review): the effect of the <<-os>> annotation is not visible in this
// file -- confirm against the Xc++ documentation.
$struct DS1 <<-os>> isa ceda::IPersistable :
model
{
int32 x32;
}
{
};
// Test1: drives assignment operations on the x32 member of DS1. Run() is
// contributed by TestMixin.
$struct Test1 : mixin
[
    {
        typedef DS1 TestDS;

        const char* GetTitle() { return "Test1"; }

        // Assigns the values 0..9 to x32 of the DS1 registered under "MyApp",
        // each assignment inside its own CSpaceTxn scope.
        void GenerateOps(ceda::WorkingSetMachine* ws)
        {
            for (int value = 0; value < 10; ++value)
            {
                ceda::CSpaceTxn scopedTxn;
                DS1* root = ceda::GetUTRootEntry<DS1>(ws,"MyApp");
                root->x32 = value;      // Generate assignment operation
            }
        }
    }
    TestMixin
]
{
};
///////////////////////////////////////////////////////////////////////////////////////////////////
// Test2
// Data source used by Test2: a ceda::IPersistable struct whose model holds L,
// an xvector of movable references to other DS2 instances, and an int16
// member s.
// NOTE(review): the effect of the <<-os>> annotation is not visible in this
// file -- confirm against the Xc++ documentation.
$struct DS2 <<-os>> isa ceda::IPersistable :
model
{
xvector<movable<cref<DS2> > > L;
int16 s;
}
{
};
// Test2: drives insertions into the vector member L of DS2. Run() is
// contributed by TestMixin.
$struct Test2 : mixin
[
    {
        typedef DS2 TestDS;

        const char* GetTitle() { return "Test2"; }

        // Three times over: creates a DS2 with s set to the loop counter and
        // inserts it at index 0 of L in the DS2 registered under "MyApp".
        // Each insertion happens inside its own CSpaceTxn scope.
        void GenerateOps(ceda::WorkingSetMachine* ws)
        {
            for (int n = 0; n < 3; ++n)
            {
                ceda::CSpaceTxn scopedTxn;
                DS2* root = ceda::GetUTRootEntry<DS2>(ws,"MyApp");
                DS2* child = $new DS2;
                child->s = n;
                root->L.insert(0,child);
            }
        }
    }
    TestMixin
]
{
};
///////////////////////////////////////////////////////////////////////////////////////////////////
// Test3
// Data source used by Test3: a ceda::IPersistable struct whose model holds S,
// an xset of movable references to other DS3 instances, and a float64
// member v.
// NOTE(review): the effect of the <<-os>> annotation is not visible in this
// file -- confirm against the Xc++ documentation.
$struct DS3 <<-os>> isa ceda::IPersistable :
model
{
xset<movable<cref<DS3> > > S;
float64 v;
}
{
};
// Test3: drives insertions into the set member S of DS3. Run() is
// contributed by TestMixin.
$struct Test3 : mixin
[
    {
        typedef DS3 TestDS;

        const char* GetTitle() { return "Test3"; }

        // Four times over: creates a DS3 with v set to ten times the loop
        // counter and inserts it into S of the DS3 registered under "MyApp".
        // Each insertion happens inside its own CSpaceTxn scope.
        void GenerateOps(ceda::WorkingSetMachine* ws)
        {
            for (int n = 0; n < 4; ++n)
            {
                ceda::CSpaceTxn scopedTxn;
                DS3* root = ceda::GetUTRootEntry<DS3>(ws,"MyApp");
                DS3* child = $new DS3;
                child->v = n*10.0;
                root->S.Insert(child);
            }
        }
    }
    TestMixin
]
{
};
///////////////////////////////////////////////////////////////////////////////////////////////////
// Run the tests
void Run()
{
{ Test1 t; t.Run(); }
{ Test2 t; t.Run(); }
{ Test3 t; t.Run(); }
}
} // @if (false)
//@if (false)
//{
// Forward declarations of the two tests defined further down in this namespace.
void RunVectorPushBackTest();
void RunXmapInsertAndMutateTest();
// Entry point of this namespace. Currently only the vector push_back test is
// run; the call to the xmap insert-and-mutate test is commented out.
void Run()
{
RunVectorPushBackTest();
//RunXmapInsertAndMutateTest();
}
// Number of elements in each of TestObject's two array members. It is also
// used as the inner loop bound and in the rate calculation of
// RunVectorPushBackTest().
@def ArraySize = 10
// Data source used by RunVectorPushBackTest(): a ceda::IPersistable struct
// whose model holds m1, an array of ArraySize int64 values, and m2, an array
// of ArraySize xvector<int64> values.
// NOTE(review): the effect of the <<-os>> annotation is not visible in this
// file -- confirm against the Xc++ documentation.
$struct TestObject <<-os>> isa ceda::IPersistable :
model
{
int64 m1[ArraySize];
xvector<int64> m2[ArraySize];
}
{
};
// Timing test. It repeatedly appends to the xvector members (m2) of a
// TestObject held in working set WS1 and, after every transaction, transfers
// the changes to WS2 with SendAndReceiveDelta(). The elapsed time of each
// iteration is traced and ceda::RateTimer reports an overall rate for
// N * ArraySize * NUM_REPEAT operations.
//
// The store file "TestInteract.ced" is opened with OM_CREATE_ALWAYS. The
// working sets, delta reader/writers and the store are all closed before
// returning.
void RunVectorPushBackTest()
{
    Tracer() << "TwoWorkingSets without sockets example\n";
    ceda::TraceIndenter indent;

    // Key under which the TestObject is registered in the UT root map.
    const ceda::xstring UtEntryName("MyApp");

    ceda::xstring path = ceda::GetCedaTestPath("TestInteract.ced");
    ceda::PersistStore* pstore = ceda::OpenPersistStore(path.c_str(), ceda::OM_CREATE_ALWAYS);
    {
        // Open or create a PSpace
        ceda::WPSpace pspace(ceda::OpenPSpace(pstore, "MyPSpace"));

        // NOTE(review): these settings raise the size limits and lengthen the
        // GC / DOS periods, presumably so that background work does not
        // disturb the timings -- confirm against the ceda documentation.
        auto cspace = ceda::GetCSpace(pspace);
        ceda::SetMaxDgsTotalByteSize(cspace, ceda::DgsEvictionQueueIndex_SystemMemory, 1024*1024*1024);
        ceda::SetMilliSecsPerGc(cspace, 50000);
        ceda::SetEvictionThreshold(1024*1024*1024);
        auto dosPeriod = ceda::GetDosPeriod(pstore);
        ceda::SetDosPeriod(pstore, dosPeriod * 100);

        // Open or create two working sets in the PSpace
        ceda::WorkingSetMachine* ws1 = nullptr;
        ceda::WorkingSetMachine* ws2 = nullptr;
        {
            ceda::CSpaceTxn txn;
            ws1 = ceda::OpenWorkingSetMachine("WS1",true);
            ws2 = ceda::OpenWorkingSetMachine("WS2",false);
        }

        // Bootstrap TestObject under the UT root
        {
            ceda::CSpaceTxn txn;
            GetUTRoot(ws1).Map[UtEntryName].insert(0,$new TestObject);
        }

        // One delta reader/writer per working set. Both start out as nullptr,
        // like ws1/ws2 above, so they never hold an indeterminate value.
        ceda::EmptyMoreToSendHandler m;
        ceda::DeltaRW* drwSrc = nullptr;
        ceda::DeltaRW* drwDst = nullptr;
        {
            ceda::CSpaceTxn txn;
            drwSrc = CreateDeltaRW(ws1, &m);
            drwDst = CreateDeltaRW(ws2, &m);
        }

        ceda::ExchangeVectorTimes(ws1, ws2, drwSrc, drwDst);

        // Q9400 machine, x64 release.
        //      assignment:  150 kHz
        //      push_back on vector : 100 kHz
        {
            const int N = 10;               // number of transactions / deltas
            const int NUM_REPEAT = 100;     // passes over the m2 array per transaction
            ceda::RateTimer timer("Generate operation calculate delta and apply delta", static_cast<double>(N)*ArraySize*NUM_REPEAT);

            for (int i=0 ; i < N ; ++i)
            {
                ceda::HPTimer hpt;

                // Generate some operations
                {
                    ceda::CSpaceTxn txn;
                    TestObject* t = ceda::GetUTRootEntry<TestObject>(ws1,UtEntryName);
                    for (int k=0 ; k < NUM_REPEAT ; ++k)
                    {
                        for (int j=0 ; j < ArraySize ; ++j)
                        {
                            //t->m1[j] = i+j;
                            t->m2[j].push_back(i+j);
                        }
                    }
                }

                // Synchronise using deltas
                ceda::SendAndReceiveDelta(drwSrc, drwDst);

                Tracer() << i << ", " << hpt.GetElapsedTimeInSeconds() << "\n";
            }
            //HalfDuplexSync(ws1, ws2);
        }

        /*
        Tracer() << "Showing result on ws2\n";
        {
        ceda::CSpaceTxn txn;
        TestObject* t = ceda::GetUTRootEntry<TestObject>(ws2,UtEntryName);
        ceda::PrintInstance(Tracer(), t);
        }
        */

        // Close the delta reader/writers and the working sets inside a
        // transaction, before pspace leaves scope.
        {
            ceda::CSpaceTxn txn;
            ceda::Close(drwSrc);
            ceda::Close(drwDst);
            Close(ws1);
            Close(ws2);
        }
    }
    // The store is closed last, after the block that owns pspace has ended.
    Close(pstore);
}
// Value type stored (by reference) in TestMapHolder's map: a
// ceda::IPersistable struct whose model holds an int64 member m1 and an
// xvector of xstring, m2.
// NOTE(review): the effect of the <<-os>> annotation is not visible in this
// file -- confirm against the Xc++ documentation.
$struct TestMapValue <<-os>> isa ceda::IPersistable :
model
{
int64 m1;
xvector<ceda::xstring> m2;
}
{
};
// Data source used by RunXmapInsertAndMutateTest(): a ceda::IPersistable
// struct whose model holds a single xmap from xstring keys to references
// (cref) to TestMapValue instances.
// NOTE(review): unlike the other data-source structs in this file, this one
// carries no <<-os>> annotation -- confirm that this is intentional.
$struct TestMapHolder isa ceda::IPersistable :
model
{
xmap<ceda::xstring, cref<TestMapValue> > map;
}
{
};
void RunXmapInsertAndMutateTest()
{
Tracer() << "TwoWorkingSets without sockets xmap insert and mutate test\n";
ceda::TraceIndenter indent;
const ceda::xstring UtEntryName("MyApp");
ceda::xstring path = ceda::GetCedaTestPath("TestInteract.ced");
ceda::PersistStore* pstore = ceda::OpenPersistStore(path.c_str(), ceda::OM_CREATE_ALWAYS);
{
// Open or create a PSpace
ceda::WPSpace pspace(ceda::OpenPSpace(pstore, "MyPSpace"));
auto cspace = ceda::GetCSpace(pspace);
ceda::SetMaxDgsTotalByteSize(cspace, ceda::DgsEvictionQueueIndex_SystemMemory, 1024*1024*1024);
ceda::SetMilliSecsPerGc(cspace, 50000);
ceda::SetEvictionThreshold(1024*1024*1024);
auto dosPeriod = ceda::GetDosPeriod(pstore);
ceda::SetDosPeriod(pstore, dosPeriod * 100);
// Open or create two working sets in the PSpace
ceda::WorkingSetMachine* ws1 = nullptr;
ceda::WorkingSetMachine* ws2 = nullptr;
{
ceda::CSpaceTxn txn;
ws1 = ceda::OpenWorkingSetMachine("WS1",true);
ws2 = ceda::OpenWorkingSetMachine("WS2",false);
}
// Bootstrap TestObject under the UT root
{
ceda::CSpaceTxn txn;
GetUTRoot(ws1).Map[UtEntryName].insert(0,$new TestMapHolder);
}
// Q9400 machine, x64 release.
// insert: 25 kHz
// mutate: 100 kHz
int NUM_TXNS = 10000;
int NUM_OPS_PER_TXN = 100;
ceda::xvector<ceda::xstring> keys(NUM_TXNS * NUM_OPS_PER_TXN);
for (ceda::ssize_t pos = 0 ; pos < NUM_TXNS * NUM_OPS_PER_TXN ; ++pos)
{
keys[pos] = ceda::GuidToString(ceda::CreateGuid());
}
ceda::EmptyMoreToSendHandler m;
ceda::DeltaRW* drwSrc;
ceda::DeltaRW* drwDst;
{
ceda::CSpaceTxn txn;
drwSrc = CreateDeltaRW(ws1, &m);
drwDst = CreateDeltaRW(ws2, &m);
}
ceda::ExchangeVectorTimes(ws1,ws2,drwSrc,drwDst);
{
ceda::RateTimer timer("Insert into xmap and apply deltas", (double) NUM_TXNS*NUM_OPS_PER_TXN);
for (int i=0 ; i < NUM_TXNS ; ++i)
{
// Generate some map insertion operations
/*hpt.BeginSkip();
timer.BeginSkip();
for (int k=0 ; k < NUM_OPS_PER_TXN ; ++k)
{
keys[i*NUM_OPS_PER_TXN + k] = ceda::GuidToString(ceda::CreateGuid());
}
timer.EndSkip();
hpt.EndSkip();*/
ceda::HPTimer hpt;
{
ceda::CSpaceTxn _;
auto t = ceda::GetUTRootEntry<TestMapHolder>(ws1,UtEntryName);
for (int k=0 ; k < NUM_OPS_PER_TXN ; ++k)
{
t->map[keys[i*NUM_OPS_PER_TXN + k]] = $new TestMapValue;
}
}
// Synchronise using deltas
ceda::SendAndReceiveDelta(drwSrc, drwDst);
Tracer() << i << ", " << hpt.GetElapsedTimeInSeconds() << "\n";
}
}
{
ceda::RateTimer timer("Generate mutative ops", (double) NUM_TXNS*NUM_OPS_PER_TXN);
for (int i=0 ; i < NUM_TXNS ; ++i)
{
// Generate some map insertion operations
ceda::HPTimer hpt;
{
ceda::CSpaceTxn _;
auto t = ceda::GetUTRootEntry<TestMapHolder>(ws1,UtEntryName);
for (int k=0 ; k < NUM_OPS_PER_TXN ; ++k)
{
t->map[keys[i*NUM_OPS_PER_TXN + k]]->m1 = k % 3;
//ceda::xstring val("foobar");
//t->map[keys[k]]->m2.insert(0,&val,1);
}
}
// Synchronise using deltas
ceda::SendAndReceiveDelta(drwSrc, drwDst);
Tracer() << i << ", " << hpt.GetElapsedTimeInSeconds() << "\n";
}
}
{
ceda::CSpaceTxn _;
ceda::Close(drwSrc);
ceda::Close(drwDst);
Close(ws1);
Close(ws2);
}
}
Close(pstore);
}
//} // @if(false)
}