PythonUtilityFunctions.h

// PythonUtilityFunctions.h
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2017

@def mPythonUtilityFunctions =
{
import time
import sys
import traceback
    
class Txn(object):
    def __init__(self, db):
        self.db = db

    def __enter__(self):
        self.db.openTxn()

    def __exit__(self, etype, value, tb):
        if etype is not None:        # an exception has occurred
            traceback.print_exception(etype, value, tb)

            # We do not allow transactions to be automatically committed when exceptions are
            # raised, it is too easy for an invalid state to be committed to the database
            # The safest thing to do is to exit the process without comitting the transaction.
            sys.exit(-1)
        self.db.closeTxn()

class Database(object):
    def __init__(self):
        self.pstore = None
        self.pspace = None
        self.cspace = None
        self.workingSet = None
        self.utRoot = None
        self.client = None
        self.server = None

    def setPSpaceAndCSpaceForThread(self):
        ceda.SetThreadPSpace(self.pspace)
        ceda.SetThreadCSpace(self.cspace)

    # dataSetMaster determines whether a data set identifier is allocated using CreateGuid()
    # when the working set is created for the first time
    def open(self,path,createNew=False,dataSetMaster=True):
        # Open/create a PersistStore
        print('Opening ' + path)
        self.pstore = ceda.OpenPersistStore(
            path,
            ceda.EOpenMode.OM_CREATE_ALWAYS if createNew else ceda.EOpenMode.OM_OPEN_ALWAYS,
            ceda.PersistStoreSettings())

        # Open/create a PSpace in the PersistStore
        print('Opening PSpace')
        self.pspace = ceda.OpenPSpace(self.pstore, "MyPSpace", ceda.EOpenMode.OM_OPEN_ALWAYS, None)

        # Get the CSpace associated with the PSpace
        self.cspace = self.pspace.GetCSpace()

        with Txn(self):
            # Create/open a working set in the PSpace
            print('Opening working set')
            self.workingSet = ceda.OpenWorkingSetMachine("MyWorkingSet", dataSetMaster)

            # Get the root of the Universal Tree of the working set
            self.utRoot = self.workingSet.GetUTRoot()

    def bootstrapRootObject(self,utRootKey, createRootObjectFn):
        # bootstrap root object into the database
        with Txn(self):
            # self.utRoot.Map is of type xmap<xstring,xvector<pref<IObject>>>
            # objList is of type xvector<pref<IObject>>
            # When the database is first created this vector is empty
            objList = self.utRoot.Map[utRootKey]
            if objList:
                # We have opened an existing database
                self.justCreated = False
                self.rootObj = objList[0].self      # Bind to existing object in the database
            else:
                # We have created a new database
                self.justCreated = True
                self.rootObj = createRootObjectFn()
                objList.append(self.rootObj)
            self.cspace.AddGcRoot(self.rootObj)  # protect object from garbage collection
        return self.rootObj

    def addGcRoot(self,r):
        self.cspace.AddGcRoot(r)

    def removeGcRoot(self,r):
        self.cspace.RemoveGcRoot(r)

    # Open a transaction on the PSpace/CSpace
    def openTxn(self):
        self.setPSpaceAndCSpaceForThread()
        self.cspace.Lock(ceda.ECSpaceLockMode.Transaction)

    # Close the transaction on the PSpace
    def closeTxn(self):
        self.cspace.Unlock()

    def getProtocolId(self):
        return ceda.MakeWsipcSessionProtocolId("--protocol---", 1)

    def getHostInfo(self):
        return ceda.WsipcHostInfo()

    def createServer(self, port):
        sessionSettings = ceda.TcpMsgSessionSettings()
        print('Creating server listening on port ' + str(port))
        reuse_addr = True
        ep = ceda.CreateWsipcEndPoint(self.workingSet, self.getProtocolId(), self.getHostInfo())
        ioContextPool = None
        self.server = ceda.CreateTcpMsgServer(ceda.EProtocol.TCP_IPv4, port, reuse_addr, ep, sessionSettings, ioContextPool)

    def createClient(self, hostname, port):
        sessionSettings = ceda.TcpMsgSessionSettings()
        print('Creating client connecting to ' + str(hostname) + ':' + str(port))
        ep = ceda.CreateWsipcEndPoint(self.workingSet, self.getProtocolId(), self.getHostInfo())
        ioContextPool = None
        self.client = ceda.CreateTcpMsgClient(hostname, str(port), ep, sessionSettings, ioContextPool)

    def close(self):
        self.setPSpaceAndCSpaceForThread()

        if self.server:
            print('Closing server')
            ceda.CloseTcpMsgServer(self.server)
            self.server = None

        if self.client:
            print('Closing client')
            ceda.CloseTcpMsgClient(self.client)
            self.client = None

        if self.workingSet:
            print('Closing WorkingSet')
            with Txn(self):
                self.workingSet.Close()
            self.workingSet = None

        if self.pspace:
            print('Closing PSpace')
            self.pspace.Close()
            self.pspace = None

        if self.pstore:
            print('Closing PersistStore')
            self.pstore.Close()
            self.pstore = None

        ceda.SetThreadPSpace(None)
        ceda.SetThreadCSpace(None)

def DataReplicationTest(dbSrc, dbDst, utRootKey, showRootObjFn):
    print
    print('Replication test ' + dbSrc + ' to ' + dbDst)
    dbServer = Database()
    dbServer.open(dbSrc, dataSetMaster=True)
    dbServer.createServer(3000)
    with Txn(dbServer):
        numServerOperations = dbServer.workingSet.HistoryBufferSize()
    print('num server operations = ' + str(numServerOperations))

    dbClient = Database()
    dbClient.open(dbDst, createNew=True, dataSetMaster=False)
    dbClient.createClient("127.0.0.1", 3000)

    # Wait until client has received the same number of operations
    start = time.time()
    while True:
        with Txn(dbClient):
            numClientOperations = dbClient.workingSet.HistoryBufferSize()
        print('num client operations = ' + str(numClientOperations))
        if numClientOperations == numServerOperations:
            break
        #time.sleep(0.01)
    end = time.time()
    print('time to sync databases = ' + str(end-start) + ' seconds')

    with Txn(dbClient):
        objList = dbClient.utRoot.Map[utRootKey]
        if objList:
            rootObject = objList[0].self
            showRootObjFn(rootObject)

    dbClient.close()
    dbServer.close()
}