Using your favorite text editor, create a file named cedadatabase.py as follows:
# cedadatabase.py
# This is python
import time
import sys
import traceback
import pyceda
ceda = pyceda.cns.ceda
class Txn(object):
def __init__(self, db):
self.db = db
def __enter__(self):
self.db.openTxn()
def __exit__(self, etype, value, tb):
if etype is not None: # an exception has occurred
traceback.print_exception(etype, value, tb)
# We do not allow transactions to be automatically committed when exceptions are
# raised, it is too easy for an invalid state to be committed to the database
# The safest thing to do is to exit the process without comitting the transaction.
sys.exit(-1)
self.db.closeTxn()
class Database(object):
def __init__(self, iocp = None):
self.pstore = None
self.pspace = None
self.cspace = None
self.workingSet = None
self.utRoot = None
self.iocp = iocp
self.local_iocp = None
self.client = None
self.server = None
def setPSpaceAndCSpaceForThread(self):
ceda.SetThreadPSpace(self.pspace)
ceda.SetThreadCSpace(self.cspace)
# dataSetMaster determines whether a data set identifier is allocated using CreateGuid()
# when the working set is created for the first time
def open(self,path,createNew=False,dataSetMaster=True):
# Open/create a PersistStore
print 'Opening ' + path
self.pstore = ceda.OpenPersistStore(
path,
ceda.EOpenMode.OM_CREATE_ALWAYS if createNew else ceda.EOpenMode.OM_OPEN_ALWAYS,
ceda.PersistStoreSettings())
# Open/create a PSpace in the PersistStore
print 'Opening PSpace'
self.pspace = ceda.OpenPSpace(self.pstore, "MyPSpace", ceda.EOpenMode.OM_OPEN_ALWAYS, None)
# Get the CSpace associated with the PSpace
self.cspace = self.pspace.GetCSpace()
with Txn(self):
# Create/open a working set in the PSpace
print 'Opening working set'
self.workingSet = ceda.OpenWorkingSetMachine("MyWorkingSet", dataSetMaster)
# Get the root of the Universal Tree of the working set
self.utRoot = self.workingSet.GetUTRoot()
def bootstrapRootObject(self,utRootKey, createRootObjectFn):
# bootstrap root object into the database
with Txn(self):
# self.utRoot.Map is of type xmap > > >
# objList is of type xvector > >
# When the database is first created this vector is empty
objList = self.utRoot.Map[utRootKey]
if objList:
# We have opened an existing database
self.justCreated = False
self.rootObj = objList[0].self # Bind to existing object in the database
else:
# We have created a new database
self.justCreated = True
self.rootObj = createRootObjectFn()
objList.append(self.rootObj)
self.cspace.AddGcRoot(self.rootObj) # protect object from garbage collection
return self.rootObj
def addGcRoot(self,r):
self.cspace.AddGcRoot(r)
def removeGcRoot(self,r):
self.cspace.RemoveGcRoot(r)
# Open a transaction on the PSpace/CSpace
def openTxn(self):
self.setPSpaceAndCSpaceForThread()
self.cspace.Lock(ceda.ECSpaceLockMode.Transaction)
# Close the transaction on the PSpace
def closeTxn(self):
self.cspace.Unlock()
def getProtocolId(self):
return ceda.MakeWsipcSessionProtocolId("--protocol---", 1)
def getHostInfo(self):
return ceda.WsipcHostInfo()
def createServer(self, port):
if self.iocp is None:
self.iocp = self.local_iocp = ceda.CreateIocp2(0)
tcpSettings = ceda.TcpSettings()
print 'Creating server listening on port ' + `port`
self.server = ceda.CreateWsipcServer(self.iocp, self.workingSet, self.getProtocolId(), self.getHostInfo(), port, tcpSettings)
def createClient(self, hostname, port):
if self.iocp is None:
self.iocp = self.local_iocp = ceda.CreateIocp2(0)
tcpSettings = ceda.TcpSettings()
print 'Creating client connecting to ' + `hostname` + ':' + `port`
self.client = ceda.CreateWsipcClient(self.iocp, self.workingSet, self.getProtocolId(), hostname, port, tcpSettings)
def close(self):
self.setPSpaceAndCSpaceForThread()
if self.server:
print 'Closing server'
ceda.CloseEndPoint(self.server)
self.server = None
if self.client:
print 'Closing client'
ceda.CloseEndPoint(self.client)
self.client = None
if self.local_iocp:
print 'Closing Iocp'
ceda.CloseIocp(self.local_iocp)
self.local_iocp = None
if self.workingSet:
print 'Closing WorkingSet'
self.workingSet.Close()
self.workingSet = None
if self.pspace:
print 'Closing PSpace'
self.pspace.Close()
self.pspace = None
if self.pstore:
print 'Closing PersistStore'
self.pstore.Close()
self.pstore = None
ceda.SetThreadPSpace(None)
ceda.SetThreadCSpace(None)
def DataReplicationTest(dbSrc, dbDst, utRootKey, showRootObjFn):
print
print 'Replication test ' + dbSrc + ' to ' + dbDst
dbServer = Database()
dbServer.open(dbSrc, dataSetMaster=True)
dbServer.createServer(3000)
with Txn(dbServer):
numServerOperations = dbServer.workingSet.HistoryBufferSize()
print 'num server operations = ' + `numServerOperations`
dbClient = Database()
dbClient.open(dbDst, createNew=True, dataSetMaster=False)
dbClient.createClient("127.0.0.1", 3000)
# Wait until client has received the same number of operations
start = time.time()
while True:
with Txn(dbClient):
numClientOperations = dbClient.workingSet.HistoryBufferSize()
print 'num client operations = ' + `numClientOperations`
if numClientOperations == numServerOperations:
break
#time.sleep(0.01)
end = time.time()
print 'time to sync databases = ' + `(end-start)` + ' seconds'
with Txn(dbClient):
objList = dbClient.utRoot.Map[utRootKey]
if objList:
rootObject = objList[0].self
showRootObjFn(rootObject)
dbClient.close()
dbServer.close()