IThreadPool.h

// IThreadPool.h
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2008

#pragma once
#ifndef Ceda_cxMessage_IThreadPool_H
#define Ceda_cxMessage_IThreadPool_H

#include "cxMessage.h"
#include "Ceda/cxUtils/BasicTypes.h"
#include "Ceda/cxUtils/MsWindows.h"
#include <functional>

/*
This implementation of a thread pool features a queue for pending tasks to be executed by threads
in the thread pool.  Therefore the function PostTask() used to add new tasks *never* needs to
block (i.e. wait for a thread to become available).

It also features a stack for the sleeping threads, allowing it to efficiently manage the thread
pool itself.

Each task is to be executed by a single thread in the thread pool.

The function to post a task will either give the task directly to a sleeping thread, or 
else queue the task to be done later, if no threads are currently available.  Therefore it always
returns immediately without blocking.

With a limited number of threads,  a large number of tasks will tend to be done in sequence.
This is ideal for tasks that are only useful when they are completed.  Having batched tasks done
sequentually allows the user to see things happen,  and is preferable to doing all the batched 
tasks in parallel such that none are completed until the end.

Clients of the thread pool can only add tasks.  They can't remove a task that has previously 
been added.
*/

namespace ceda
{

///////////////////////////////////////////////////////////////////////////////////////////////////
// IThreadPoolTask

struct IThreadPoolTask
{
    // Execute the given task. Must not throw an exception
    virtual void ExecuteTask() = 0;
};

typedef std::function<void()> ThreadPoolTask;

///////////////////////////////////////////////////////////////////////////////////////////////////
// ITaskExecuter

/*
Associated with a group of related tasks that have been posted to the thread pool, making it easy 
to abort all the tasks or wait until all the tasks have been completed.
*/
struct ITaskExecuter
{
    // A task executer must be closed when it is no longer in use.  This implicitly calls Wait()
    virtual void Close() = 0;

    /*
    Call this function in order to have the given task executed by a worker thread some time in the
    future.
    It is guaranteed that every call to PostTask is paired with a subsequent call to ExecuteTask().
    
    If a thread is available the task will be assigned to the thread immediately.  Otherwise
    the task is pushed onto a FIFO queue. There are no limits on the size of queue.  Therefore
    it can be assumed this function will always return without blocking on task execution.
    The pool of worker threads pop tasks from the queue and execute them.  There is no guarantee
    of the order in which the tasks are executed.  However it can be assumed they will be more
    or less processed in the order they were posted.
    
    This function is fully threadsafe - it is possible for multiple threads to call this function
    concurrently
    */
    virtual void PostTask(IThreadPoolTask* task) = 0;
    virtual void PostTask(ThreadPoolTask task) = 0;

    // Waits until the call to ExecuteTask() has been completed  for every task that's been 
    // posted so far.
    virtual void Wait() = 0;

    // Returns the number of tasks that have been posted but not yet finished executing.
    virtual ssize_t GetNumRemainingTasks() const = 0;
    
protected:
    ~ITaskExecuter() {}
};

///////////////////////////////////////////////////////////////////////////////////////////////////
// IThreadPool

struct IThreadPool
{
    // A thread pool must eventually be closed.  
    // All the task executors created by this thread pool must have been closed before calling 
    // this function.
    // Stops and destroys all threads in the thread pool
    // WARNING: A created thread pool must be deleted before static uninitialisation begins
    virtual void Close() = 0;
    
    virtual ITaskExecuter* CreateTaskExecuter() = 0;
    
    // Get the current number of queued tasks (i.e. tasks that are pending - which is distinct from
    // tasks that are currently being executed)
    virtual ssize_t GetNumQueuedTasks() const = 0;
    
protected:
    ~IThreadPool() {}
};

/*
THREAD_PRIORITY_IDLE                -15
THREAD_PRIORITY_LOWEST              -2
THREAD_PRIORITY_BELOW_NORMAL        -1
THREAD_PRIORITY_NORMAL              0
THREAD_PRIORITY_ABOVE_NORMAL        1
THREAD_PRIORITY_HIGHEST             2
THREAD_PRIORITY_TIME_CRITICAL       15
*/
#ifdef _WIN32
    cxMessage_API IThreadPool* CreateThreadPool(ssize_t numThreads, int nPriority = THREAD_PRIORITY_NORMAL);
#else
    cxMessage_API IThreadPool* CreateThreadPool(ceda::ssize_t numThreads, int nPriority = 0);
#endif

} // namespace ceda

#endif // include guard