IWorkingSetMachine.h

// IWorkingSetMachine.h
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2010

@import "cxOperation.h"
@import "VecTime.h"
@import "Ceda/cxPersistStore/IPersistStore.h"
#include "Ceda/cxUtils/xvector.h"
#include "Ceda/cxUtils/BasicTypes.h"

/*
DeltaRW and RodSession allow for sending and receiving deltas or replicate-on-demand objects,
without being concerned with a high level mechanism to multiplex messages, or policies for 
prioritising what object or delta is to be sent next on a given connection.
*/

namespace ceda
{
$struct UTRoot;

// todo: make Xcpp generate this when have forward declaration $struct+ UTRoot with the +
@api ceda::ReflectedClass const& _GetReflected(UTRoot const*);


///////////////////////////////////////////////////////////////////////////////////////////////////
// IMoreToSend

struct IMoreToSend
{
    // Called with a lock on the PSpace when either local or remote operations are being applied), 
    // to indicate that there is more to be sent by the associated DetaRW.
    virtual void OnMoreToSend() = 0;
};

$extern+ IMoreToSend;

struct EmptyMoreToSendHandler : public IMoreToSend
{
    virtual void OnMoreToSend() {}
};

///////////////////////////////////////////////////////////////////////////////////////////////////
// DeltaRW
/*
A DeltaRW is associated with full duplex communication between a local and remote working set.

It supports synchronous reading and writing of an ordered sequence of deltas sent between the 
working sets in order to synchronise them.

A DeltaRW has transient state in order to compress the byte stream efficiently, and to record
transient information about what parts of the UT have changed.

Since a DeltaRW is stateful it is regarded as a state machine, not just pure functions to 
send or receive deltas.  Nevertheless it is not concerned with creating or managing 
internal threads or sockets for IPC.  Instead deltas are read/written synchronously to a given 
archive.

Threading contract:

    (
        DeserialiseNextDelta |
        (
            !Init <
            (
                $$(HaveChangesToSend, WriteNextDelta)
            )
        )
    ) <
    ! Close
*/

$adt+ DeltaRW
{
    // Requires an lock on the associated CSpace, and CSpace/PSpace to be set in TLS
    void Close();

    // Get the associated PSpace.  Never returns nullptr.
    // Can be called with or without a lock on the CSpace.
    PSpace* GetPSpace();

    /////////////////////// Reader ////////////////////////

    // Read and apply the next delta from 'ar'.  Sets 'appliedChanges' if changes were 
    // actually applied.
    // Requires an exclusive lock on the associated CSpace, and CSpace/PSpace to be set in TLS
    void DeserialiseNextDelta(InputArchive& ar, bool& appliedChanges);

    /////////////////////// Writer ////////////////////////
    
    // Must be called exactly once, and before all other public methods in this class.
    // Requires an exclusive lock on the associated CSpace, and CSpace/PSpace to be set in TLS
    void Init(const VecTime& rhv);

    // Are there changes waiting to be sent?
    // Requires a lock on the associated CSpace, and CSpace/PSpace to be set in TLS
    bool HaveChangesToSend() const;
    
    // Write the next delta to ar.  Returns false if nothing to write.
    // Requires a lock on the associated CSpace, and CSpace/PSpace to be set in TLS
    bool WriteNextDelta(Archive& ar);
};

///////////////////////////////////////////////////////////////////////////////////////////////////
// RodSession
/*
A RodSession can be used to send/receive the read only objects which are replicated on demand 
in a working set.
*/

$adt+ RodSession
{
    // Requires an lock on the associated CSpace, and CSpace/PSpace to be set in TLS
    void Close();

    // Get the associated PSpace.  Never returns nullptr.
    // Can be called with or without a lock on the CSpace.
    PSpace* GetPSpace();

    /////////////////////// Reader ////////////////////////

    // Read the next message from 'ar'
    // Requires an lock on the associated CSpace, and CSpace/PSpace to be set in TLS
    void ReadNextMessage(InputArchive& ar);

    /////////////////////// Writer ////////////////////////
    
    // Write the next message to ar.  Returns false if nothing to write.
    // Requires a lock on the associated CSpace, and CSpace/PSpace to be set in TLS
    bool WriteNextMessage(Archive& ar);
};

///////////////////////////////////////////////////////////////////////////////////////////////////
// WorkingSetMachineCounters

/*
Note that these counters are only transient (they are zeroed when the WorkingSetMachine is
opened)
*/

$struct+ WorkingSetMachineCounters
{
    int64 numLocalCreatedRodObjects;
    int64 numReceivedRodObjects;
};

///////////////////////////////////////////////////////////////////////////////////////////////////
// WorkingSetMachine

/*
A WorkingSet refers to the persistent state, regarded as an abstract value that records a 
Universal Tree (UT) plus all the changes on the UT in a form suitable for synchronisation 
using Operational Transformation (OT).

A WorkingSetMachine is associated with a single persistent WorkingSet.  It manages a set of 
DeltaRWs which are used to read/write deltas.
*/

$adt+ WorkingSetMachine
{
    ///////////////// Functions that do not care whether there is a lock on the PSpace, or whether CSpace/PSpace has been set in TLS  /////////////////

    const xstring& GetName() const;

    // Get the PSpace associated with the WorkingSet.  Never returns nullptr
    // A client may call this in order to lock the PSpace in order to read/write objects in 
    // the UT
    PSpace* GetPSpace();

    ///////////////// Functions that require a lock on the PSpace, and CSpace/PSpace to be set in TLS  /////////////////

    // Any WorkingSet that is successfully opened must be closed exactly once when it is no longer 
    // required.
    // Every DeltaRW and RodSession opened on the working set must be closed prior to closing the 
    // working set.
    // Close() must be called after all other functions on the WorkingSetMachine.
    void Close();

    // Permanently destroy the given WorkingSet.  
    // todo 1. This can be very time consuming, suggesting that it should be performed asynchronously
    //      2. This is currently not implemented.
    void PermanentlyDestroy();

    // Create a new delta reader/writer. 'n' is notified each time there is more 
    // available to send.
    // Note:   !OpenWorkingSet < ||(CreateDeltaRW,CreateRodSession) < !Close
    DeltaRW* CreateDeltaRW(IMoreToSend* n);

    RodSession* CreateRodSession(IMoreToSend* n);

    Guid GetDataSetId() const;
    bool CheckCompatibleDataSetId(const Guid& peerDataSetId);

    // Get the SiteId associated with this working set
    SiteId GetSiteId() const;
    
    // Get the current number of operations in the HB
    int64 HistoryBufferSize() const;
    
    // Get the vector time describing the set of operations that have been applied to 
    // the working set.
    void GetVectorTime(VecTime& v) const;
    
    bool IsRodObject(OID oid) const;

    bool IsCustodian() const;

    void GetAllocationOidhighs(xvector<OidHigh>& oidhighs) const;
    void GetSubscriptionOids(xvector<OID>& oids) const;

    // This can be called when a DGS node is invalidated to determine whether it occurred as a 
    // result of a remote operation being applied to this working set
    bool ApplyingRemoteOperation() const;

    void GetCounters(WorkingSetMachineCounters& counters) const;

    // Get the UT root for this working set.  
    UTRoot& GetUTRoot();

    void SetIsCustodian(bool isCustodian);

    OID AllocateRodOid();
};

/*
Open or create a working set with the given name within the current PSpace in thread local storage.  
Must be called with a lock on the PSpace.

Working sets within a given PSpace must be uniquely named.

A working set should be closed when it is no longer required. It is an error to try to 
open a working set that is already open.

dataSetMaster determines whether a data set identifier is allocated using CreateGuid() when the 
working set is created for the first time.

If the working set already exists then:
    datasetid is read as part of the persistent state of the working set
Otherwise (the working set is being created for the first time):
    If 'dataSetMaster' is true:
        datasetid is allocated with a call to CreateGuid()
    else
        datasetid is initialised to the null guid
*/
$function+ WorkingSetMachine* OpenWorkingSetMachine(ConstStringZ name, bool dataSetMaster);

$function+ inline void SetTlsForWorkingSet(WorkingSetMachine* ws)
{
    SetTlsForPSpace(GetPSpace(ws));
}

///////////////////////////////////////////////////////////////////////////////////////////////////
// Send/receive functions that work with xvector<octet_t>

// Calls WriteNextDelta() on the given DeltaRW and writes the bytes to the given buffer
// Returns false if the delta is empty.
// Requires a lock on the associated CSpace, and the CSpace/PSpace to be set in TLS
@api bool GetNextDeltaToSend(DeltaRW* drw, xvector<octet_t>& buffer);

// Calls DeserialiseNextDelta() on the given DeltaRW using the given buffer
// Returns false if no changes were applied.
// Requires a lock on the associated CSpace, and the CSpace/PSpace to be set in TLS
@api bool ApplyReceivedDelta(DeltaRW* drw, const xvector<octet_t>& buffer);

// Calls WriteNextMessage() on the given RodSession and writes the bytes to the given buffer
// Returns false if there is no message to send.
// Requires a lock on the associated CSpace, and the CSpace/PSpace to be set in TLS
@api bool GetNextRodSessionMessageToSend(RodSession* rs, xvector<octet_t>& buffer);

// Calls ReadNextMessage() on the given RodSession using the given buffer
// Returns false if no changes were applied.
// Requires a lock on the associated CSpace, and the CSpace/PSpace to be set in TLS
@api void ApplyReceivedRodSessionMessage(RodSession* rs, const xvector<octet_t>& buffer);

///////////////////////////////////////////////////////////////////////////////////////////////////
// In-process synchronisation between working sets

/*
Apply all changes in wsSrc to wsDst, but not the reverse.
Must be called without locks on the associated PSpaces.
NOTE WELL: HalfDupleSync performs an exchange of vector times every time, which
can be quite slow, it should not be called frequently.
*/
$function+ void HalfDuplexSync(WorkingSetMachine* wsSrc, WorkingSetMachine* wsDst);

// Like HalfDuplexSync except that changes are applied in both directions.
$function+ void FullDuplexSync(WorkingSetMachine* ws1, WorkingSetMachine* ws2);

$function+ void SendAndReceiveRodObjects(RodSession* rsSrc, RodSession* rsDst);

// Retrieve the delta from drwSrc and apply it to drwDst
// Must be called without any PSpace locks
// Returns the number of bytes in the delta
$function+ ssize_t SendAndReceiveDelta(DeltaRW* drwSrc, DeltaRW* drwDst);

// Must be called without locks on the associated PSpaces
$function+ void ExchangeVectorTimes(
    WorkingSetMachine* ws1, WorkingSetMachine* ws2,
    DeltaRW* drw1, DeltaRW* drw2);

} // namespace ceda