Index: src/IceUtils.h |
diff --git a/src/IceUtils.h b/src/IceUtils.h |
index dc3a5ff1d77ef160d2b321038f17db3ed64cdbde..7b1ab81bd08b5b6f6a724ff8ea63276829b4097f 100644 |
--- a/src/IceUtils.h |
+++ b/src/IceUtils.h |
@@ -13,9 +13,7 @@ |
#ifndef SUBZERO_SRC_ICEUTILS_H |
#define SUBZERO_SRC_ICEUTILS_H |
- |
#include <climits> |
-#include <condition_variable> |
namespace Ice { |
@@ -63,133 +61,6 @@ public: |
} |
}; |
-// BoundedProducerConsumerQueue is a work queue that allows multiple |
-// producers and multiple consumers. A producer adds entries using |
-// blockingPush(), and may block if the queue is "full". A producer |
-// uses notifyEnd() to indicate that no more entries will be added. A |
-// consumer removes an item using blockingPop(), which will return |
-// nullptr if notifyEnd() has been called and the queue is empty (it |
-// never returns nullptr if the queue contained any items). |
-// |
-// The MaxSize ctor arg controls the maximum size the queue can grow |
-// to (subject to a hard limit of MaxStaticSize-1). The Sequential |
-// arg indicates purely sequential execution in which the single |
-// thread should never wait(). |
-// |
-// Two condition variables are used in the implementation. |
-// GrewOrEnded signals a waiting worker that a producer has changed |
-// the state of the queue. Shrunk signals a blocked producer that a |
-// consumer has changed the state of the queue. |
-// |
-// The methods begin with Sequential-specific code to be most clear. |
-// The lock and condition variables are not used in the Sequential |
-// case. |
-// |
-// Internally, the queue is implemented as a circular array of size |
-// MaxStaticSize, where the queue boundaries are denoted by the Front |
-// and Back fields. Front==Back indicates an empty queue. |
-template <typename T, size_t MaxStaticSize = 128> |
-class BoundedProducerConsumerQueue { |
- BoundedProducerConsumerQueue() = delete; |
- BoundedProducerConsumerQueue(const BoundedProducerConsumerQueue &) = delete; |
- BoundedProducerConsumerQueue & |
- operator=(const BoundedProducerConsumerQueue &) = delete; |
- |
-public: |
- BoundedProducerConsumerQueue(size_t MaxSize, bool Sequential) |
- : Back(0), Front(0), MaxSize(std::min(MaxSize, MaxStaticSize)), |
- Sequential(Sequential), IsEnded(false) {} |
- void blockingPush(T *Item) { |
- { |
- std::unique_lock<GlobalLockType> L(Lock); |
- // If the work queue is already "full", wait for a consumer to |
- // grab an element and shrink the queue. |
- Shrunk.wait(L, [this] { return size() < MaxSize || Sequential; }); |
- push(Item); |
- } |
- GrewOrEnded.notify_one(); |
- } |
- T *blockingPop() { |
- T *Item = nullptr; |
- bool ShouldNotifyProducer = false; |
- { |
- std::unique_lock<GlobalLockType> L(Lock); |
- GrewOrEnded.wait(L, [this] { return IsEnded || !empty() || Sequential; }); |
- if (!empty()) { |
- Item = pop(); |
- ShouldNotifyProducer = !IsEnded; |
- } |
- } |
- if (ShouldNotifyProducer) |
- Shrunk.notify_one(); |
- return Item; |
- } |
- void notifyEnd() { |
- { |
- std::lock_guard<GlobalLockType> L(Lock); |
- IsEnded = true; |
- } |
- GrewOrEnded.notify_all(); |
- } |
- |
-private: |
- const static size_t MaxStaticSizeMask = MaxStaticSize - 1; |
- static_assert(!(MaxStaticSize & (MaxStaticSize - 1)), |
- "MaxStaticSize must be a power of 2"); |
- |
- // WorkItems and Lock are read/written by all. |
- ICE_CACHELINE_BOUNDARY; |
- T *WorkItems[MaxStaticSize]; |
- ICE_CACHELINE_BOUNDARY; |
- // Lock guards access to WorkItems, Front, Back, and IsEnded. |
- GlobalLockType Lock; |
- |
- ICE_CACHELINE_BOUNDARY; |
- // GrewOrEnded is written by the producers and read by the |
- // consumers. It is notified (by the producer) when something is |
- // added to the queue, in case consumers are waiting for a non-empty |
- // queue. |
- std::condition_variable GrewOrEnded; |
- // Back is the index into WorkItems[] of where the next element will |
- // be pushed. (More precisely, Back&MaxStaticSize is the index.) |
- // It is written by the producers, and read by all via size() and |
- // empty(). |
- size_t Back; |
- |
- ICE_CACHELINE_BOUNDARY; |
- // Shrunk is notified (by the consumer) when something is removed |
- // from the queue, in case a producer is waiting for the queue to |
- // drop below maximum capacity. It is written by the consumers and |
- // read by the producers. |
- std::condition_variable Shrunk; |
- // Front is the index into WorkItems[] of the oldest element, |
- // i.e. the next to be popped. (More precisely Front&MaxStaticSize |
- // is the index.) It is written by the consumers, and read by all |
- // via size() and empty(). |
- size_t Front; |
- |
- ICE_CACHELINE_BOUNDARY; |
- |
- // MaxSize and Sequential are read by all and written by none. |
- const size_t MaxSize; |
- const bool Sequential; |
- // IsEnded is read by the consumers, and only written once by the |
- // producer. |
- bool IsEnded; |
- |
- // The lock must be held when the following methods are called. |
- bool empty() const { return Front == Back; } |
- size_t size() const { return Back - Front; } |
- void push(T *Item) { |
- WorkItems[Back++ & MaxStaticSizeMask] = Item; |
- assert(size() <= MaxStaticSize); |
- } |
- T *pop() { |
- assert(!empty()); |
- return WorkItems[Front++ & MaxStaticSizeMask]; |
- } |
-}; |
- |
} // end of namespace Ice |
#endif // SUBZERO_SRC_ICEUTILS_H |