Python class to open a ceda database

Using your favorite text editor, create a file named cedadatabase.py as follows:


# cedadatabase.py
# This is python

import time
import sys
import traceback
import pyceda
ceda = pyceda.cns.ceda

class Txn(object):
    """Context manager that brackets a block with db.openTxn() / db.closeTxn().

    Usage:
        with Txn(db):
            ... access the database ...

    db is any object providing openTxn() and closeTxn() methods.

    If the body raises, the traceback is printed and the process exits with
    status -1 WITHOUT calling db.closeTxn().
    """

    def __init__(self, db):
        self.db = db

    def __enter__(self):
        self.db.openTxn()
        # Return the transaction so that 'with Txn(db) as txn:' binds something
        # useful (txn.db) rather than None.
        return self

    def __exit__(self, etype, value, tb):
        if etype is not None:        # an exception has occurred
            traceback.print_exception(etype, value, tb)

            # We do not allow transactions to be automatically committed when
            # exceptions are raised; it is too easy for an invalid state to be
            # committed to the database.  The safest thing to do is to exit the
            # process without committing the transaction.
            sys.exit(-1)
        self.db.closeTxn()

class Database(object):
    """Convenience wrapper that opens a ceda PersistStore, PSpace and working set.

    Typical use:
        db = Database()
        db.open(path)
        root = db.bootstrapRootObject(key, createFn)
        ...
        db.close()

    An Iocp may be passed in by the caller.  If none is supplied, one is
    created on demand by createServer()/createClient() and closed by close().
    """

    def __init__(self, iocp = None):
        self.pstore = None
        self.pspace = None
        self.cspace = None
        self.workingSet = None
        self.utRoot = None
        self.iocp = iocp            # Iocp in use (caller supplied or our own)
        self.local_iocp = None      # set only when we created the Iocp ourselves
        self.client = None
        self.server = None
        # Set by bootstrapRootObject(); None until it has been called
        self.justCreated = None
        self.rootObj = None

    def setPSpaceAndCSpaceForThread(self):
        """Make this database's PSpace and CSpace current for the calling thread."""
        ceda.SetThreadPSpace(self.pspace)
        ceda.SetThreadCSpace(self.cspace)

    def open(self,path,createNew=False,dataSetMaster=True):
        """Open (or create) the store at path, its PSpace and its working set.

        createNew       if true the store is opened with OM_CREATE_ALWAYS,
                        otherwise with OM_OPEN_ALWAYS.
        dataSetMaster   determines whether a data set identifier is allocated
                        using CreateGuid() when the working set is created for
                        the first time.
        """
        # Open/create a PersistStore
        print('Opening ' + path)
        if createNew:
            openMode = ceda.EOpenMode.OM_CREATE_ALWAYS
        else:
            openMode = ceda.EOpenMode.OM_OPEN_ALWAYS
        self.pstore = ceda.OpenPersistStore(path, openMode, ceda.PersistStoreSettings())

        # Open/create a PSpace in the PersistStore
        print('Opening PSpace')
        self.pspace = ceda.OpenPSpace(self.pstore, "MyPSpace", ceda.EOpenMode.OM_OPEN_ALWAYS, None)

        # Get the CSpace associated with the PSpace
        self.cspace = self.pspace.GetCSpace()

        with Txn(self):
            # Create/open a working set in the PSpace
            print('Opening working set')
            self.workingSet = ceda.OpenWorkingSetMachine("MyWorkingSet", dataSetMaster)

            # Get the root of the Universal Tree of the working set
            self.utRoot = self.workingSet.GetUTRoot()

    def bootstrapRootObject(self,utRootKey, createRootObjectFn):
        """Bind to, or create, the root object stored under utRootKey.

        createRootObjectFn is called with no arguments, and only when no object
        is stored under utRootKey yet.  Sets self.justCreated and self.rootObj,
        registers the root object as a GC root and returns it.
        """
        with Txn(self):
            # self.utRoot.Map is indexed by key and yields a list-like
            # collection of objects.
            # NOTE(review): the original comment named the xmap/xvector types
            # but its template arguments were lost - confirm the exact types.
            # When the database is first created this collection is empty.
            objList = self.utRoot.Map[utRootKey]
            if objList:
                # We have opened an existing database
                self.justCreated = False
                self.rootObj = objList[0].self      # Bind to existing object in the database
            else:
                # We have created a new database
                self.justCreated = True
                self.rootObj = createRootObjectFn()
                objList.append(self.rootObj)
            self.cspace.AddGcRoot(self.rootObj)  # protect object from garbage collection
        return self.rootObj

    def addGcRoot(self,r):
        """Register r as a garbage collection root of the CSpace."""
        self.cspace.AddGcRoot(r)

    def removeGcRoot(self,r):
        """Unregister r as a garbage collection root of the CSpace."""
        self.cspace.RemoveGcRoot(r)

    def openTxn(self):
        """Open a transaction on the PSpace/CSpace (locks the CSpace)."""
        self.setPSpaceAndCSpaceForThread()
        self.cspace.Lock(ceda.ECSpaceLockMode.Transaction)

    def closeTxn(self):
        """Close the transaction on the PSpace (unlocks the CSpace)."""
        self.cspace.Unlock()

    def getProtocolId(self):
        """Return the protocol id passed to the Wsipc server and client."""
        return ceda.MakeWsipcSessionProtocolId("--protocol---", 1)

    def getHostInfo(self):
        """Return the host info passed to the Wsipc server."""
        return ceda.WsipcHostInfo()

    def _ensureIocp(self):
        # Lazily create a private Iocp when the caller did not supply one.
        # It is remembered in local_iocp so that close() knows it owns it.
        if self.iocp is None:
            self.iocp = self.local_iocp = ceda.CreateIocp2(0)

    def createServer(self, port):
        """Create a Wsipc server for the working set listening on port."""
        self._ensureIocp()
        tcpSettings = ceda.TcpSettings()
        print('Creating server listening on port ' + repr(port))
        self.server = ceda.CreateWsipcServer(self.iocp, self.workingSet, self.getProtocolId(), self.getHostInfo(), port, tcpSettings)

    def createClient(self, hostname, port):
        """Create a Wsipc client for the working set connecting to hostname:port."""
        self._ensureIocp()
        tcpSettings = ceda.TcpSettings()
        print('Creating client connecting to ' + repr(hostname) + ':' + repr(port))
        self.client = ceda.CreateWsipcClient(self.iocp, self.workingSet, self.getProtocolId(), hostname, port, tcpSettings)

    def close(self):
        """Close whatever has been opened: server, client, our own Iocp,
        working set, PSpace and PersistStore (in that order), then clear the
        thread's PSpace/CSpace.  Members that were never opened are skipped.
        """
        self.setPSpaceAndCSpaceForThread()

        if self.server:
            print('Closing server')
            ceda.CloseEndPoint(self.server)
            self.server = None

        if self.client:
            print('Closing client')
            ceda.CloseEndPoint(self.client)
            self.client = None

        if self.local_iocp:
            print('Closing Iocp')
            ceda.CloseIocp(self.local_iocp)
            # Do not keep a reference to the Iocp we have just closed, otherwise
            # a later createServer()/createClient() would try to reuse it.
            if self.iocp is self.local_iocp:
                self.iocp = None
            self.local_iocp = None

        if self.workingSet:
            print('Closing WorkingSet')
            self.workingSet.Close()
            self.workingSet = None

        if self.pspace:
            print('Closing PSpace')
            self.pspace.Close()
            self.pspace = None

        if self.pstore:
            print('Closing PersistStore')
            self.pstore.Close()
            self.pstore = None

        ceda.SetThreadPSpace(None)
        ceda.SetThreadCSpace(None)

