// DGAsyncNode.h
@import "Ceda/cxObject/Object.h"
@import "Ceda/cxObject/DGNode.h"
@import "Ceda/cxObject/WCSpace.h"
@import "Ceda/cxObject/ThreadPtr.h"
#include "Ceda/cxUtils/Tracer.h"
#include <utility>
namespace ceda
{
///////////////////////////////////////////////////////////////////////////////////////////////////
// assign_c
/*
assign_c(x,y) denotes a "consumable assignment" and typically performs x = y. Unlike normal
assignment, both x and y must denote mutable variables of the same type. y is allowed
to be modified as necessary to make the assignment as efficient as possible.
postcondition: x holds the original value of y.
There is no postcondition on y.
assign_c is useful for generic algorithms that perform assignment between two variables of
the same type, and the algorithm doesn't care about the value of the rhs after the assignment.
For some types such as float it is most efficient to assign and for other types such as
std::vector it is more efficient to swap.
*/
/*
template <typename T>
void assign_c(T& x, T& y)
{
// Appropriate when assignment is more efficient than swap
x = y;
}
template <typename T>
void assign_c(xvector<T>& x, xvector<T>& y)
{
x.swap(y);
}
*/
///////////////////////////////////////////////////////////////////////////////////////////////////
// UpdateOutput(dst, src) installs a newly calculated output `src` into the cached output `dst`.
// The default strategy swaps the two values rather than assigning, which avoids copying the
// contents of container-like outputs. Afterwards `dst` holds the old value of `src` and `src`
// holds the old value of `dst`. The async nodes in this file pass a local scratch variable as
// `src` and do not use it afterwards. Being a function template, it can be overloaded for
// specific output types.
template <typename Output>
void UpdateOutput(Output& dst, Output& src)
{
    using std::swap;    // std::swap is the fallback when ADL finds no type-specific swap()
    swap(dst, src);
}
///////////////////////////////////////////////////////////////////////////////////////////////////
// DGAsyncIndepNode
// DGAsyncIndepNode<FinalClass, Self, Output>
//
// An independent DGS node (derives from DGIndepNode) whose cached value _output is calculated
// asynchronously. The calculation is posted by the first call to read(). This class itself
// never posts a second calculation.
//
// Requirements on FinalClass, as used below:
//   - static CalcOutput(<result of _GetSelf()>, output_type& out) fills in `out`
//   - _GetSelf() supplies the first argument of CalcOutput
//   - _GetFinalSelf() is passed to PrepareThreadLocalStorage() and to the object visitor
// Note: the Self template parameter is not referenced inside this class.
template <typename FinalClass, typename Self, typename Output>
class DGAsyncIndepNode : public DGIndepNode
{
public:
typedef Output output_type;
// Memory attributable to caching the output, as reported by CacheValueAdditionalSize().
virtual ssize_t ByteSize() const
{
//Tracer() << "DGAsyncIndepNode " << Name() << " byte size = " << CacheValueAdditionalSize(_output) << '\n';
// This is the *overhead* of caching the output, it is assumed when OnEvict() calls
// ClearCacheValue(_output), CacheValueAdditionalSize() gives the number of bytes that is recovered.
return CacheValueAdditionalSize(_output);
}
// Visits the cached output followed by the final "self" object.
virtual void VisitContainingObject(IObjectVisitor& _v) const
{
_v << _output << static_cast<const FinalClass*>(this)->_GetFinalSelf();
}
// Releases the cached output. Unlike the keyed variant, the node object itself is kept.
// NOTE(review): DF_CALLED_READ is not cleared here. Unless the base class resets it on
// eviction, a later read() takes the "already called" path and returns the cleared _output
// without posting a recalculation. Confirm against DGIndepNode / the evictor.
virtual void OnEvict() const
{
OnEvictCacheValue(_output);
ClearCacheValue(_output);
}
// Not evictable while the async task is in flight. That task captures `this` and writes
// _output when it completes. Presumably the evictor only evicts nodes for which this returns true.
virtual bool IsEvictable() const
{
return !Enabled(DF_ASYNC_RUNNING) && CacheValueIsEvictable(_output);
}
// Returns the cached output.
// The first call posts the asynchronous calculation and returns _output as it currently is
// (not yet calculated). Out-nodes attached by ReadBarrier() are invalidated via OnChange()
// once the calculated value has been swapped in.
// NOTE(review): presumably must be called with the CSpace lock held, because it reads and sets
// flags that the async task modifies under a CSpaceTxn. Confirm.
const output_type& read() const
{
if (Enabled(DF_CALLED_READ))
{
if (Enabled(DF_ASYNC_RUNNING))
{
// read() has already been called, and an async output is currently being calculated.
// Attach out-nodes so when the output is updated the out-nodes will be invalidated
ReadBarrier();
}
else
{
// Calls to read() don't invoke the read barrier once the output has been
// calculated, because it cannot change once it is calculated.
}
}
else
{
// read() has been called for the first time.
SetFlag(DF_CALLED_READ);
// Attach out-nodes - these will be invalidated when the async output has been calculated.
ReadBarrier();
CSpace* cspace = GetThreadPtr<CSpace>();
cxAssert(!Enabled(DF_ASYNC_RUNNING));
// DF_ASYNC_RUNNING stays set until the task below has swapped in the result. It also blocks eviction.
SetFlag(DF_ASYNC_RUNNING);
// _output hasn't been calculated before so post a task to calculate it.
// The task captures `this` (not a copy), so this node must outlive the task.
PostAsyncTask(cspace, [this]()
{
PrepareThreadLocalStorage(static_cast<const FinalClass*>(this)->_GetFinalSelf());
// Calculate output from key without a CSpace lock, hold output in a local variable
output_type localOutput;
FinalClass::CalcOutput(static_cast<const FinalClass*>(this)->_GetSelf(), localOutput);
{
// Use CSpace lock to swap in the output, then invalidate the out-nodes
CSpaceTxn txn;
cxAssert(Enabled(DF_ASYNC_RUNNING));
UpdateOutput(_output, localOutput);
OnChange();
ClearFlag(DF_ASYNC_RUNNING);
// Re-report the node size only when the output type can carry additional size
// (presumably heap-owned memory - see TypeHasAdditionalSize).
if (TypeHasAdditionalSize<output_type>()) UpdateByteSize();
}
});
}
return _output;
}
protected:
mutable output_type _output; // Protected by a CSpace lock
};
///////////////////////////////////////////////////////////////////////////////////////////////////
// DGKeyedAsyncIndepNode
// DGKeyedAsyncIndepNode<FinalClass, Self, Key, Output>
//
// An independent DGS node stored as the mapped value of a std::map<Key, FinalClass> (map_type).
// Its output is calculated asynchronously by FinalClass::CalcOutput from the owning object and
// this node's key. The calculation is posted by the first call to read(self).
//
// Requirements on FinalClass, as used below:
//   - static CalcOutput(_self, output_type& out, key_type const& key) fills in `out`
//   - _GetMap() returns the map that owns this node (used by OnEvict)
//
// Eviction erases this node from its owning map, so OnEvict() destroys *this.
template <typename FinalClass, typename Self, typename Key, typename Output>
class DGKeyedAsyncIndepNode : public DGIndepNode
{
public:
    typedef Key key_type;
    typedef Output output_type;
    typedef std::map<key_type,FinalClass> map_type;

