| Index: src/IceUtils.h
|
| diff --git a/src/IceUtils.h b/src/IceUtils.h
|
| index ffeb792e80617ed7793d4f1d1695b86058fca705..493661c873e5c1de58eacc9c7deb04f69af158aa 100644
|
| --- a/src/IceUtils.h
|
| +++ b/src/IceUtils.h
|
| @@ -54,6 +54,133 @@ public:
|
| }
|
| };
|
|
|
| +// BoundedProducerConsumerQueue is a work queue that allows multiple
|
| +// producers and multiple consumers. A producer adds entries using
|
| +// blockingPush(), and may block if the queue is "full". A producer
|
| +// uses notifyEnd() to indicate that no more entries will be added. A
|
| +// consumer removes an item using blockingPop(), which will return
|
| +// nullptr if notifyEnd() has been called and the queue is empty (it
|
| +// never returns nullptr if the queue contained any items).
|
| +//
|
| +// The MaxSize ctor arg controls the maximum size the queue can grow
|
| +// to (subject to a hard limit of MaxStaticSize-1). The Sequential
|
| +// arg indicates purely sequential execution in which the single
|
| +// thread should never wait().
|
| +//
|
| +// Two condition variables are used in the implementation.
|
| +// GrewOrEnded signals a waiting worker that a producer has changed
|
| +// the state of the queue. Shrunk signals a blocked producer that a
|
| +// consumer has changed the state of the queue.
|
| +//
|
| +// The methods begin with Sequential-specific code to be most clear.
|
| +// The lock and condition variables are not used in the Sequential
|
| +// case.
|
| +//
|
| +// Internally, the queue is implemented as a circular array of size
|
| +// MaxStaticSize, where the queue boundaries are denoted by the Front
|
| +// and Back fields. Front==Back indicates an empty queue.
|
| +template <typename T, size_t MaxStaticSize = 128>
|
| +class BoundedProducerConsumerQueue {
|
| + BoundedProducerConsumerQueue() = delete;
|
| + BoundedProducerConsumerQueue(const BoundedProducerConsumerQueue &) = delete;
|
| + BoundedProducerConsumerQueue &
|
| + operator=(const BoundedProducerConsumerQueue &) = delete;
|
| +
|
| +public:
|
| + BoundedProducerConsumerQueue(size_t MaxSize, bool Sequential)
|
| + : Back(0), Front(0), MaxSize(std::min(MaxSize, MaxStaticSize)),
|
| + Sequential(Sequential), IsEnded(false) {}
|
| + void blockingPush(T *Item) {
|
| + {
|
| + std::unique_lock<GlobalLockType> L(Lock);
|
| + // If the work queue is already "full", wait for a consumer to
|
| + // grab an element and shrink the queue.
|
| + Shrunk.wait(L, [this] { return size() < MaxSize || Sequential; });
|
| + push(Item);
|
| + }
|
| + GrewOrEnded.notify_one();
|
| + }
|
| + T *blockingPop() {
|
| + T *Item = nullptr;
|
| + bool ShouldNotifyProducer = false;
|
| + {
|
| + std::unique_lock<GlobalLockType> L(Lock);
|
| + GrewOrEnded.wait(L, [this] { return IsEnded || !empty() || Sequential; });
|
| + if (!empty()) {
|
| + Item = pop();
|
| + ShouldNotifyProducer = !IsEnded;
|
| + }
|
| + }
|
| + if (ShouldNotifyProducer)
|
| + Shrunk.notify_one();
|
| + return Item;
|
| + }
|
| + void notifyEnd() {
|
| + {
|
| + std::lock_guard<GlobalLockType> L(Lock);
|
| + IsEnded = true;
|
| + }
|
| + GrewOrEnded.notify_all();
|
| + }
|
| +
|
| +private:
|
| + const static size_t MaxStaticSizeMask = MaxStaticSize - 1;
|
| + static_assert(!(MaxStaticSize & (MaxStaticSize - 1)),
|
| + "MaxStaticSize must be a power of 2");
|
| +
|
| + // WorkItems and Lock are read/written by all.
|
| + ICE_CACHELINE_BOUNDARY;
|
| + T *WorkItems[MaxStaticSize];
|
| + ICE_CACHELINE_BOUNDARY;
|
| + // Lock guards access to WorkItems, Front, Back, and IsEnded.
|
| + GlobalLockType Lock;
|
| +
|
| + ICE_CACHELINE_BOUNDARY;
|
| + // GrewOrEnded is written by the producers and read by the
|
| + // consumers. It is notified (by the producer) when something is
|
| + // added to the queue, in case consumers are waiting for a non-empty
|
| + // queue.
|
| + std::condition_variable GrewOrEnded;
|
| + // Back is the index into WorkItems[] of where the next element will
|
| + // be pushed. (More precisely, Back&MaxStaticSize is the index.)
|
| + // It is written by the producers, and read by all via size() and
|
| + // empty().
|
| + size_t Back;
|
| +
|
| + ICE_CACHELINE_BOUNDARY;
|
| + // Shrunk is notified (by the consumer) when something is removed
|
| + // from the queue, in case a producer is waiting for the queue to
|
| + // drop below maximum capacity. It is written by the consumers and
|
| + // read by the producers.
|
| + std::condition_variable Shrunk;
|
| + // Front is the index into WorkItems[] of the oldest element,
|
| + // i.e. the next to be popped. (More precisely Front&MaxStaticSize
|
| + // is the index.) It is written by the consumers, and read by all
|
| + // via size() and empty().
|
| + size_t Front;
|
| +
|
| + ICE_CACHELINE_BOUNDARY;
|
| +
|
| + // MaxSize and Sequential are read by all and written by none.
|
| + const size_t MaxSize;
|
| + const bool Sequential;
|
| + // IsEnded is read by the consumers, and only written once by the
|
| + // producer.
|
| + bool IsEnded;
|
| +
|
| + // The lock must be held when the following methods are called.
|
| + bool empty() const { return Front == Back; }
|
| + size_t size() const { return Back - Front; }
|
| + void push(T *Item) {
|
| + WorkItems[Back++ & MaxStaticSizeMask] = Item;
|
| + assert(size() <= MaxStaticSize);
|
| + }
|
| + T *pop() {
|
| + assert(!empty());
|
| + return WorkItems[Front++ & MaxStaticSizeMask];
|
| + }
|
| +};
|
| +
|
| } // end of namespace Ice
|
|
|
| #endif // SUBZERO_SRC_ICEUTILS_H
|
|
|