PythonUtilityFunctions.h
// PythonUtilityFunctions.h
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2017
@def mPythonUtilityFunctions =
{
import time
import sys
import traceback
class Txn(object):
    # Context manager that brackets a with-block in a transaction on db.
    # db must provide openTxn() and closeTxn().  The transaction is opened on
    # entry and closed when the with-block completes normally.
    def __init__(self, db):
        self.db = db

    def __enter__(self):
        self.db.openTxn()

    def __exit__(self, etype, value, tb):
        if etype is None:
            # The with-block completed normally, so close the transaction
            self.db.closeTxn()
            return
        # An exception escaped the with-block.  Report it and then terminate
        # the process without closing the transaction.  Transactions are never
        # committed automatically once an exception has been raised, because
        # it is too easy for an invalid state to be committed to the database.
        # Exiting without committing is the safest response.
        traceback.print_exception(etype, value, tb)
        sys.exit(-1)
class Database(object):
    # Bundles the handles needed to work with a ceda database: the
    # PersistStore, its PSpace and CSpace, the working set and the root of its
    # Universal Tree, plus an optional TCP server and/or client created on the
    # working set.
    def __init__(self):
        self.pstore = None
        self.pspace = None
        self.cspace = None
        self.workingSet = None
        self.utRoot = None
        self.client = None
        self.server = None
        # Assigned by bootstrapRootObject().  Initialised here, like every
        # other attribute, so they can be read before the root object has
        # been bootstrapped.
        self.rootObj = None
        self.justCreated = False

    # Make the PSpace and CSpace of this database current for the calling thread
    def setPSpaceAndCSpaceForThread(self):
        ceda.SetThreadPSpace(self.pspace)
        ceda.SetThreadCSpace(self.cspace)

    # Open the database at the given path.
    # createNew selects OM_CREATE_ALWAYS rather than OM_OPEN_ALWAYS for the
    # PersistStore.
    # dataSetMaster determines whether a data set identifier is allocated using CreateGuid()
    # when the working set is created for the first time
    def open(self, path, createNew=False, dataSetMaster=True):
        # Open/create a PersistStore
        print('Opening ' + path)
        self.pstore = ceda.OpenPersistStore(
            path,
            ceda.EOpenMode.OM_CREATE_ALWAYS if createNew else ceda.EOpenMode.OM_OPEN_ALWAYS,
            ceda.PersistStoreSettings())

        # Open/create a PSpace in the PersistStore
        print('Opening PSpace')
        self.pspace = ceda.OpenPSpace(self.pstore, "MyPSpace", ceda.EOpenMode.OM_OPEN_ALWAYS, None)

        # Get the CSpace associated with the PSpace
        self.cspace = self.pspace.GetCSpace()

        with Txn(self):
            # Create/open a working set in the PSpace
            print('Opening working set')
            self.workingSet = ceda.OpenWorkingSetMachine("MyWorkingSet", dataSetMaster)

            # Get the root of the Universal Tree of the working set
            self.utRoot = self.workingSet.GetUTRoot()

    # Bind to the root object stored under utRootKey, creating it with
    # createRootObjectFn() if the list stored under that key is empty.
    # Sets self.justCreated and self.rootObj, and returns the root object.
    def bootstrapRootObject(self, utRootKey, createRootObjectFn):
        # bootstrap root object into the database
        with Txn(self):
            # self.utRoot.Map is of type xmap<xstring,xvector<pref<IObject>>>
            # objList is of type xvector<pref<IObject>>
            # When the database is first created this vector is empty
            objList = self.utRoot.Map[utRootKey]
            if objList:
                # We have opened an existing database
                self.justCreated = False
                self.rootObj = objList[0].self    # Bind to existing object in the database
            else:
                # We have created a new database
                self.justCreated = True
                self.rootObj = createRootObjectFn()
                objList.append(self.rootObj)
            self.cspace.AddGcRoot(self.rootObj)    # protect object from garbage collection
        return self.rootObj

    def addGcRoot(self, r):
        self.cspace.AddGcRoot(r)

    def removeGcRoot(self, r):
        self.cspace.RemoveGcRoot(r)

    # Open a transaction on the PSpace/CSpace
    def openTxn(self):
        self.setPSpaceAndCSpaceForThread()
        self.cspace.Lock(ceda.ECSpaceLockMode.Transaction)

    # Close the transaction on the PSpace
    def closeTxn(self):
        self.cspace.Unlock()

    def getProtocolId(self):
        return ceda.MakeWsipcSessionProtocolId("--protocol---", 1)

    def getHostInfo(self):
        return ceda.WsipcHostInfo()

    # Create a TCP (IPv4) message server for the working set, listening on port
    def createServer(self, port):
        sessionSettings = ceda.TcpMsgSessionSettings()
        print('Creating server listening on port ' + str(port))
        reuse_addr = True
        ep = ceda.CreateWsipcEndPoint(self.workingSet, self.getProtocolId(), self.getHostInfo())
        ioContextPool = None
        self.server = ceda.CreateTcpMsgServer(ceda.EProtocol.TCP_IPv4, port, reuse_addr, ep, sessionSettings, ioContextPool)

    # Create a TCP message client for the working set, connecting to hostname:port
    def createClient(self, hostname, port):
        sessionSettings = ceda.TcpMsgSessionSettings()
        print('Creating client connecting to ' + str(hostname) + ':' + str(port))
        ep = ceda.CreateWsipcEndPoint(self.workingSet, self.getProtocolId(), self.getHostInfo())
        ioContextPool = None
        self.client = ceda.CreateTcpMsgClient(hostname, str(port), ep, sessionSettings, ioContextPool)

    # Close whatever has been opened, in this order: server, client, working
    # set (inside a transaction), PSpace, PersistStore.  Members that were
    # never opened are skipped.
    def close(self):
        self.setPSpaceAndCSpaceForThread()
        if self.server:
            print('Closing server')
            ceda.CloseTcpMsgServer(self.server)
            self.server = None
        if self.client:
            print('Closing client')
            ceda.CloseTcpMsgClient(self.client)
            self.client = None
        if self.workingSet:
            print('Closing WorkingSet')
            with Txn(self):
                self.workingSet.Close()
            self.workingSet = None
        if self.pspace:
            print('Closing PSpace')
            self.pspace.Close()
            self.pspace = None
        if self.pstore:
            print('Closing PersistStore')
            self.pstore.Close()
            self.pstore = None
        ceda.SetThreadPSpace(None)
        ceda.SetThreadCSpace(None)
        # Drop the remaining handles.  They were obtained from the PSpace and
        # working set that have just been closed, so clearing them makes any
        # later use fail with an ordinary Python error instead of reaching
        # closed objects.
        self.cspace = None
        self.utRoot = None
        self.rootObj = None
# Replicate the database at path dbSrc into a newly created database at path
# dbDst over a TCP connection to 127.0.0.1, report how long the sync took, and
# then show the replicated root object.
#   dbSrc          path of the source database, opened with dataSetMaster=True
#   dbDst          path of the destination database, opened with createNew=True
#   utRootKey      key of the root object list in utRoot.Map
#   showRootObjFn  called with the replicated root object, inside a
#                  transaction, if the list under utRootKey is not empty
#   port           TCP port used by both the server and the client
#   pollInterval   seconds to sleep between polls of the client; the default
#                  of 0 polls continuously
# The function only returns once the client history buffer size equals that
# of the server; there is no timeout.
def DataReplicationTest(dbSrc, dbDst, utRootKey, showRootObjFn, port=3000, pollInterval=0):
    # Emit a blank line.  A bare print is a no-op expression under Python 3,
    # so an empty string is passed, which works under Python 2 as well.
    print('')
    print('Replication test ' + dbSrc + ' to ' + dbDst)

    dbServer = Database()
    dbServer.open(dbSrc, dataSetMaster=True)
    dbServer.createServer(port)
    with Txn(dbServer):
        numServerOperations = dbServer.workingSet.HistoryBufferSize()
        print('num server operations = ' + str(numServerOperations))

    dbClient = Database()
    dbClient.open(dbDst, createNew=True, dataSetMaster=False)
    dbClient.createClient("127.0.0.1", port)

    # Wait until client has received the same number of operations
    start = time.time()
    while True:
        with Txn(dbClient):
            numClientOperations = dbClient.workingSet.HistoryBufferSize()
            print('num client operations = ' + str(numClientOperations))
        if numClientOperations == numServerOperations:
            break
        if pollInterval > 0:
            time.sleep(pollInterval)
    end = time.time()
    print('time to sync databases = ' + str(end-start) + ' seconds')

    with Txn(dbClient):
        objList = dbClient.utRoot.Map[utRootKey]
        if objList:
            rootObject = objList[0].self
            showRootObjFn(rootObject)

    dbClient.close()
    dbServer.close()
}