    DGKeyedAsyncIndepNode() :
        _self(nullptr)
    {
    }

    // Approximate memory attributable to this node: the std::map element overhead, the map's
    // value_type, and CacheValueAdditionalSize(_output).
    virtual ssize_t ByteSize() const
    {
        const ssize_t MapElementSize = sizeof(typename map_type::value_type);
        //Tracer() << "DGKeyedAsyncIndepNode " << Name() << " byte size = " << StdMapElementOverhead << " + "
        //         << MapElementSize << " + " << CacheValueAdditionalSize(_output) << '\n';
        return StdMapElementOverhead + MapElementSize + CacheValueAdditionalSize(_output);
    }

    // Key under which this node is stored. Presumably GetKeyFromValueInPair recovers it from the
    // map's value_type pair that contains this node, which is why the node must live in the map.
    key_type const& _GetKey() const
    {
        return GetKeyFromValueInPair<key_type,FinalClass>(static_cast<const FinalClass*>(this));
    }

    // Visits the cached output followed by the owning object pointer.
    virtual void VisitContainingObject(IObjectVisitor& _v) const
    {
        _v << _output << _self;
    }

    // Removes this node from its owning map. The erase destroys *this, so no member may be
    // accessed after it.
    virtual void OnEvict() const
    {
        OnEvictCacheValue(_output);
        auto& m = static_cast<const FinalClass*>(this)->_GetMap();
        cxVerify(m.erase(_GetKey()) == 1);
    }

    // Not evictable while the async task (which captures `this`) is in flight.
    virtual bool IsEvictable() const
    {
        return !Enabled(DF_ASYNC_RUNNING) && CacheValueIsEvictable(_output);
    }

