Index: src/IceUtils.h |
diff --git a/src/IceUtils.h b/src/IceUtils.h |
index ffeb792e80617ed7793d4f1d1695b86058fca705..7e66ba7ee350ebd7a611d129c9ca0be84a33fe12 100644 |
--- a/src/IceUtils.h |
+++ b/src/IceUtils.h |
@@ -54,6 +54,111 @@ public: |
} |
}; |
+// BoundedProducerConsumerQueue is a work queue that allows multiple |
+// producers and multiple consumers. The producer adds entries using |
+// blockingPush(), and may block if the queue is "full". The producer |
+// uses end() to indicate that no more entries will be added. The |
+// consumer removes an item using blockingPop(), which will return |
+// nullptr if end() has been called and the queue is empty (it never |
+// returns nullptr if the queue contained any items). |
+// |
+// The MaxSize ctor arg controls the maximum size the queue can grow |
+// to. The Sequential arg indicates purely sequential execution in |
+// which the single thread should never wait(). |
+// |
+// Two condition variables are used in the implementation. |
+// GrewOrEnded signals a waiting worker that the producer has changed |
+// the state of the queue. Shrunk signals a blocked producer that a |
+// consumer has changed the state of the queue. |
+// |
+// The methods begin with Sequential-specific code to be most clear. |
+// The lock and condition variables are not used in the Sequential |
+// case. |
+template <typename T> class BoundedProducerConsumerQueue { |
+ BoundedProducerConsumerQueue() = delete; |
+ BoundedProducerConsumerQueue(const BoundedProducerConsumerQueue &) = delete; |
+ BoundedProducerConsumerQueue & |
+ operator=(const BoundedProducerConsumerQueue &) = delete; |
+ |
+public: |
+ BoundedProducerConsumerQueue(size_t MaxSize, bool Sequential) |
+ : MaxSize(MaxSize), Sequential(Sequential), IsEnded(false) { |
+ // Do WorkQueue.reserve(MaxSize) if the underlying container |
+ // supports it. |
JF
2015/01/26 17:54:51
TODO
Jim Stichnoth
2015/01/27 00:56:18
Implemented a circular buffer instead.
|
+ } |
+ void blockingPush(T *Func) { |
+ if (Sequential) { |
+ WorkQueue.push(Func); |
+ return; |
+ } |
+ { |
+ std::unique_lock<GlobalLockType> L(Lock); |
+ // If the work queue is already "full", wait for a consumer to |
+ // grab an element and shrink the queue. |
+ Shrunk.wait(L, [this] { return WorkQueue.size() < MaxSize; }); |
+ WorkQueue.push(Func); |
+ } |
+ GrewOrEnded.notify_one(); |
+ } |
+ T *blockingPop() { |
+ if (Sequential) { |
+ T *Func = nullptr; |
+ if (!WorkQueue.empty()) { |
+ Func = WorkQueue.front(); |
+ WorkQueue.pop(); |
+ } |
+ return Func; |
+ } |
+ std::unique_lock<GlobalLockType> L(Lock); |
+ GrewOrEnded.wait(L, [this] { return IsEnded || !WorkQueue.empty(); }); |
+ T *Func = nullptr; |
+ if (!WorkQueue.empty()) { |
+ Func = WorkQueue.front(); |
+ WorkQueue.pop(); |
+ L.unlock(); |
+ Shrunk.notify_one(); |
+ } |
+ return Func; |
JF
2015/01/26 18:05:26
How about this:
T *Func = nullptr;
bool ShouldNot
Jim Stichnoth
2015/01/27 00:56:18
Done.
|
+ } |
+ void end() { |
+ if (Sequential) |
+ return; |
+ { |
+ std::unique_lock<GlobalLockType> L(Lock); |
+ IsEnded = true; |
+ } |
+ GrewOrEnded.notify_all(); |
+ } |
+ |
+private: |
+ // WorkQueue and Lock are read/written by all. |
+ // TODO(stichnot): Since WorkQueue has an enforced maximum size, |
+ // implement it on top of something like std::array to minimize |
+ // contention. |
+ alignas(MaxCacheLineSize) std::queue<T *> WorkQueue; |
+ // Lock guards access to WorkQueue and IsEnded. |
+ alignas(MaxCacheLineSize) GlobalLockType Lock; |
+ |
+ // GrewOrEnded is written by the producer and read by the |
+ // consumers. It is notified (by the producer) when something is |
+ // added to the queue, in case consumers are waiting for a |
+ // non-empty queue. |
+ alignas(MaxCacheLineSize) std::condition_variable GrewOrEnded; |
+ |
+ // Shrunk is notified (by the consumer) when something is removed |
+ // from the queue, in case the producer is waiting for the queue |
+ // to drop below maximum capacity. It is written by the consumers |
+ // and read by the producer. |
+ alignas(MaxCacheLineSize) std::condition_variable Shrunk; |
+ |
+ // MaxSize and Sequential are read by all and written by none. |
+ alignas(MaxCacheLineSize) const size_t MaxSize; |
+ const bool Sequential; |
+ // IsEnded is read by the consumers, and only written once by the |
+ // producer. |
+ bool IsEnded; |
+}; |
+ |
} // end of namespace Ice |
#endif // SUBZERO_SRC_ICEUTILS_H |