// ProducerConsumerExchange.h
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2010

#pragma once
#ifndef Ceda_cxUtils_ProducerConsumerExchange_H
#define Ceda_cxUtils_ProducerConsumerExchange_H

#include "cxUtils.h"
#include <mutex>

/*
Let there be one or more producers, one or more consumers and a single 
object called a Producer Consumer Exchange (PCX). 

Each producer owns a single private buffer of some type T.  A producer 
typically spends much of its time filling its private buffer with data 
without holding any lock. 

Similarly each consumer owns a private buffer of type T and spends 
much of its time emptying its buffer of data without holding any lock. 

The PCX also owns a private buffer of type T.   The PCX keeps track of 
whether its buffer is currently filled or empty.  In the single 
producer, single consumer case we have 3 buffers - i.e. triple 
buffering. 

After filling its private buffer with data, a producer calls 
SwapFilledForEmpty() to try to swap its filled buffer for an empty 
one.  The call never waits for a consumer (it only holds the PCX 
mutex briefly).  It returns false, leaving the producer's buffer 
untouched, if the buffer in the PCX is already filled. 

Similarly after emptying its private buffer of data, a consumer calls 
SwapEmptyForFilled() to try to swap its empty buffer for a filled 
one.  The call never waits for a producer.  It returns false, leaving 
the consumer's buffer untouched, if the buffer in the PCX is already 
empty. 

Since the PCX contributes an extra buffer there is no need to 
atomically transfer a full buffer all the way from a producer to a 
consumer (nor an empty buffer in the other direction).  For example, 
this avoids the need for a producer to block on a consumer in order 
to perform a swap, as can happen with double buffering. 
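
The two calls described above can be sketched as follows.  This is 
a minimal single-threaded illustration only.  MiniExchange is a 
hypothetical stand-in written for this example, not the class 
declared in this header.  It assumes the filled flag is set by a 
successful SwapFilledForEmpty() and cleared by a successful 
SwapEmptyForFilled(), as the description implies. 

```cpp
#include <cassert>
#include <mutex>
#include <vector>

// Hypothetical stand-in that follows the behaviour described above.
template <typename T>
class MiniExchange
{
public:
    // Producer side: succeeds only while the exchange buffer is empty.
    bool SwapFilledForEmpty(T& p)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (filled_) return false;
        swap(p, buf_);      // unqualified call, resolved by ADL
        filled_ = true;
        return true;
    }

    // Consumer side: succeeds only while the exchange buffer is filled.
    bool SwapEmptyForFilled(T& c)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!filled_) return false;
        swap(c, buf_);
        filled_ = false;
        return true;
    }

private:
    std::mutex mutex_;
    bool filled_ = false;
    T buf_{};
};

// Records the result of four swap attempts made in a fixed order.
inline std::vector<bool> SwapOutcomes()
{
    MiniExchange<std::vector<int>> pcx;
    std::vector<int> producerBuf{1, 2, 3};  // filled without holding a lock
    std::vector<int> consumerBuf;           // starts out empty

    std::vector<bool> outcomes;
    outcomes.push_back(pcx.SwapFilledForEmpty(producerBuf)); // PCX was empty
    outcomes.push_back(pcx.SwapFilledForEmpty(producerBuf)); // PCX still filled
    outcomes.push_back(pcx.SwapEmptyForFilled(consumerBuf)); // PCX was filled
    outcomes.push_back(pcx.SwapEmptyForFilled(consumerBuf)); // PCX now empty
    return outcomes;
}
```

Called in this order, the four attempts yield true, false, true, 
false.  Neither side ever waits for the other; a failed attempt just 
means the caller keeps working on its own buffer. 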

It is assumed there is an efficient no-throw swap function 

  void swap(T& x, T& y) 

which swaps the values of two variables of type T.  No assumptions 
are made about the thread-safety or atomicity of T, and that includes 
the swap function. 

During operation it is possible for every producer and consumer to 
concurrently and independently fill or empty their local buffer, 
hopefully without false sharing.  Most generally consumers are allowed 
to modify their buffers, e.g. they may swap out data by swapping 
pointers to avoid heap operations and memcpy.  Since we recycle empty 
buffers from consumers back to producers we actually have symmetry 
(i.e. consumers are producers of empty buffers, and producers are 
consumers of empty buffers).  This symmetry is apparent in the 
implementation of the PCX. 

Producers and consumers may start late or leave early.  Buffers have 
simple value semantics and there is no need for elaborate memory 
reclamation techniques such as lock-free reference counting or hazard 
pointers.  Value semantics doesn't imply inefficiency here because we 
never invoke a copy constructor or copy assignment on T.  T could be 
implemented as a pointer to a heap allocated object that is freed 
when T destructs.  In such cases an implementation of swap() will 
typically only swap the underlying pointers. 
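
As an illustration of such a T, here is a minimal sketch of a buffer 
type that owns a heap array and swaps by exchanging pointers.  The 
name HeapBuffer and its members are assumptions made for this 
example; nothing in this header requires this particular design. 

```cpp
#include <cassert>
#include <cstddef>
#include <utility>

// Hypothetical buffer type: owns a heap array and is never copied.
class HeapBuffer
{
public:
    // Default constructible, as needed for the buffer held by the PCX.
    explicit HeapBuffer(std::size_t n = 0) :
        data_(n ? new int[n]() : nullptr),
        size_(n)
    {
    }

    ~HeapBuffer() { delete[] data_; }

    // Copying is disabled; buffers only ever move around via swap().
    HeapBuffer(const HeapBuffer&) = delete;
    HeapBuffer& operator=(const HeapBuffer&) = delete;

    int* data() const { return data_; }
    std::size_t size() const { return size_; }

    // No-throw swap found by an unqualified call to swap() through
    // argument-dependent lookup.  Only the pointer and the size are
    // exchanged, so no heap operation or memcpy takes place.
    friend void swap(HeapBuffer& x, HeapBuffer& y) noexcept
    {
        std::swap(x.data_, y.data_);
        std::swap(x.size_, y.size_);
    }

private:
    int* data_;
    std::size_t size_;
};
```

After swap(a, b) the array that a owned belongs to b and vice versa, 
and the cost is independent of the buffer size. 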

There is a lot of flexibility in what a producer can do when it tries 
to swap its filled buffer for an empty one and fails.  A producer 
could be designed to go back to adding yet more data to its private 
buffer.  It shouldn't be assumed that T necessarily represents some 
fixed size buffer that cannot grow.  Indeed T may represent a queue 
to allow for ordered delivery of items that are passed to a single 
consumer in batches, where the batch size self-tunes to the smallest 
batch size (and hence memory footprint and latency) that is 
compatible with the throughput (which can be a function of batch 
size).  As another example a producer may choose to overwrite its 
buffer with a more up-to-date version because dropping data is 
appropriate when consumption cannot keep up with production.  This 
occurs for example when sending live audio or video over the wire. 
Also, it is possible that the data structures created by the producer 
aren't ordered temporally at all.  For example, a producer that is 
batching values to be inserted into an ordered set by the consumer 
might throw away repeats and even record a batch of values in sorted 
order using a red-black tree because that makes the consumer more 
efficient.
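
The self-tuning batch idea can be sketched as follows, as one 
possible policy among those listed above.  BatchingProducer and 
MiniExchange are hypothetical names written for this example (the 
latter is a stand-in, not the class declared in this header), and 
the demonstration is single-threaded so that the outcome is 
deterministic. 

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <vector>

// Hypothetical stand-in that follows the behaviour described above.
template <typename T>
class MiniExchange
{
public:
    bool SwapFilledForEmpty(T& p)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (filled_) return false;
        swap(p, buf_);      // unqualified call, resolved by ADL
        filled_ = true;
        return true;
    }

    bool SwapEmptyForFilled(T& c)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!filled_) return false;
        swap(c, buf_);
        filled_ = false;
        return true;
    }

private:
    std::mutex mutex_;
    bool filled_ = false;
    T buf_{};
};

// Hypothetical producer whose buffer is a growable queue of items.
class BatchingProducer
{
public:
    explicit BatchingProducer(MiniExchange<std::deque<int>>& pcx) : pcx_(pcx) {}

    // Appends one item, then tries to hand the whole batch over.  If the
    // swap fails the items stay in the private batch, which keeps growing.
    bool Produce(int item)
    {
        batch_.push_back(item);
        return pcx_.SwapFilledForEmpty(batch_);
    }

private:
    MiniExchange<std::deque<int>>& pcx_;
    std::deque<int> batch_;
};

// Returns the sizes of the batches seen by a consumer that falls behind.
inline std::vector<std::size_t> BatchSizes()
{
    MiniExchange<std::deque<int>> pcx;
    BatchingProducer producer(pcx);
    std::deque<int> received;       // the consumer's private buffer
    std::vector<std::size_t> sizes;

    producer.Produce(1);            // handed over as a batch of 1
    producer.Produce(2);            // fails, the PCX is still filled
    producer.Produce(3);            // fails again, batch grows to 2 items

    if (pcx.SwapEmptyForFilled(received)) sizes.push_back(received.size());
    received.clear();               // the consumer empties its buffer

    producer.Produce(4);            // succeeds with a batch of 3
    if (pcx.SwapEmptyForFilled(received)) sizes.push_back(received.size());
    return sizes;
}
```

Here the consumer sees a batch of 1 followed by a batch of 3: the 
batch grew only because the consumer was slow to collect the first 
one.  Items stay in order because a failed swap leaves the queue 
untouched. 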
*/

namespace ceda
{

template <typename T>
class ProducerConsumerExchange
{
public:
    ProducerConsumerExchange() : 
        filled_(false) 
    {
    }

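    // Called by a producer to try to swap its filled buffer for an empty one.
    // Returns false, leaving p untouched, if the exchange buffer is already filled.
    bool SwapFilledForEmpty(T& p)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!filled_)
        {
            swap(p,buf_);
            filled_ = true;     // the exchange now holds the producer's data
            return true;
        }
        else
        {
            return false;
        }
    }

    // Called by a consumer to try to swap its empty buffer for a filled one.
    // Returns false, leaving c untouched, if the exchange buffer is already empty.
    bool SwapEmptyForFilled(T& c)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (filled_)
        {
            swap(c,buf_);
            filled_ = false;    // the exchange now holds the consumer's empty buffer
            return true;
        }
        else
        {
            return false;
        }
    }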

private:
    mutable std::mutex mutex_;
    bool filled_;
    T buf_;
};

} // namespace ceda

#endif // include guard