Index: src/IceUtils.h |
diff --git a/src/IceUtils.h b/src/IceUtils.h |
index ffeb792e80617ed7793d4f1d1695b86058fca705..197527aba9810d237e8bcb27de35ef0315b68897 100644 |
--- a/src/IceUtils.h |
+++ b/src/IceUtils.h |
@@ -54,6 +54,135 @@ public: |
} |
}; |
+// BoundedProducerConsumerQueue is a work queue that allows multiple |
+// producers and multiple consumers. The producer adds entries using |
JF
2015/01/27 01:53:34
You should probably pluralize "producer" everywher
Jim Stichnoth
2015/01/27 05:35:11
Changed to "a producer" / "a consumer".
|
+// blockingPush(), and may block if the queue is "full". The producer |
+// uses notifyEnd() to indicate that no more entries will be added. |
+// The consumer removes an item using blockingPop(), which will return |
+// nullptr if notifyEnd() has been called and the queue is empty (it |
+// never returns nullptr if the queue contained any items). |
+// |
+// The MaxSize ctor arg controls the maximum size the queue can grow |
+// to (subject to a hard limit of MaxStaticSize-1). The Sequential |
+// arg indicates purely sequential execution in which the single |
+// thread should never wait(). |
+// |
+// Two condition variables are used in the implementation. |
+// GrewOrEnded signals a waiting worker that the producer has changed |
+// the state of the queue. Shrunk signals a blocked producer that a |
+// consumer has changed the state of the queue. |
+// |
+// The methods begin with Sequential-specific code to be most clear. |
+// The lock and condition variables are not used in the Sequential |
+// case. |
+// |
+// Internally, the queue is implemented as a circular array of size |
+// MaxStaticSize, where the queue boundaries are denoted by the Front |
+// and Back fields. Front==Back indicates an empty queue, and this |
+// implies that the maximum queue size is actually MaxStaticSize-1. |
+template <typename T, size_t MaxStaticSize = 128> |
+class BoundedProducerConsumerQueue { |
+ BoundedProducerConsumerQueue() = delete; |
+ BoundedProducerConsumerQueue(const BoundedProducerConsumerQueue &) = delete; |
+ BoundedProducerConsumerQueue & |
+ operator=(const BoundedProducerConsumerQueue &) = delete; |
+ |
+public: |
+ BoundedProducerConsumerQueue(size_t MaxSize, bool Sequential) |
+ : Back(0), Front(0), MaxSize(std::min(MaxSize, MaxStaticSize - 1)), |
+ Sequential(Sequential), IsEnded(false) {} |
+ void blockingPush(T *Item) { |
+ { |
+ std::unique_lock<GlobalLockType> L(Lock); |
+ // If the work queue is already "full", wait for a consumer to |
+ // grab an element and shrink the queue. |
+ Shrunk.wait(L, [this] { return size() < MaxSize || Sequential; }); |
+ push(Item); |
+ } |
+ GrewOrEnded.notify_one(); |
+ } |
+ T *blockingPop() { |
+ T *Item = nullptr; |
+ bool ShouldNotifyProducer = false; |
+ { |
+ std::unique_lock<GlobalLockType> L(Lock); |
+ GrewOrEnded.wait(L, [this] { return IsEnded || !empty() || Sequential; }); |
+ if (!empty()) { |
+ Item = pop(); |
+ ShouldNotifyProducer = !IsEnded; |
+ } |
+ } |
+ if (ShouldNotifyProducer) |
+ Shrunk.notify_one(); |
+ return Item; |
+ } |
+ void notifyEnd() { |
+ { |
+ std::lock_guard<GlobalLockType> L(Lock); |
+ IsEnded = true; |
+ } |
+ GrewOrEnded.notify_all(); |
+ } |
+ |
+private: |
+ const static size_t MaxStaticSizeMask = MaxStaticSize - 1; |
+ static_assert(!(MaxStaticSize & (MaxStaticSize - 1)), |
+ "MaxStaticSize must be a power of 2"); |
+ |
+ // WorkItems and Lock are read/written by all. |
+ ICE_CACHELINE_BOUNDARY; |
+ T *WorkItems[MaxStaticSize]; |
+ ICE_CACHELINE_BOUNDARY; |
+ // Lock guards access to WorkItems, Front, Back, and IsEnded. |
+ GlobalLockType Lock; |
+ |
+ ICE_CACHELINE_BOUNDARY; |
+ // GrewOrEnded is written by the producer and read by the |
+ // consumers. It is notified (by the producer) when something is |
+ // added to the queue, in case consumers are waiting for a |
+ // non-empty queue. |
+ std::condition_variable GrewOrEnded; |
+ // Back is the index into WorkItems[] of where the next element will |
+ // be pushed. |
JF
2015/01/27 01:53:33
Written by the producer.
Jim Stichnoth
2015/01/27 05:35:11
Done.
|
+ size_t Back; |
+ |
+ ICE_CACHELINE_BOUNDARY; |
+ // Shrunk is notified (by the consumer) when something is removed |
+ // from the queue, in case the producer is waiting for the queue |
+ // to drop below maximum capacity. It is written by the consumers |
+ // and read by the producer. |
+ std::condition_variable Shrunk; |
+ // Front is the index into WorkItems[] of the oldest element, |
+ // i.e. the next to be popped. |
JF
2015/01/27 01:53:33
Written by the consumer.
Jim Stichnoth
2015/01/27 05:35:11
Done.
|
+ size_t Front; |
+ |
+ ICE_CACHELINE_BOUNDARY; |
+ |
+ // MaxSize and Sequential are read by all and written by none. |
+ const size_t MaxSize; |
+ const bool Sequential; |
+ // IsEnded is read by the consumers, and only written once by the |
+ // producer. |
+ bool IsEnded; |
+ |
+ // The lock must be held when the following methods are called. |
+ bool empty() const { return Front == Back; } |
+ size_t size() const { return (Back - Front) & MaxStaticSizeMask; } |
JF
2015/01/27 01:53:34
It took me a moment to convince myself that this w
Jim Stichnoth
2015/01/27 05:35:11
Me too, because it's too simple, right?
Actually,
JF
2015/01/27 06:04:36
Oh yeah that's way better!
|
+ void push(T *Item) { |
+ WorkItems[Back] = Item; |
+ Back = (Back + 1) & MaxStaticSizeMask; |
+ // If too many items are pushed, it "rolls over" and appears to be |
+ // empty. |
+ assert(!empty()); |
+ } |
+ T *pop() { |
+ assert(!empty()); |
+ T *Item = WorkItems[Front]; |
+ Front = (Front + 1) & MaxStaticSizeMask; |
+ return Item; |
+ } |
+}; |
+ |
} // end of namespace Ice |
#endif // SUBZERO_SRC_ICEUTILS_H |