    // Returns the cached output.
    // `self` is the owning object passed to CalcOutput. It is only recorded on the first call,
    // and it must be non-null because _self doubles as the "read() has been called" marker.
    // The first call posts the asynchronous calculation and returns the not-yet-calculated
    // _output. Out-nodes attached by ReadBarrier() are invalidated via OnChange() once the
    // calculated value has been swapped in.
    const output_type& read(Self const* self) const
    {
        if (_self)
        {
            if (Enabled(DF_ASYNC_RUNNING))
            {
                // read() has already been called, and an async output is currently being calculated.
                // Attach out-nodes so when the output is updated the out-nodes will be invalidated
                ReadBarrier();
            }
            else
            {
                // Calls to read() don't invoke the read barrier once the output has been
                // calculated, because it cannot change once it is calculated.
            }
        }
        else
        {
            // read() has been called for the first time.
            // A null self would leave the _self marker unset. Every later read() would then
            // re-enter this branch: it would fail the DF_ASYNC_RUNNING assertion below while a task
            // is running, or post a redundant calculation (with a null owner) otherwise.
            cxAssert(self != nullptr);
            // Attach out-nodes - these will be invalidated when the async output has been calculated.
            ReadBarrier();
            _self = self;
            CSpace* cspace = GetThreadPtr<CSpace>();
            cxAssert(!Enabled(DF_ASYNC_RUNNING));
            SetFlag(DF_ASYNC_RUNNING);
            // _output hasn't been calculated before so post a task to calculate it.
            PostAsyncTask(cspace, [this]()
            {
                PrepareThreadLocalStorage(_self);
                // Calculate output from key without a CSpace lock, hold output in a local variable
                output_type localOutput;
                FinalClass::CalcOutput(_self, localOutput, _GetKey());
                {
                    // Use CSpace lock to swap in the output, then invalidate the out-nodes
                    CSpaceTxn txn;
                    cxAssert(Enabled(DF_ASYNC_RUNNING));
                    UpdateOutput(_output, localOutput);
                    OnChange();
                    ClearFlag(DF_ASYNC_RUNNING);
                    if (TypeHasAdditionalSize<output_type>()) UpdateByteSize();
                }
            });
        }
        return _output;
    }

protected:
    mutable Self const* _self;      // Owning object; non-null once read() has been called
    mutable output_type _output;    // Protected by a CSpace lock
};
///////////////////////////////////////////////////////////////////////////////////////////////
// DGAsyncNodeOnInput
// DGAsyncNodeOnInput<FinalClass, Self, Input, Output>
//
// A dependent DGS node (DGDepNode constructed with DF_ASYNC) whose output is calculated in two stages:
//   - input_ is calculated by RecalcCache() -> FinalClass::CalcInput(self, input_). That happens
//     inside ReadBarrierDep(), which the async task below only calls while holding a CSpaceTxn.
//   - _output is calculated by FinalClass::CalcOutput(self, output, input_) WITHOUT the CSpace
//     lock, then swapped in under a CSpaceTxn.
// The first read() posts the initial calculation. After that, each OnInvalidate() posts a
// recalculation unless one is already running.
// Note: the Self template parameter is not referenced inside this class.
template <typename FinalClass, typename Self, typename Input, typename Output>
class DGAsyncNodeOnInput : public DGDepNode
{
public:
typedef Input input_type;
typedef Output output_type;
DGAsyncNodeOnInput() :
DGDepNode(DF_ASYNC)
{
}
virtual ssize_t ByteSize() const
{
// Note that we don't add CacheValueAdditionalSize(input_) because we clear the input
// at the time the output is updated
//Tracer() << "DGAsyncNodeOnInput " << Name() << " byte size = " << CacheValueAdditionalSize(_output) << '\n';
return CacheValueAdditionalSize(_output);
}
// Visits the input, the cached output and the final "self" object.
virtual void VisitContainingObject(IObjectVisitor& _v) const
{
_v << input_ << _output << static_cast<const FinalClass*>(this)->_GetFinalSelf();
}
// Releases the cached output. The node object itself is kept.
// NOTE(review): DF_CALLED_READ is not cleared here. Unless the base class resets it or an
// in-node change triggers OnInvalidate(), a later read() returns the cleared _output without
// posting a recalculation. Confirm against DGDepNode / the evictor.
virtual void OnEvict() const
{
OnEvictCacheValue(_output);
ClearCacheValue(_output);
}
// Not evictable while the async task (which captures `this`) is in flight.
virtual bool IsEvictable() const
{
return !Enabled(DF_ASYNC_RUNNING) && CacheValueIsEvictable(_output);
}
// Recalculates input_ only. Out-nodes are marked dirty explicitly by the async task once the
// new output has been swapped in (MarkOutNodesAsSoftAndHardDirty below).
virtual bool RecalcCache() const
{
FinalClass::CalcInput(static_cast<const FinalClass*>(this)->_GetSelf(), input_);
return false; // Pretend input not changed to avoid propagating dirtiness into the out-nodes
}
// Posts an async task that recalculates input_ and _output, unless one is already running.
// An invalidation that arrives while a task is running is not lost: the task re-checks
// DF_SOFT_DIRTY after applying its output and loops if the node was dirtied meanwhile.
// NOTE(review): CalcOutput reads input_ outside the lock. Within this class, input_ is only
// written by RecalcCache (reached via ReadBarrierDep, which here is only called from this task)
// and by the clear below, so this relies on nothing else reaching RecalcCache while
// DF_ASYNC_RUNNING is set. Confirm for the base class.
virtual void OnInvalidate() const
{
if (Enabled(DF_ASYNC_RUNNING))
{
// Already running an async task, don't post another
}
else
{
SetFlag(DF_ASYNC_RUNNING);
PostAsyncTask(GetThreadPtr<CSpace>(), [this]()
{
PrepareThreadLocalStorage(static_cast<const FinalClass*>(this)->_GetFinalSelf());
// Calculate the inputs
{
CSpaceTxn txn;
cxAssert(Enabled(DF_ASYNC_RUNNING));
ReadBarrierDep(); // This cleans this node
}
while(1)
{
// Calculate output from input without a CSpace lock, hold output in a local variable
output_type localOutput;
FinalClass::CalcOutput(static_cast<const FinalClass*>(this)->_GetSelf(), localOutput, input_);
// Apply the output and invalidate out-nodes
{
CSpaceTxn txn;
cxAssert(Enabled(DF_ASYNC_RUNNING));
UpdateOutput(_output, localOutput);
input_ = input_type(); // Clear the input - might allow some memory reclamation
MarkOutNodesAsSoftAndHardDirty();
if (!Enabled(DF_SOFT_DIRTY))
{
// Finished
ClearFlag(DF_ASYNC_RUNNING);
if (TypeHasAdditionalSize<output_type>()) UpdateByteSize();
return;
}
else
{
// This DGS node is dirty indicating that the inputs have changed again, so we need to recalculate again
// Recalculate the input. This cleans this node
ReadBarrierDep();
// continue in the while loop
}
}
}
});
}
}
// Returns the cached output. The first call posts the initial calculation via OnInvalidate().
// Every call attaches the calling out-node, so it is marked dirty when a new output is applied.
const output_type& read() const
{
if (!Enabled(DF_CALLED_READ))
{
SetFlag(DF_CALLED_READ);
OnInvalidate();
}
ReadBarrierIndep(); // Attach out-nodes
return _output;
}
protected:
///////////////////////////// Protected by a CSpace lock
mutable input_type input_;
mutable output_type _output;
};
///////////////////////////////////////////////////////////////////////////////////////////////
// DGKeyedAsyncNodeOnInput
// DGKeyedAsyncNodeOnInput<FinalClass, Self, Key, Input, Output>
//
// A dependent DGS node (DGDepNode constructed with DF_ASYNC) stored as the mapped value of a
// std::map<Key, FinalClass> (map_type). Its output is calculated in two stages:
//   - input_ is calculated by RecalcCache() -> FinalClass::CalcInput(_self, input_, key). That
//     happens inside ReadBarrierDep(), which the async task only calls while holding a CSpaceTxn.
//   - _output is calculated by FinalClass::CalcOutput(_self, output, key, input_) WITHOUT the
//     CSpace lock, then swapped in under a CSpaceTxn.
// The first read(self) posts the initial calculation. After that, each OnInvalidate() posts a
// recalculation unless one is already running.
//
// Eviction erases this node from its owning map (FinalClass::_GetMap()), so OnEvict() destroys *this.
template <typename FinalClass, typename Self, typename Key, typename Input, typename Output>
class DGKeyedAsyncNodeOnInput : public DGDepNode
{
public:
    typedef Key key_type;
    typedef Input input_type;
    typedef Output output_type;
    typedef std::map<key_type,FinalClass> map_type;

