// CacheMap.h
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2006

#pragma once
#ifndef Ceda_cxMessage_CacheMap_H
#define Ceda_cxMessage_CacheMap_H

#include "cxMessage.h"
#include "ManualResetEvent.h"
#include "Ceda/cxUtils/CedaAssert.h"
#include <map>
#include <mutex>

namespace ceda
{
// CacheMap
//
// A mutex-protected map from Key to Value in which missing values are computed on demand by a
// Calculator functor (see GetValue()).  When several threads ask for the same missing key at the
// same time, one of them runs the calculation while the others block on a ManualResetEvent until
// the result (or a failure) is published.
//
// Requirements visible from the code below:
//   Key        - usable as a std::map key.
//   Value      - default constructible and copy assignable.
//   Calculator - copy constructible; calculator_(key) yields something assignable to a Value;
//                calculator_.AllowInsertionInMap(value) is usable as a condition and decides
//                whether a freshly calculated value is stored in the main map.
//
// All public member functions except GetCalculator() take the internal mutex.
template <typename Key, typename Value, typename Calculator>
class CacheMap
{
public:
    CacheMap(Calculator c = Calculator()) : calculator_(c) {}

    // Remove every entry from the main map.  Entries describing calculations that are in
    // progress (eventMap_) are not touched.
    void Clear()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        map_.clear();
    }

