// ProducerConsumerExchange.h
//
// Author David Barrett-Lennard
// (C)opyright Cedanet Pty Ltd 2010
#pragma once
#ifndef Ceda_cxUtils_ProducerConsumerExchange_H
#define Ceda_cxUtils_ProducerConsumerExchange_H
#include "cxUtils.h"
#include <mutex>
#include <utility>
/*
Let there be one or more producers, one or more consumers and a single
object called a Producer Consumer Exchange (PCX).
Each producer owns a single private buffer of some type T. A producer
typically spends much of its time filling its private buffer with data
without holding any lock.
Similarly each consumer owns a private buffer of type T and spends
much of its time emptying its buffer of data without holding any lock.
The PCX also owns a private buffer of type T. The PCX keeps track of
whether its buffer is currently filled or empty. In the single
producer, single consumer case we have 3 buffers - i.e. triple
buffering.
After filling its private buffer with data, a producer calls the
non-blocking function SwapFilledForEmpty() to try to swap its filled buffer
for an empty one. The call returns false, and leaves the producer's
buffer untouched, if the buffer in the PCX is already filled.
Similarly after emptying its private buffer of data, a consumer calls
the non-blocking function SwapEmptyForFilled() to try to swap its empty
buffer for a filled one. The call returns false, and leaves the
consumer's buffer untouched, if the buffer in the PCX is already empty.
Since the PCX contributes an extra buffer there is no need to
atomically transfer a full buffer all the way from a producer to a
consumer (nor an empty buffer in the other direction). This for
example avoids the need to have a producer block on a consumer in
order to perform a swap, as can happen with double buffering.
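
As a rough illustration of the swap protocol described above, the
sketch below drives one producer and one consumer from a single thread
so that the outcome is deterministic. In real use the two sides would
run on different threads. sketch::Exchange is a hypothetical stand-in
that mirrors the class defined later in this header (with the filled
flag updated after each successful swap), repeated only so that the
block compiles on its own. Buffer and WalkThrough are likewise names
invented for the example.

```cpp
#include <cassert>
#include <mutex>
#include <utility>
#include <vector>

namespace sketch
{
    // Hypothetical stand-in for ceda::ProducerConsumerExchange, repeated
    // here only so that this example is self-contained.
    template <typename T>
    class Exchange
    {
    public:
        Exchange() : filled_(false), buf_() {}

        bool SwapFilledForEmpty(T& p)
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (filled_) return false;
            using std::swap;
            swap(p, buf_);
            filled_ = true;
            return true;
        }

        bool SwapEmptyForFilled(T& c)
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (!filled_) return false;
            using std::swap;
            swap(c, buf_);
            filled_ = false;
            return true;
        }

    private:
        std::mutex mutex_;
        bool filled_;
        T buf_;
    };

    typedef std::vector<int> Buffer;

    // Walks through the protocol and returns the data the consumer ends up with.
    inline Buffer WalkThrough()
    {
        Exchange<Buffer> pcx;
        Buffer producerBuf;
        Buffer consumerBuf;

        // Nothing has been produced yet, so the consumer's swap is refused.
        bool ok = pcx.SwapEmptyForFilled(consumerBuf);
        assert(!ok);

        producerBuf.push_back(1);
        producerBuf.push_back(2);
        ok = pcx.SwapFilledForEmpty(producerBuf);   // PCX was empty: accepted
        assert(ok && producerBuf.empty());

        producerBuf.push_back(3);
        ok = pcx.SwapFilledForEmpty(producerBuf);   // PCX still filled: refused
        assert(!ok && producerBuf.size() == 1);

        ok = pcx.SwapEmptyForFilled(consumerBuf);   // consumer receives {1, 2}
        assert(ok);
        return consumerBuf;
    }
}
```

Note that the refused producer keeps its data (the value 3 here) and is
free to retry later or to keep filling its buffer in the meantime.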
It is assumed there is an efficient no-throw swap function
void swap(T& x, T& y)
which swaps the values of two variables of type T. No assumptions
about thread-safety or atomicity of T are made, and that includes the
swap function.
During operation it is possible for every producer and consumer to
concurrently and independently fill or empty their local buffer,
hopefully without false sharing. Most generally consumers are allowed
to modify their buffers, e.g. they may swap out data by swapping
pointers to avoid heap operations and memcpy. Since we recycle empty
buffers from consumers back to producers we actually have symmetry
(i.e. consumers are producers of empty buffers, and producers are
consumers of empty buffers). This symmetry is apparent in the
implementation of the PCX.
Late starter and early leaver producers and consumers are allowed.
Buffers have simple value semantics and there is no need for elaborate
memory reclamation techniques like lock-free reference counting,
hazard pointers etc. Value semantics doesn't actually imply
inefficiency because we never invoke a copy constructor or copy
assignment on T. T could be implemented as a pointer to a heap
allocated object that is freed when T destructs. In such cases an
implementation of swap() will typically only swap the underlying
pointers.
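
To make the point about pointer-swapping concrete, here is a minimal
sketch of one possible buffer type. HeapBuffer, its member functions
and SwapMovesStorage are hypothetical names invented for this example,
and std::unique_ptr is just one way to own the heap allocation. The
type is not copyable, yet it still satisfies the swap requirement
stated above.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

namespace sketch
{
    // Hypothetical buffer type: a handle to heap-allocated storage that is
    // freed when the handle is destroyed.
    class HeapBuffer
    {
    public:
        HeapBuffer() : data_(new std::vector<int>()) {}

        void Add(int x) { data_->push_back(x); }
        std::size_t Size() const { return data_->size(); }
        const void* Address() const { return data_.get(); }

        // Found by argument-dependent lookup from an unqualified swap(x, y).
        // Only the two pointers change hands; no element is copied.
        friend void swap(HeapBuffer& x, HeapBuffer& y) noexcept
        {
            x.data_.swap(y.data_);
        }

    private:
        std::unique_ptr<std::vector<int>> data_;
    };

    // Returns true if swapping moved the storage itself rather than copying it.
    inline bool SwapMovesStorage()
    {
        HeapBuffer filled;
        HeapBuffer empty;
        filled.Add(42);
        const void* before = filled.Address();
        assert(empty.Address() != before);   // two distinct allocations

        swap(filled, empty);

        return empty.Address() == before
            && empty.Size() == 1
            && filled.Size() == 0;
    }
}
```

Because the swap is a friend found through argument-dependent lookup,
an unqualified call such as swap(p, buf_) inside the PCX picks it up
without any further declarations.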
There is a lot of flexibility in what a producer can do when its
attempt to swap a filled buffer for an empty one fails. A
producer could be designed to go back to adding yet more data to its
private buffer. It shouldn't be assumed that T necessarily represents
some fixed size buffer that cannot grow. Indeed T may represent a
queue to allow for ordered delivery of items that are passed to a
single consumer in batches, and the batch size self-tunes to the
smallest batch size (and hence memory footprint and latency) that is
compatible with the throughput (which can be a function of batch
size). As another example a producer may choose to overwrite its
buffer with a more up-to-date version because dropping data is
appropriate when consumption cannot keep up with production. This
occurs for example when sending live audio or video over the wire.
Also, it is possible that the data structures created by the producer
aren't ordered temporally at all. For example, a producer that is
batching values to be inserted into an ordered set by the consumer
might throw away repeats and even record a batch of values in sorted
order using a red-black tree because that makes the consumer more
efficient.
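
The self-tuning batch idea can be sketched as follows. This is a
single-threaded simulation under stated assumptions, not a
recommendation for how to schedule real threads: the producer offers
its batch after every item, and the consumer only polls after every
consumerPeriod-th item. sketch::Exchange is again a hypothetical
stand-in for the class defined later in this header, and Batch, Result
and RunBatching are names invented for the example.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <utility>
#include <vector>

namespace sketch
{
    // Hypothetical stand-in for ceda::ProducerConsumerExchange, repeated
    // here only so that this example is self-contained.
    template <typename T>
    class Exchange
    {
    public:
        Exchange() : filled_(false), buf_() {}

        bool SwapFilledForEmpty(T& p)
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (filled_) return false;
            using std::swap;
            swap(p, buf_);
            filled_ = true;
            return true;
        }

        bool SwapEmptyForFilled(T& c)
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (!filled_) return false;
            using std::swap;
            swap(c, buf_);
            filled_ = false;
            return true;
        }

    private:
        std::mutex mutex_;
        bool filled_;
        T buf_;
    };

    typedef std::vector<int> Batch;

    struct Result
    {
        std::vector<int> delivered;           // items in the order received
        std::vector<std::size_t> batchSizes;  // size of each received batch

        // Records a received batch, then empties it so that the consumer
        // has an empty buffer to offer on its next swap.
        void Consume(Batch& batch)
        {
            batchSizes.push_back(batch.size());
            delivered.insert(delivered.end(), batch.begin(), batch.end());
            batch.clear();
        }
    };

    // Produces the items 0 .. itemCount-1 and returns what the consumer saw.
    inline Result RunBatching(int itemCount, int consumerPeriod)
    {
        assert(consumerPeriod > 0);
        Exchange<Batch> pcx;
        Batch producerBatch;
        Batch consumerBatch;
        Result result;

        for (int i = 0; i < itemCount; ++i)
        {
            producerBatch.push_back(i);
            // If the swap is refused the producer keeps its batch and lets
            // it grow, so ordering is preserved and nothing is dropped.
            pcx.SwapFilledForEmpty(producerBatch);

            if ((i + 1) % consumerPeriod == 0
                && pcx.SwapEmptyForFilled(consumerBatch))
            {
                result.Consume(consumerBatch);
            }
        }

        // Drain whatever is still in flight once production has stopped.
        for (;;)
        {
            if (!producerBatch.empty()) pcx.SwapFilledForEmpty(producerBatch);
            if (!pcx.SwapEmptyForFilled(consumerBatch)) break;
            result.Consume(consumerBatch);
        }
        return result;
    }
}
```

With RunBatching(10, 3) the consumer receives batches of 1, 3, 3 and 3
items, in order: after the first delivery the batch size settles at the
rate the consumer can sustain, without any explicit tuning parameter in
the producer.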
*/
namespace ceda
{
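    template <typename T>
    class ProducerConsumerExchange
    {
    public:
        // buf_ is value-initialised so that the first buffer handed to a
        // producer is in a well-defined state even when T is a built-in type.
        ProducerConsumerExchange() :
            filled_(false),
            buf_()
        {
        }

        // Called by a producer to try to swap its filled buffer for an empty one.
        // Returns false, leaving p untouched, if the PCX buffer is already filled.
        bool SwapFilledForEmpty(T& p)
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (!filled_)
            {
                using std::swap;    // fall back to std::swap if ADL finds nothing
                swap(p,buf_);
                filled_ = true;     // the PCX now holds the producer's data
                return true;
            }
            else
            {
                return false;
            }
        }

        // Called by a consumer to try to swap its empty buffer for a filled one.
        // Returns false, leaving c untouched, if the PCX buffer is already empty.
        bool SwapEmptyForFilled(T& c)
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (filled_)
            {
                using std::swap;    // fall back to std::swap if ADL finds nothing
                swap(c,buf_);
                filled_ = false;    // the PCX now holds the consumer's empty buffer
                return true;
            }
            else
            {
                return false;
            }
        }

    private:
        mutable std::mutex mutex_;  // guards filled_ and buf_
        bool filled_;               // true if buf_ holds data awaiting a consumer
        T buf_;
    };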
} // namespace ceda
#endif // include guard