    DGKeyedAsyncNodeOnInput() :
        DGDepNode(DF_ASYNC),
        _self(nullptr)
    {
    }

    // Approximate memory attributable to this node: the std::map element overhead, the map's
    // value_type, and CacheValueAdditionalSize(_output).
    virtual ssize_t ByteSize() const
    {
        // Note that we don't add CacheValueAdditionalSize(input_) because we clear the input
        // at the time the output is updated
        const ssize_t MapElementSize = sizeof(typename map_type::value_type);
        //Tracer() << "DGKeyedAsyncNodeOnInput " << Name() << " byte size = " << StdMapElementOverhead << " + "
        //         << MapElementSize << " + " << CacheValueAdditionalSize(_output) << '\n';
        return StdMapElementOverhead + MapElementSize + CacheValueAdditionalSize(_output);
    }

    // Key under which this node is stored. Presumably GetKeyFromValueInPair recovers it from the
    // map's value_type pair that contains this node, which is why the node must live in the map.
    key_type const& _GetKey() const
    {
        return GetKeyFromValueInPair<key_type,FinalClass>(static_cast<const FinalClass*>(this));
    }

    // Visits the input, the cached output and the owning object pointer.
    virtual void VisitContainingObject(IObjectVisitor& _v) const
    {
        _v << input_ << _output << _self;
    }

