IWorkingSetMachine.h
// IWorkingSetMachine.h
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2010
@import "cxOperation.h"
@import "VecTime.h"
@import "Ceda/cxPersistStore/IPersistStore.h"
#include "Ceda/cxUtils/xvector.h"
#include "Ceda/cxUtils/BasicTypes.h"
/*
DeltaRW and RodSession allow for sending and receiving deltas or replicate-on-demand objects,
without being concerned with a high level mechanism to multiplex messages, or policies for
prioritising what object or delta is to be sent next on a given connection.
*/
namespace ceda
{
// Forward declaration of UTRoot, the type returned by GetUTRoot() on a WorkingSetMachine.
$struct UTRoot;
// _GetReflected for UTRoot is declared by hand here because Xcpp does not currently generate
// it for a forward declaration.
// todo: make Xcpp generate this when the forward declaration is written as $struct+ UTRoot
// (i.e. with the +)
@api ceda::ReflectedClass const& _GetReflected(UTRoot const*);
///////////////////////////////////////////////////////////////////////////////////////////////////
// IMoreToSend
// Callback interface used to signal that there is more available to send.
// A pointer to an IMoreToSend is passed to CreateDeltaRW() and CreateRodSession() on a
// WorkingSetMachine.
struct IMoreToSend
{
// Called with a lock on the PSpace (when either local or remote operations are being
// applied), to indicate that there is more to be sent by the associated DeltaRW.
// NOTE(review): CreateRodSession() also takes an IMoreToSend, so presumably this is called
// for RodSessions too - confirm.
virtual void OnMoreToSend() = 0;
};
$extern+ IMoreToSend;
// An IMoreToSend implementation whose notification handler does nothing.
// (Inheritance from a struct is public by default.)
struct EmptyMoreToSendHandler : IMoreToSend
{
    // Intentionally empty: the notification is ignored.
    virtual void OnMoreToSend()
    {
    }
};
///////////////////////////////////////////////////////////////////////////////////////////////////
// DeltaRW
/*
A DeltaRW is associated with full duplex communication between a local and remote working set.
It supports synchronous reading and writing of an ordered sequence of deltas sent between the
working sets in order to synchronise them.
A DeltaRW has transient state in order to compress the byte stream efficiently, and to record
transient information about what parts of the UT have changed.
Since a DeltaRW is stateful it is regarded as a state machine, not just pure functions to
send or receive deltas. Nevertheless it is not concerned with creating or managing
internal threads or sockets for IPC. Instead deltas are read/written synchronously to a given
archive.
Threading contract:
(
DeserialiseNextDelta |
(
!Init <
(
$$(HaveChangesToSend, WriteNextDelta)
)
)
) <
! Close
*/
// Reader/writer of deltas between a local and a remote working set.
// Created with CreateDeltaRW() on a WorkingSetMachine, and must be closed with Close().
$adt+ DeltaRW
{
// Close this DeltaRW. Per the threading contract this is called exactly once, after all
// other calls on the DeltaRW have completed.
// Requires a lock on the associated CSpace, and CSpace/PSpace to be set in TLS
void Close();
// Get the associated PSpace. Never returns nullptr.
// Can be called with or without a lock on the CSpace.
PSpace* GetPSpace();
/////////////////////// Reader ////////////////////////
// Read and apply the next delta from 'ar'. Sets 'appliedChanges' if changes were
// actually applied.
// Requires an exclusive lock on the associated CSpace, and CSpace/PSpace to be set in TLS
void DeserialiseNextDelta(InputArchive& ar, bool& appliedChanges);
/////////////////////// Writer ////////////////////////
// Must be called exactly once, and before all other public methods in this class.
// NOTE(review): the threading contract documented for DeltaRW only orders Init before
// HaveChangesToSend/WriteNextDelta, and allows DeserialiseNextDelta to run independently of
// Init - confirm which statement is intended.
// NOTE(review): 'rhv' is presumably the vector time received from the remote peer
// (see ExchangeVectorTimes) - confirm.
// Requires an exclusive lock on the associated CSpace, and CSpace/PSpace to be set in TLS
void Init(const VecTime& rhv);
// Are there changes waiting to be sent?
// Requires a lock on the associated CSpace, and CSpace/PSpace to be set in TLS
bool HaveChangesToSend() const;
// Write the next delta to ar. Returns false if nothing to write.
// Requires a lock on the associated CSpace, and CSpace/PSpace to be set in TLS
bool WriteNextDelta(Archive& ar);
};
///////////////////////////////////////////////////////////////////////////////////////////////////
// RodSession
/*
A RodSession can be used to send/receive the read only objects which are replicated on demand
in a working set.
*/
// Reader/writer of messages carrying replicate-on-demand (read only) objects.
// Created with CreateRodSession() on a WorkingSetMachine, and must be closed with Close().
$adt+ RodSession
{
// Close this RodSession.
// Requires a lock on the associated CSpace, and CSpace/PSpace to be set in TLS
void Close();
// Get the associated PSpace. Never returns nullptr.
// Can be called with or without a lock on the CSpace.
PSpace* GetPSpace();
/////////////////////// Reader ////////////////////////
// Read the next message from 'ar'
// Requires a lock on the associated CSpace, and CSpace/PSpace to be set in TLS
void ReadNextMessage(InputArchive& ar);
/////////////////////// Writer ////////////////////////
// Write the next message to ar. Returns false if nothing to write.
// Requires a lock on the associated CSpace, and CSpace/PSpace to be set in TLS
bool WriteNextMessage(Archive& ar);
};
///////////////////////////////////////////////////////////////////////////////////////////////////
// WorkingSetMachineCounters
/*
Note that these counters are only transient (they are zeroed when the WorkingSetMachine is
opened)
*/
// Counters retrieved with GetCounters() on a WorkingSetMachine.
$struct+ WorkingSetMachineCounters
{
// NOTE(review): presumably the number of ROD (replicate-on-demand) objects created locally
// since the WorkingSetMachine was opened - confirm.
int64 numLocalCreatedRodObjects;
// NOTE(review): presumably the number of ROD objects received from peers since the
// WorkingSetMachine was opened - confirm.
int64 numReceivedRodObjects;
};
///////////////////////////////////////////////////////////////////////////////////////////////////
// WorkingSetMachine
/*
A WorkingSet refers to the persistent state, regarded as an abstract value that records a
Universal Tree (UT) plus all the changes on the UT in a form suitable for synchronisation
using Operational Transformation (OT).
A WorkingSetMachine is associated with a single persistent WorkingSet. It manages a set of
DeltaRWs which are used to read/write deltas.
*/
// Handle on an open working set. Obtained from OpenWorkingSetMachine() and released with
// Close().
$adt+ WorkingSetMachine
{
///////////////// Functions that do not care whether there is a lock on the PSpace, or whether CSpace/PSpace has been set in TLS /////////////////
// Get the name of the working set.
// NOTE(review): presumably the name passed to OpenWorkingSetMachine() - confirm.
const xstring& GetName() const;
// Get the PSpace associated with the WorkingSet. Never returns nullptr
// A client may call this in order to lock the PSpace in order to read/write objects in
// the UT
PSpace* GetPSpace();
///////////////// Functions that require a lock on the PSpace, and CSpace/PSpace to be set in TLS /////////////////
// Any WorkingSet that is successfully opened must be closed exactly once when it is no longer
// required.
// Every DeltaRW and RodSession opened on the working set must be closed prior to closing the
// working set.
// Close() must be called after all other functions on the WorkingSetMachine.
void Close();
// Permanently destroy the given WorkingSet.
// todo 1. This can be very time consuming, suggesting that it should be performed asynchronously
// 2. This is currently not implemented.
void PermanentlyDestroy();
// Create a new delta reader/writer. 'n' is notified each time there is more
// available to send.
// Note: !OpenWorkingSetMachine < ||(CreateDeltaRW,CreateRodSession) < !Close
DeltaRW* CreateDeltaRW(IMoreToSend* n);
// Create a new RodSession.
// NOTE(review): presumably 'n' is notified in the same way as for CreateDeltaRW - confirm.
RodSession* CreateRodSession(IMoreToSend* n);
// Get the data set identifier of this working set (see OpenWorkingSetMachine() for how it
// is initialised).
Guid GetDataSetId() const;
// Check the data set identifier of a peer against this working set.
// NOTE(review): the compatibility rule, and any side effect (this function is not const),
// are not documented here - confirm.
bool CheckCompatibleDataSetId(const Guid& peerDataSetId);
// Get the SiteId associated with this working set
SiteId GetSiteId() const;
// Get the current number of operations in the HB (history buffer)
int64 HistoryBufferSize() const;
// Get the vector time describing the set of operations that have been applied to
// the working set.
void GetVectorTime(VecTime& v) const;
// NOTE(review): presumably returns true if 'oid' identifies a replicate-on-demand (ROD)
// object in this working set - confirm.
bool IsRodObject(OID oid) const;
// Get the custodian flag, as set with SetIsCustodian().
// NOTE(review): the meaning of the custodian role is not documented here.
bool IsCustodian() const;
// NOTE(review): presumably fills 'oidhighs' with the OidHigh values allocated to this
// working set; whether the vector is cleared first is not documented - confirm.
void GetAllocationOidhighs(xvector<OidHigh>& oidhighs) const;
// NOTE(review): presumably fills 'oids' with the OIDs this working set subscribes to;
// whether the vector is cleared first is not documented - confirm.
void GetSubscriptionOids(xvector<OID>& oids) const;
// This can be called when a DGS node is invalidated to determine whether it occurred as a
// result of a remote operation being applied to this working set
bool ApplyingRemoteOperation() const;
// Retrieve the transient counters of this WorkingSetMachine into 'counters'.
void GetCounters(WorkingSetMachineCounters& counters) const;
// Get the UT root for this working set.
UTRoot& GetUTRoot();
// Set the custodian flag reported by IsCustodian().
void SetIsCustodian(bool isCustodian);
// NOTE(review): presumably allocates and returns a new OID for a ROD object - confirm.
OID AllocateRodOid();
};
/*
Open or create a working set with the given name within the current PSpace in thread local storage.
Must be called with a lock on the PSpace.
Working sets within a given PSpace must be uniquely named.
A working set should be closed when it is no longer required. It is an error to try to
open a working set that is already open.
dataSetMaster determines whether a data set identifier is allocated using CreateGuid() when the
working set is created for the first time.
If the working set already exists then:
datasetid is read as part of the persistent state of the working set
Otherwise (the working set is being created for the first time):
If 'dataSetMaster' is true:
datasetid is allocated with a call to CreateGuid()
else
datasetid is initialised to the null guid
A successfully opened working set must be closed exactly once by calling Close() on the
returned WorkingSetMachine.
NOTE(review): how failure is reported (e.g. null return or exception) is not documented
here - confirm.
*/
$function+ WorkingSetMachine* OpenWorkingSetMachine(ConstStringZ name, bool dataSetMaster);
// Convenience wrapper: calls SetTlsForPSpace() with the PSpace associated with the given
// working set.
$function+ inline void SetTlsForWorkingSet(WorkingSetMachine* ws)
{
    // Look up the PSpace of the working set, then hand it to SetTlsForPSpace().
    PSpace* pspace = GetPSpace(ws);
    SetTlsForPSpace(pspace);
}
///////////////////////////////////////////////////////////////////////////////////////////////////
// Send/receive functions that work with xvector<octet_t>
// Calls WriteNextDelta() on the given DeltaRW and writes the bytes to the given buffer.
// Returns false if the delta is empty (i.e. there was nothing to write).
// Requires a lock on the associated CSpace, and the CSpace/PSpace to be set in TLS
// NOTE(review): whether 'buffer' is cleared or appended to is not documented here - confirm.
@api bool GetNextDeltaToSend(DeltaRW* drw, xvector<octet_t>& buffer);
// Calls DeserialiseNextDelta() on the given DeltaRW using the given buffer.
// Returns false if no changes were applied.
// Requires an exclusive lock on the associated CSpace (as DeserialiseNextDelta() does), and
// the CSpace/PSpace to be set in TLS
@api bool ApplyReceivedDelta(DeltaRW* drw, const xvector<octet_t>& buffer);
// Calls WriteNextMessage() on the given RodSession and writes the bytes to the given buffer.
// Returns false if there is no message to send.
// Requires a lock on the associated CSpace, and the CSpace/PSpace to be set in TLS
// NOTE(review): whether 'buffer' is cleared or appended to is not documented here - confirm.
@api bool GetNextRodSessionMessageToSend(RodSession* rs, xvector<octet_t>& buffer);
// Calls ReadNextMessage() on the given RodSession using the given buffer.
// Has no return value (ReadNextMessage() does not report whether anything was applied).
// Requires a lock on the associated CSpace, and the CSpace/PSpace to be set in TLS
@api void ApplyReceivedRodSessionMessage(RodSession* rs, const xvector<octet_t>& buffer);
///////////////////////////////////////////////////////////////////////////////////////////////////
// In-process synchronisation between working sets
/*
Apply all changes in wsSrc to wsDst, but not the reverse.
Must be called without locks on the associated PSpaces.
NOTE WELL: HalfDuplexSync performs an exchange of vector times every time, which
can be quite slow, so it should not be called frequently.
*/
$function+ void HalfDuplexSync(WorkingSetMachine* wsSrc, WorkingSetMachine* wsDst);
// Like HalfDuplexSync except that changes are applied in both directions.
// NOTE(review): presumably the same restrictions apply as for HalfDuplexSync (no locks held
// on the associated PSpaces; not to be called frequently) - confirm.
$function+ void FullDuplexSync(WorkingSetMachine* ws1, WorkingSetMachine* ws2);
// NOTE(review): undocumented. Presumably transfers the pending ROD messages written by rsSrc
// to rsDst (in-process), analogous to SendAndReceiveDelta; the locking requirements are not
// stated here - confirm.
$function+ void SendAndReceiveRodObjects(RodSession* rsSrc, RodSession* rsDst);
// Retrieve the next delta from drwSrc and apply it to drwDst (in-process, one direction only).
// Must be called without any PSpace locks
// Returns the number of bytes in the delta
$function+ ssize_t SendAndReceiveDelta(DeltaRW* drwSrc, DeltaRW* drwDst);
// Exchange vector times between two working sets and their DeltaRWs.
// NOTE(review): presumably drw1 belongs to ws1 and drw2 to ws2, and each DeltaRW is
// initialised (Init) with the other working set's vector time - confirm.
// Must be called without locks on the associated PSpaces
$function+ void ExchangeVectorTimes(
WorkingSetMachine* ws1, WorkingSetMachine* ws2,
DeltaRW* drw1, DeltaRW* drw2);
} // namespace ceda