def DataReplicationTest(dbSrc, dbDst, utRootKey, showRootObjFn, port=3000, pollInterval=0):
    """Replicate the database at dbSrc into a newly created database at dbDst.

    A server is created on the source database and a client on the destination
    database, connected over 127.0.0.1.  The function then polls until the
    client's history buffer size equals the server's, prints the elapsed time,
    and passes the client's first object stored under utRootKey (if any) to
    showRootObjFn.

    dbSrc           path of the existing source database
    dbDst           path of the destination database (always created anew)
    utRootKey       key of the root object in the working set's root map
    showRootObjFn   called with the replicated root object, inside a transaction
    port            TCP port used by the server and the client
    pollInterval    seconds to sleep between polls of the client; 0 (the
                    default) polls continuously without sleeping
    """
    print('')
    print('Replication test ' + dbSrc + ' to ' + dbDst)
    dbServer = Database()
    dbServer.open(dbSrc, dataSetMaster=True)
    dbServer.createServer(port)
    with Txn(dbServer):
        numServerOperations = dbServer.workingSet.HistoryBufferSize()
    print('num server operations = ' + repr(numServerOperations))

    dbClient = Database()
    dbClient.open(dbDst, createNew=True, dataSetMaster=False)
    dbClient.createClient("127.0.0.1", port)

    # Wait until client has received the same number of operations
    start = time.time()
    while True:
        with Txn(dbClient):
            numClientOperations = dbClient.workingSet.HistoryBufferSize()
        print('num client operations = ' + repr(numClientOperations))
        if numClientOperations == numServerOperations:
            break
        if pollInterval > 0:
            # The sleep happens outside the transaction
            time.sleep(pollInterval)
    end = time.time()
    print('time to sync databases = ' + repr(end - start) + ' seconds')

    with Txn(dbClient):
        objList = dbClient.utRoot.Map[utRootKey]
        if objList:
            rootObject = objList[0].self
            showRootObjFn(rootObject)

    # Close the client before the server it is connected to
    dbClient.close()
    dbServer.close()