TwoWorkingSets.cpp
// TwoWorkingSets.cpp
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2008
@import "Ceda/cxOperation/UTRoot.h"
@import "Ceda/cxOperation/IWorkingSetMachine.h"
@import "Ceda/cxWorkingSetIpc/WorkingSetIPC.h"
@import "Ceda/cxPersistStore/IPersistStore.h"
@import "Ceda/cxObject/IObjectVisitor.h"
@import "Ceda/cxObject/PrintReflectedVariable.h"
#include "Ceda/cxUtils/HPTime.h"
#include "Ceda/cxUtils/CedaAssert.h"
#include "Ceda/cxUtils/Tracer.h"
#include "Ceda/cxUtils/Environ.h"
@if (false)
{
// Interface declared with the $interface+ directive, deriving from ceda::IObject.
// In this file it is implemented by JpegImage and referenced through pref<IImage> in XX.
$interface+ IImage : ceda::IObject
{
// The interface's single operation: takes no arguments and returns nothing.
void foo();
};
// Persistable implementation of IImage whose only state is m_jpegStream (a PagedBuffer).
$struct+ JpegImage <<-copy>> isa ceda::IPersistable, IImage
{
    PagedBuffer m_jpegStream;

    // Transfers the object through 'ar' in whichever direction the archive is operating.
    // In both directions the schema marker is processed first, followed by m_jpegStream.
    void Serialise(Archive& ar) const
    {
        const schema_t SCHEMA = 1;

        if (!ar.IsStoring())
        {
            // Loading: consume the schema marker, then the stream, and trace the resulting size.
            schema_t storedSchema = DeserialiseSchema(ar,SCHEMA);
            ar >> m_jpegStream;
            Tracer() << "Created a JPEG image - stream length: " << m_jpegStream.GetSize() << "\n";
            return;
        }

        // Storing: emit the schema marker followed by the stream.
        SerialiseSchema(ar, SCHEMA);
        ar << m_jpegStream;
    }

    // IImage::foo - only emits a trace line.
    void foo()
    {
        Tracer() << "foo called\n";
    }
};
// Persistable object whose model consists of a single vector of IImage references.
// Test() stores one instance of it under the UT root and reads its first image back.
$struct+ XX isa ceda::IPersistable :
model
{
// Collection of images, held as movable pref<IImage> elements.
xvector<movable<pref<IImage> > > images;
}
{
};
void Test()
{
Tracer() << "Test example\n";
const ceda::xstring UtEntryName("MyApp");
xstring path = GetCedaTestPath("TestInteract.ced");
{
PersistStore* pstore = OpenPersistStore(path.c_str(), OM_CREATE_ALWAYS);
{
// Open or create a PSpace
WPSpace pspace(OpenPSpace(pstore, "MyPSpace"));
// Open or create two working sets in the PSpace
WorkingSet* ws = NULL;
{
CSpaceTxn txn;
ws = OpenWorkingSet(txn, pspace, "WS");
}
{
CSpaceTxn txn;
UTRoot& r = GetUTRoot(ws);
r.Map[UtEntryName].insert(0,$new XX);
x->images.insert(0, $new JpegImage);
p->foo();
}
Close(ws);
}
Close(pstore);
}
{
PersistStore* pstore = OpenPersistStore(path.c_str(), OM_OPEN_ALWAYS);
{
// Open or create a PSpace
WPSpace pspace(OpenPSpace(pstore, "MyPSpace"));
// Open or create two working sets in the PSpace
WorkingSet* ws = NULL;
{
CSpaceTxn txn;
ws = OpenWorkingSet(txn, pspace, "WS");
}
{
CSpaceTxn txn;
XX* x = GetUTRootEntry<X>(ws,UtEntryName);
pref<IImage> p = x->images[0];
p->foo();
}
Close(ws);
}
Close(pstore);
}
}
} // false
///////////////////////////////////////////////////////////////////////////////////////////////////
// TestDS
/*
A simple example of a data source
*/
// Persistable object whose model holds a single 32-bit integer.
// TwoWorkingSets() inserts one instance under the UT root of the first working set and then
// assigns to x32 repeatedly to generate operations.
$struct+ TestDS <<-os>> isa ceda::IPersistable :
model
{
// The only model field; target of the assignments made in TwoWorkingSets().
int32 x32;
}
{
};
///////////////////////////////////////////////////////////////////////////////////////////////////
/*
A simple test that
1. Creates a new store
2. Creates a PSpace
3. Creates two independent working sets named WS1 and WS2 in the same PSpace
4. Creates a server and a client, allowing for interactive collaboration between the two
working sets.
5. On one working set
- a TestDS is added as the 0th child under the UT root
- 10 assignments are performed on the TestDS
6. Wait until the second working set has received and applied all these operations
7. Print the TestDS in the second working set
8. Close server, client, the two working sets, the PSpace and the PersistStore.
*/
// Runs the two-working-set example end to end: creates the store and PSpace, opens working sets
// WS1 and WS2, connects them with a TCP server/client pair on the loopback address, generates
// operations on WS1, polls until WS2's history buffer reaches the expected size, prints the
// TestDS as seen from WS2 and finally closes everything.
//
// NOTE: the polling loop has no timeout; it only exits once the expected size is observed.
void TwoWorkingSets()
{
    Tracer() << "TwoWorkingSets example\n";
    ceda::TraceIndenter indent;

    const ceda::xstring UtEntryName("MyApp");

    // Number of assignments performed on the TestDS in the first working set.
    const int numAssignments = 10;

    // History buffer size to wait for on the second working set. The original code hard-coded 11,
    // presumably the single insertion of the TestDS plus the 10 assignments; deriving it here
    // keeps the poll condition in step with the loop bound.
    const int numExpectedOperations = 1 + numAssignments;

    ceda::xstring path = ceda::GetCedaTestPath("TestInteract.ced");
    ceda::PersistStore* pstore = ceda::OpenPersistStore(path.c_str(), ceda::OM_CREATE_ALWAYS);
    {
        // Open or create a PSpace
        ceda::WPSpace pspace(ceda::OpenPSpace(pstore, "MyPSpace"));

        // Open or create the two working set machines
        ceda::WorkingSetMachine* ws1 = nullptr;
        ceda::WorkingSetMachine* ws2 = nullptr;
        {
            ceda::CSpaceTxn txn;
            ws1 = ceda::OpenWorkingSetMachine("WS1",true);
            ws2 = ceda::OpenWorkingSetMachine("WS2",false);
        }

        {
            // Create server and client
            ceda::TcpMsgServer* server = nullptr;
            ceda::TcpMsgClient* client = nullptr;
            {
                const ceda::WsipcSessionProtocolId protocolId = { "TwoWorkingSets", 1 };
                ceda::WsipcHostInfo localHostInfo;

                // Server side listens on 'port'; the client connects to host:service.
                // 'port' and 'service' must name the same port.
                auto protocol = ceda::EProtocol::TCP_IPv4;
                int port = 3000;
                bool reuse_addr = false;
                const char* host = "127.0.0.1";
                const char* service = "3000";

                ceda::TcpMsgSessionSettings sessionSettings;

                ceda::CSpaceTxn txn;
                server = CreateTcpMsgServer(
                    protocol, port, reuse_addr,
                    *CreateWsipcEndPoint(ws1, protocolId, localHostInfo), sessionSettings);
                client = CreateTcpMsgClient(
                    host, service,
                    *CreateWsipcEndPoint(ws2, protocolId, localHostInfo), sessionSettings);
            }

            // Bootstrap TestDS under the UT root
            {
                ceda::CSpaceTxn txn;
                GetUTRoot(ws1).Map[UtEntryName].insert(0,$new TestDS);
            }

            // Generate some operations, one transaction per assignment
            for (int i=0 ; i < numAssignments ; ++i)
            {
                ceda::CSpaceTxn txn;
                TestDS* t = ceda::GetUTRootEntry<TestDS>(ws1,UtEntryName);
                t->x32 = i;     // Generate assignment operation
            }

            // Poll for quiescence
            Tracer() << "Polling for quiescence\n";
            while(1)
            {
                Sleep(10);
                ceda::CSpaceTxn txn;
                if (HistoryBufferSize(ws2) == numExpectedOperations) break;
            }

            Tracer() << "Showing result on ws2\n";
            {
                ceda::CSpaceTxn txn;
                TestDS* t = ceda::GetUTRootEntry<TestDS>(ws2,UtEntryName);
                ceda::PrintInstance(Tracer(), t);
            }

            Close(server);
            Tracer() << "closed server\n";
            Close(client);
            Tracer() << "closed client\n";
        }

        {
            ceda::CSpaceTxn txn;
            Close(ws1);
            Close(ws2);
            Tracer() << "closed working sets\n";
        }
    }
    Close(pstore);
    Tracer() << "closed pstore\n";
}
@if (false)
{
// Model with two int32 fields, x and y. Used as the element type of Line's members.
$model+ Point
{
int32 x;
int32 y;
// Disabled array member; some commented-out TestX variants refer to it as p2.L[3].
//int32 L[5];
};
// Model composed of two Point members plus a two-dimensional array of Points.
$model+ Line
{
Point p1;
Point p2;
// 5 x 3 array of Points: valid indices are P[0..4][0..2].
Point P[5][3];
};
// Model declaring one field for each of the leaf types listed below.
// Only referenced from a commented-out member of TestX in this file.
$model+ LeafTypes
{
bool b;
// Signed integers
int8 x8;
int16 x16;
int32 x32;
int64 x64;
// Unsigned integers
uint8 ux8;
uint16 ux16;
uint32 ux32;
uint64 ux64;
// Floating point
float32 f32;
float64 f64;
// Characters
char8 c8;
char16 c16;
// Strings
string8 s8;
string16 s16;
};
// Alias for int32. Not referenced anywhere else in the visible part of this file.
typedef int32 AssignType;
// Persistable test object. Its active model consists of an int32 (x32), a vector of char8 (s)
// and a vector of child TestX references (children). The remaining model members and the
// alternative getx/setx/operator== variants are commented out; they appear to be earlier
// experiments that target different kinds of model field.
// NOTE(review): members are accessed both as 'name' and as '_name_' below; presumably these are
// two access forms provided by the model directive - confirm their exact semantics before
// changing either form.
$struct+ TestX <<-copy>> isa ceda::IPersistable :
model
{
//LeafTypes lt;
//Point p;
//Line line;
//int8 x8;
//int16 x16;
int32 x32;
//int64 x64;
xvector<char8> s;
//int32 A[10];
//xvector<movable<pref<IObject> > > w;
//xvector<movable<cref<X> > > children;
xvector<movable<cref<TestX> > > children;
}
{
// Presumably the constructor form of the $struct directive - TODO confirm.
// Its body currently contains only commented-out initialisation.
$$()
{
//_p_.x = 0;
//_p_.y = 0;
//_x8_ = 8;
//_x16_ = 16;
//_x32_ = 32;
//_x64_ = 64;
//for (int i=0 ; i < 5 ; ++i)
// for (int j=0 ; j < 3 ; ++j) _line_.P[i][j].x = 0;
//for (int i=0 ; i < 10 ; ++i) _A_[i] = 0;
}
// Active accessor set: getx/setx read and write x32.
int32 getx() const { return x32; }
void setx(int32 x) { x32 = x; }
// Equality compares x32 only; s and children do not take part.
bool operator==(const TestX& rhs) const { return _x32_ == rhs._x32_; }
// Disabled variant targeting the x member of the (commented-out) Point p.
/*
int32 getx() const { return p.x; }
void setx(int32 x) { p.x = x; }
bool operator==(const UTRoot& rhs) const { return _p_.x == rhs._p_.x; }
*/
// Disabled variant targeting line.p2.x.
/*
int32 getx() const { return line.p2.x; }
void setx(int32 x) { line.p2.x = x; }
bool operator==(const UTRoot& rhs) const
{
return _line_.p2.x == rhs._line_.p2.x &&
_s_ == rhs._s_;
}
*/
// Disabled variant targeting element 3 of the (commented-out) array A.
/*
int32 getx() const { return A[3]; }
void setx(int32 x) { A[3] = x; }
bool operator==(const UTRoot& rhs) const
{
return _A_[3] == rhs._A_[3] &&
_s_ == rhs._s_;
}
*/
// Disabled variant targeting line.p2.L[3] (Point::L is itself commented out).
/*
int32 getx() const { return line.p2.L[3]; }
void setx(int32 x) { line.p2.L[3] = x; }
bool operator==(const UTRoot& rhs) const
{
return _line_.p2.L[3]== rhs._line_.p2.L[3] &&
_s_ == rhs._s_;
}
*/
// Disabled variant targeting line.P[2][4].x.
// NOTE(review): Line::P is declared as P[5][3], so the second index 4 would be out of range
// if this variant were re-enabled.
/*
int32 getx() const { return line.P[2][4].x; }
void setx(int32 x) { line.P[2][4].x = x; }
bool operator==(const UTRoot& rhs) const
{
return _line_.P[2][4].x == rhs._line_.P[2][4].x &&
_s_ == rhs._s_;
}
*/
// Currently a no-op: the only statement is commented out, so 'os' is unused.
void PrintInfo(xostream& os)
{
//os << "PtoQ = " << (const PtoQMap&) _s_.m_ptoq << '\n';
}
// Number of elements in s, returned as int32.
int32 size_s() const { return s.size(); }
// Returns a copy of s (by value).
xvector<char8> get_s() const { return _s_; }
// Forwards to s.insert(pos,c,n).
void insert_s(int32 pos, const char8* c, int32 n) { s.insert(pos,c,n); }
// Forwards to s.erase(i1,i2).
void erase_s(int32 i1,int32 i2) { s.erase(i1,i2); }
// Inserts a newly created TestX into children at position 'pos'.
void InsertChild(int32 pos)
{
children.insert(pos, $new TestX);
}
};
} // false