    // Remove the cached value for the given key, if any.  Returns true if an entry was removed.
    bool RemoveCachedValue(const Key& key)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        // std::map::erase(key) returns the (unsigned) container size_type.
        const typename MAP::size_type numRemoved = map_.erase(key);
        cxAssert(numRemoved == 0 || numRemoved == 1);
        return numRemoved == 1;
    }

    // Used to poll for whether an entry for the given key is currently present in the map.
    bool IsCached(const Key& key) const
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return map_.find(key) != map_.end();
    }

    // Allows for polling whether an entry in the map is already present.  This function always
    // returns quickly without blocking on a calculation - either directly or indirectly.
    // On success the cached value is copied into v and true is returned; otherwise v is left
    // unchanged and false is returned.
    bool GetCachedValue(const Key& key, Value& v) const
    {
        std::lock_guard<std::mutex> lock(_mutex);
        typename MAP::const_iterator i = map_.find(key);
        if (i == map_.end())
        {
            return false;
        }
        v = i->second;
        return true;
    }

    // Try to insert a (key,value) pair.  Returns false if insertion failed because that key
    // is already present.
    bool InsertCachedValue(const Key& key, const Value& value)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        // 'typename' is required: MAP::value_type is a dependent type name.
        return map_.insert(typename MAP::value_type(key, value)).second;
    }

    // Insert the (key,value) pair, overwriting any value already cached for that key.
    void SetCachedValue(const Key& key, const Value& value)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        map_[key] = value;
    }
    
    // Return the value for the given key, calculating it with the Calculator if it is not
    // cached.  The calculation itself runs without the mutex held.  If another thread is already
    // calculating the value for this key, this call blocks until that thread has finished.
    //
    // Any exception thrown by the calculator propagates to the thread that ran the calculation;
    // threads that were waiting on it retry, and one of them takes over the calculation.
    Value GetValue(const Key& key)
    {
retry:
        EventEntry* e = nullptr;
        bool needToCalc = false;
        {
            std::lock_guard<std::mutex> lock(_mutex);

            typename MAP::const_iterator i = map_.find(key);
            if (i != map_.end())
            {
                // Value is cached in the map
                return i->second;
            }
            else
            {
                typename EVENTMAP::iterator j = eventMap_.find(key);
                if (j == eventMap_.end())
                {
                    // This thread is responsible for calculating the value
                    needToCalc = true;
                    e = &eventMap_[key];
                    cxAssert(e->status_ == NOT_CALCULATED);
                    e->status_ = CALCULATING;
                }
                else
                {
                    // Another thread has already taken responsibility for calculating the value
                    e = &j->second;

                    if (e->status_ == NOT_CALCULATED)
                    {
                        // The thread that had tried to calculate the value must have thrown an 
                        // exception.  Therefore make this thread responsible for calculating the
                        // value.  The entry only survives in this state while other threads are
                        // still counted as waiters, hence the asserts.
                        needToCalc = true;
                        e->status_ = CALCULATING;
                        cxAssert(e->count_ > 0);
                        cxAssert(e->event_);
                        e->event_->Reset();
                    }
                    else if (e->status_ == CALCULATED)
                    {
                        // This is only possible if entries have been removed from the main map
                        // or AllowInsertionInMap() can return false.
                        return e->value_;
                    }
                    else
                    {
                        cxAssert(e->status_ == CALCULATING);
                        // Register as a waiter.  The first waiter creates the event; the last
                        // waiter to leave deletes it (see below).
                        if (e->count_++ == 0)
                        {
                            cxAssert( e->event_ == nullptr );
                            e->event_ = new ManualResetEvent;
                        }
                    }
                    cxAssert( e->event_ );
                }
            }
        }

        // From here on 'e' is used outside the lock.  It points into eventMap_, whose elements
        // keep their address until they are erased.
        cxAssert(e);
        if (needToCalc)
        {
            Value v;

            try
            {
                cxAssert(e->status_ == CALCULATING);
                v = calculator_(key);
            }
            catch(...)              // todo: find a better solution, catching with ... and rethrowing stops the visual studio debugger stopping at the point where an access violation occurred etc
            {
                std::lock_guard<std::mutex> lock(_mutex);
                if (e->count_ == 0)
                {
                    // Nobody is waiting, so the entry can simply be discarded.
                    cxAssert( e->event_ == nullptr );
                    eventMap_.erase(key);
                }
                else
                {
                    // Calculation failed.  Need to indicate failure to other threads and signal
                    // them
                    cxAssert( e->event_ );
                    e->status_ = NOT_CALCULATED;
                    e->event_->Signal();
                }
                throw;
            }

            // NOTE(review): if anything below throws (AllowInsertionInMap, or copying the Value)
            // the entry stays in the CALCULATING state and waiters are never signalled - confirm
            // whether these operations can throw for the types in use.
            std::lock_guard<std::mutex> lock(_mutex);
            
            if (calculator_.AllowInsertionInMap(v))
            {
                map_[key] = v;
            }
            
            if (e->count_ == 0)
            {
                // Nobody is waiting, so there is nothing to publish through the event entry.
                cxAssert( e->event_ == nullptr );
                eventMap_.erase(key);
            }
            else
            {
                // Publish the result to the waiting threads.  The last waiter to pick it up
                // removes the entry.
                cxAssert( e->event_ );
                e->value_ = v;
                e->status_ = CALCULATED;
                e->event_->Signal();
            }

            return v;
        }
        else
        {
            cxAssert( e->event_ );
            e->event_->Wait();

            std::lock_guard<std::mutex> lock(_mutex);

            if (e->status_ == CALCULATED)
            {
                // Copy the value before the entry is (possibly) erased.
                Value v = e->value_;
                if (--e->count_ == 0)
                {
                    delete e->event_;
                    eventMap_.erase(key);
                }
                return v;
            }
            else
            {
                // The calculation failed (NOT_CALCULATED), or another waiter has already taken
                // over and restarted it (CALCULATING).  Deregister as a waiter and start again.
                if (--e->count_ == 0)
                {
                    delete e->event_;
                    e->event_ = nullptr;
                    if (e->status_ == NOT_CALCULATED)
                    {
                        eventMap_.erase(key);
                    }
                }
                goto retry;
            }
        }
    }
    
    // Move every cached entry of src into this map, leaving src's main map empty.  It is asserted
    // that src has no calculations in progress and that none of the transferred keys is already
    // present in this map.  Transferring a map into itself is a no-op.
    void Transfer(CacheMap& src)
    {
        if (&src == this)
        {
            // Locking the same non-recursive mutex twice would be an error.
            return;
        }

        // Acquire both mutexes with std::lock so that concurrent a.Transfer(b) and b.Transfer(a)
        // cannot deadlock on lock ordering.
        std::unique_lock<std::mutex> lock1(_mutex, std::defer_lock);
        std::unique_lock<std::mutex> lock2(src._mutex, std::defer_lock);
        std::lock(lock1, lock2);
        
        cxAssert(src.eventMap_.empty());
        
        for (typename MAP::iterator i = src.map_.begin() ; i != src.map_.end() ; ++i)
        {
            cxAssert(map_.find(i->first) == map_.end());
            map_[i->first] = i->second;
        }
        src.map_.clear();
    }
    
    // Direct access to the calculator.  Note that this does not take the mutex.
    Calculator& GetCalculator() { return calculator_; }

protected:
    // Guards map_ and eventMap_ (including the EventEntry objects stored in eventMap_).
    mutable std::mutex _mutex;

    Calculator calculator_;
    
    // The main map of cached values.
    typedef std::map<Key,Value> MAP;
    MAP map_;

    // State of the calculation described by an EventEntry.
    enum EStatus
    {
        NOT_CALCULATED,     // Initial state; also set when a calculation failed with an exception
        CALCULATING,        // Some thread is currently running the calculator for this key
        CALCULATED          // value_ holds the result and is being handed to the waiting threads
    };
    
    // Book-keeping for a key whose value is being calculated.
    struct EventEntry
    {
        EventEntry() : status_(NOT_CALCULATED), count_(0), event_(nullptr) {}
    
        EStatus status_;
        int count_;                 // Number of threads registered as waiters on event_
        ManualResetEvent* event_;   // Created by the first waiter, deleted by the last; nullptr when count_ == 0
        Value value_;               // Only meaningful when status_ == CALCULATED
    };
    friend struct EventEntry;

    // Calculations in progress (or whose result is still being collected by waiters), by key.
    typedef std::map<Key, EventEntry> EVENTMAP;
    EVENTMAP eventMap_;
};

} // namespace ceda

#endif // include guard