ThreadPoolMixin.h

// ThreadPoolMixin.h
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2008

#pragma once
@import "Ceda/cxObject/Object.h"
@import "Ceda/cxObject/InnerClasses.h"
#include "Ceda/cxMessage/AutoResetEvent.h"
#include "Ceda/cxMessage/IThreadPool.h"
#include <atomic>

namespace ceda
{

///////////////////////////////////////////////////////////////////////////////////////////////////
// ThreadPoolMixin

// Mixin that holds an IThreadPool pointer and exposes start/stop/post operations on it.
// StartThreadPool() must be paired with StopThreadPool() before the object is destroyed.
$mixin ThreadPoolMixin
{
    // No pool exists until StartThreadPool() is called.
    $$() : threadPool_(nullptr) {}

    ~$$()
    {
        // Failure indicates need to call StopThreadPool()
        cxAssert(!threadPool_);
    }

    // Obtains the pool from CreateThreadPool(numThreads).
    // Asserts that a pool has not already been started.
    void StartThreadPool(ssize_t numThreads)
    {
        cxAssert(!threadPool_);
        threadPool_ = CreateThreadPool(numThreads);
    }

    // Calls Close() on the pool and forgets the pointer, so that StartThreadPool() may be
    // called again.  Asserts that a pool is currently started.
    void StopThreadPool()
    {
        cxAssert(threadPool_);
        threadPool_->Close();
        threadPool_ = nullptr;
    }

    // Forwards the task to the pool's PostTask().
    void PostThreadPoolTask(IThreadPoolTask* task)
    {
        // Failure indicates need to call StartThreadPool() before posting tasks
        cxAssert(threadPool_);
        threadPool_->PostTask(task);
    }

    // Returns the current pool, or nullptr when no pool is started.
    IThreadPool* GetThreadPool()
    {
        return threadPool_;
    }

private:
    // Non-null only between StartThreadPool() and StopThreadPool()
    IThreadPool* threadPool_;
};

///////////////////////////////////////////////////////////////////////////////////////////////////
// TaskExecuterMixin

// Mixin that holds an ITaskExecuter pointer obtained from the thread pool returned by
// GetThreadPool() (which this mixin does not define itself).
// StartTaskExecuter() must be paired with StopTaskExecuter() before the object is destroyed.
$mixin TaskExecuterMixin
{
    // No executer exists until StartTaskExecuter() is called.
    $$() : executer_(nullptr) {}

    ~$$()
    {
        // Failure indicates need to call StopTaskExecuter()
        cxAssert(!executer_);
    }

    // Obtains the executer from GetThreadPool()->CreateTaskExecuter().
    // Asserts that an executer has not already been started.
    void StartTaskExecuter()
    {
        cxAssert(!executer_);
        executer_ = GetThreadPool()->CreateTaskExecuter();
    }

    // Calls Close() on the executer and forgets the pointer, so that StartTaskExecuter() may
    // be called again.  Asserts that an executer is currently started.
    void StopTaskExecuter()
    {
        cxAssert(executer_);
        executer_->Close();
        executer_ = nullptr;
    }

    // Forwards the task to the executer's PostTask().
    void PostThreadPoolTask(IThreadPoolTask* task)
    {
        // Failure indicates need to call StartTaskExecuter() before posting tasks
        cxAssert(executer_);
        executer_->PostTask(task);
    }

    // Returns the current executer, or nullptr when none is started.
    ITaskExecuter* GetTaskExecuter()
    {
        return executer_;
    }

private:
    // Non-null only between StartTaskExecuter() and StopTaskExecuter()
    ITaskExecuter* executer_;
};


// mBindTaskDelegator(member,fn)
//
// Expands to a nested struct Task that implements ceda::IThreadPoolTask, together with a data
// member of that type named 'member'.  Task::ExecuteTask() forwards to fn() on the enclosing
// object using the qualified call $$::fn(), i.e. the implementation of fn as seen by the class
// in which the macro is expanded.
//
// NOTE(review): OuterClass() is presumably provided by mDefineOuterClassDelegator (see
// InnerClasses.h) and returns the object that contains 'member' - confirm.
@def mBindTaskDelegator(member,fn) =
{
    struct Task : public ceda::IThreadPoolTask
    {
        mDefineOuterClassDelegator($$,member);
        
        virtual void ExecuteTask()
        {
            OuterClass().$$::fn();
        }
    } member;
}

///////////////////////////////////////////////////////////////////////////////////////////////////
// IThreadPoolTask_delegator
/*
template<typename T>
struct IThreadPoolTask_delegator : public IThreadPoolTask
{
    IThreadPoolTask_delegator(T* delegator) : m_delegate(delegator) {}
    virtual void ExecuteTask() { m_delegate->ExecuteTask(); }
    T* m_delegate;
};
*/

///////////////////////////////////////////////////////////////////////////////////////////////////
// AddThreadPoolTaskMixin

/*
Allows for implementation of IThreadPoolTask according to implementations of the same named
methods at the current point in the mixin chain.

Can be used to allow a single class to implement IThreadPoolTask more than once (for
different purposes) during the mixin chain, and implement a single method 
PostAllThreadPoolTasks() that posts all the tasks to a given task executer.
*/

// Supplies the members that AddThreadPoolTaskMixin expects from its BaseClass, for the case
// where no tasks have been added: a task count of zero and a PostAllThreadPoolTasks() that
// posts nothing.
$mixin NullThreadPoolTaskMixin
{
    // No tasks are defined at this point in the mixin chain
    enum { s_numThreadPoolTasks = 0 };

    // There is nothing to post, so the executer is not used
    void PostAllThreadPoolTasks(ITaskExecuter* te)
    {
    }
};

// Adds one IThreadPoolTask (task_) whose ExecuteTask() delegates to the ExecuteTask() visible
// at this point in the mixin chain, and extends PostAllThreadPoolTasks() to post it.
$mixin AddThreadPoolTaskMixin
{
    // One more task than the base class has
    enum { s_numThreadPoolTasks = 1 + BaseClass::s_numThreadPoolTasks };

    // Posts every task in the mixin chain to the given executer: first the tasks of the base
    // class, then the task added by this mixin.
    void PostAllThreadPoolTasks(ITaskExecuter* te)
    {
        // Tasks from further down the chain go first
        BaseClass::PostAllThreadPoolTasks(te);

        // Then the task owned by this mixin
        te->PostTask(&task_);
    }

private:
    // Declares member task_, bound to ExecuteTask
    mBindTaskDelegator(task_,ExecuteTask)
};

/*
///////////////////////////////////////////////////////////////////////////////////////////////////
// DeclareThreadPSpace_ExecuteTask_Mixin

$mixin DeclareThreadPSpace_ExecuteTask_Mixin
{
    void ExecuteTask()
    {
        DeclareThreadPSpace dtps(GetAssociatedPSpace());
        BaseClass::ExecuteTask();
    }
};
*/

///////////////////////////////////////////////////////////////////////////////////////////////////
// NullThreadTaskMixin

// Provides an ExecuteTask() that does nothing, so that mixins which call
// BaseClass::ExecuteTask() have something to call.
$mixin NullThreadTaskMixin
{
    void ExecuteTask()
    {
        // Intentionally empty
    }
};

///////////////////////////////////////////////////////////////////////////////////////////////////
// TraceThreadTaskMixin

// Wraps ExecuteTask() with an optional trace line.  EnableTrace() and ClassName() are not
// defined here and must be available where the mixin is used.
$mixin TraceThreadTaskMixin
{
    void ExecuteTask()
    {
        // Emit "<ClassName>::ExecuteTask()" only when tracing is switched on
        if (EnableTrace())
        {
            Tracer() << ClassName() << "::ExecuteTask()\n";
        }

        // The underlying task always runs, whether or not it was traced
        BaseClass::ExecuteTask();
    }
};

///////////////////////////////////////////////////////////////////////////////////////////////////
// SetThreadIndentMixin

// mDefineThreadIndentMixin(indent)
//
// Expands to a brace-enclosed body containing an ExecuteTask() that constructs a
// ceda::TraceIndenter from 'indent' and keeps it alive for the duration of the call to
// BaseClass::ExecuteTask().
//
// NOTE(review): the expansion has no "$mixin <Name>" header of its own, so presumably it is
// meant to follow one at the point of use - confirm against callers.
@def mDefineThreadIndentMixin(indent) =
{
    {
        void ExecuteTask()
        {
            ceda::TraceIndenter indenter(indent);
            BaseClass::ExecuteTask();
        }
    }
}

///////////////////////////////////////////////////////////////////////////////////////////////////
// WakeableThreadMixin

// Turns ExecuteTask() into a loop that sleeps on an event and runs BaseClass::ExecuteTask()
// once for each wake-up, until Abort() is called.
$mixin WakeableThreadMixin
{
    $$() : m_stopThread(false)
    {
    }

    // Repeatedly waits to be woken and then runs BaseClass::ExecuteTask().  Returns once the
    // stop flag is observed, either before waiting or immediately after being woken.
    void ExecuteTask()
    {
        while(!m_stopThread.load())
        {
            m_wakeThread.Wait();

            // Abort() sets the flag and then signals, so a wake-up may mean "stop" rather
            // than "do work"
            if (m_stopThread.load()) break;

            BaseClass::ExecuteTask();
        }
    }

    // Requests that the loop in ExecuteTask() terminates, and wakes it so that the request is
    // noticed.
    void Abort()
    {
        m_stopThread.store(true);
        WakeThread();
    }

    // Signals the event that ExecuteTask() waits on.
    void WakeThread()
    {
        m_wakeThread.Signal();
    }

    // Returns true once Abort() has been called.
    bool CheckStopThread() const { return m_stopThread.load(); }

private:
    // Set by Abort(), read by ExecuteTask() and CheckStopThread().  An atomic is used rather
    // than a volatile bool because volatile provides neither atomicity nor inter-thread
    // ordering.
    // NOTE(review): assumes Abort() may be called from a thread other than the one running
    // ExecuteTask() - confirm against callers.
    std::atomic<bool> m_stopThread;

    // Signalled to wake up the thread
    AutoResetEvent m_wakeThread;
};

} // namespace ceda