    // Removes this node from its owning map. The erase destroys *this, so no member may be
    // accessed after it.
    virtual void OnEvict() const
    {
        OnEvictCacheValue(_output);
        auto& m = static_cast<const FinalClass*>(this)->_GetMap();
        cxVerify(m.erase(_GetKey()) == 1);
    }

    // Not evictable while the async task (which captures `this`) is in flight.
    virtual bool IsEvictable() const
    {
        return !Enabled(DF_ASYNC_RUNNING) && CacheValueIsEvictable(_output);
    }

    // Recalculates input_ only. Out-nodes are marked dirty explicitly by the async task once the
    // new output has been swapped in.
    virtual bool RecalcCache() const
    {
        FinalClass::CalcInput(_self, input_, _GetKey());
        return false; // Pretend input not changed to avoid propagating dirtiness into the out-nodes
    }

    // Posts an async task that recalculates input_ and _output, unless one is already running.
    // An invalidation that arrives while a task is running is not lost: the task re-checks
    // DF_SOFT_DIRTY after applying its output and loops if the node was dirtied meanwhile.
    virtual void OnInvalidate() const
    {
        if (Enabled(DF_ASYNC_RUNNING))
        {
            // Already running an async task, don't post another
        }
        else
        {
            SetFlag(DF_ASYNC_RUNNING);
            PostAsyncTask(GetThreadPtr<CSpace>(), [this]()
            {
                PrepareThreadLocalStorage(_self);
                auto& key = _GetKey();
                // Calculate the inputs
                {
                    CSpaceTxn txn;
                    cxAssert(Enabled(DF_ASYNC_RUNNING));
                    ReadBarrierDep(); // This cleans this node
                }
                while(1)
                {
                    // Calculate output from input and key without a CSpace lock, hold output in a local variable
                    output_type localOutput;
                    FinalClass::CalcOutput(_self, localOutput, key, input_);
                    // Apply the output and invalidate out-nodes
                    {
                        CSpaceTxn txn;
                        cxAssert(Enabled(DF_ASYNC_RUNNING));
                        UpdateOutput(_output, localOutput);
                        input_ = input_type(); // Clear the input - might allow some memory reclamation
                        MarkOutNodesAsSoftAndHardDirty();
                        if (!Enabled(DF_SOFT_DIRTY))
                        {
                            // Finished
                            ClearFlag(DF_ASYNC_RUNNING);
                            if (TypeHasAdditionalSize<output_type>()) UpdateByteSize();
                            return;
                        }
                        else
                        {
                            // This DGS node is dirty indicating that the inputs have changed again, so we need to recalculate again
                            // Recalculate the input. This cleans this node
                            ReadBarrierDep();
                            // continue in the while loop
                        }
                    }
                }
            });
        }
    }

    // Returns the cached output.
    // `self` is the owning object passed to CalcInput/CalcOutput. It is only recorded on the
    // first call, and it must be non-null because _self doubles as the "read() has been called"
    // marker. Every call attaches the calling out-node via ReadBarrierIndep().
    const output_type& read(Self const* self) const
    {
        if (!_self)
        {
            // First call: record the owner and post the initial calculation.
            // A null self would leave the marker unset, so every later read() would call
            // OnInvalidate() again and re-post the calculation once the previous task finished.
            cxAssert(self != nullptr);
            _self = self;
            OnInvalidate();
        }
        ReadBarrierIndep(); // Attach out-nodes
        return _output;
    }

protected:
    mutable Self const* _self;      // Owning object; non-null once read() has been called
    ///////////////////////////// All state below is protected by a CSpace lock
    mutable input_type input_;
    mutable output_type _output;
};
} // namespace ceda