Chromium Code Reviews| Index: src/IceUtils.h |
| diff --git a/src/IceUtils.h b/src/IceUtils.h |
| index ffeb792e80617ed7793d4f1d1695b86058fca705..7e66ba7ee350ebd7a611d129c9ca0be84a33fe12 100644 |
| --- a/src/IceUtils.h |
| +++ b/src/IceUtils.h |
| @@ -54,6 +54,111 @@ public: |
| } |
| }; |
| +// BoundedProducerConsumerQueue is a work queue that allows multiple |
| +// producers and multiple consumers. The producer adds entries using |
| +// blockingPush(), and may block if the queue is "full". The producer |
| +// uses end() to indicate that no more entries will be added. The |
| +// consumer removes an item using blockingPop(), which will return |
| +// nullptr if end() has been called and the queue is empty (it never |
| +// returns nullptr if the queue contained any items). |
| +// |
| +// The MaxSize ctor arg controls the maximum size the queue can grow |
| +// to. The Sequential arg indicates purely sequential execution in |
| +// which the single thread should never wait(). |
| +// |
| +// Two condition variables are used in the implementation. |
| +// GrewOrEnded signals a waiting worker that the producer has changed |
| +// the state of the queue. Shrunk signals a blocked producer that a |
| +// consumer has changed the state of the queue. |
| +// |
| +// The methods begin with Sequential-specific code to be most clear. |
| +// The lock and condition variables are not used in the Sequential |
| +// case. |
| +template <typename T> class BoundedProducerConsumerQueue { |
| + BoundedProducerConsumerQueue() = delete; |
| + BoundedProducerConsumerQueue(const BoundedProducerConsumerQueue &) = delete; |
| + BoundedProducerConsumerQueue & |
| + operator=(const BoundedProducerConsumerQueue &) = delete; |
| + |
| +public: |
| + BoundedProducerConsumerQueue(size_t MaxSize, bool Sequential) |
| + : MaxSize(MaxSize), Sequential(Sequential), IsEnded(false) { |
| + // Do WorkQueue.reserve(MaxSize) if the underlying container |
| + // supports it. |
|
JF
2015/01/26 17:54:51
TODO
Jim Stichnoth
2015/01/27 00:56:18
Implemented a circular buffer instead.
|
| + } |
| + void blockingPush(T *Func) { |
| + if (Sequential) { |
| + WorkQueue.push(Func); |
| + return; |
| + } |
| + { |
| + std::unique_lock<GlobalLockType> L(Lock); |
| + // If the work queue is already "full", wait for a consumer to |
| + // grab an element and shrink the queue. |
| + Shrunk.wait(L, [this] { return WorkQueue.size() < MaxSize; }); |
| + WorkQueue.push(Func); |
| + } |
| + GrewOrEnded.notify_one(); |
| + } |
| + T *blockingPop() { |
| + if (Sequential) { |
| + T *Func = nullptr; |
| + if (!WorkQueue.empty()) { |
| + Func = WorkQueue.front(); |
| + WorkQueue.pop(); |
| + } |
| + return Func; |
| + } |
| + std::unique_lock<GlobalLockType> L(Lock); |
| + GrewOrEnded.wait(L, [this] { return IsEnded || !WorkQueue.empty(); }); |
| + T *Func = nullptr; |
| + if (!WorkQueue.empty()) { |
| + Func = WorkQueue.front(); |
| + WorkQueue.pop(); |
| + L.unlock(); |
| + Shrunk.notify_one(); |
| + } |
| + return Func; |
|
JF
2015/01/26 18:05:26
How about this:
T *Func = nullptr;
bool ShouldNot
Jim Stichnoth
2015/01/27 00:56:18
Done.
|
| + } |
| + void end() { |
| + if (Sequential) |
| + return; |
| + { |
| + std::unique_lock<GlobalLockType> L(Lock); |
| + IsEnded = true; |
| + } |
| + GrewOrEnded.notify_all(); |
| + } |
| + |
| +private: |
| + // WorkQueue and Lock are read/written by all. |
| + // TODO(stichnot): Since WorkQueue has an enforced maximum size, |
| + // implement it on top of something like std::array to minimize |
| + // contention. |
| + alignas(MaxCacheLineSize) std::queue<T *> WorkQueue; |
| + // Lock guards access to WorkQueue and IsEnded. |
| + alignas(MaxCacheLineSize) GlobalLockType Lock; |
| + |
| + // GrewOrEnded is written by the producer and read by the |
| + // consumers. It is notified (by the producer) when something is |
| + // added to the queue, in case consumers are waiting for a |
| + // non-empty queue. |
| + alignas(MaxCacheLineSize) std::condition_variable GrewOrEnded; |
| + |
| + // Shrunk is notified (by the consumer) when something is removed |
| + // from the queue, in case the producer is waiting for the queue |
| + // to drop below maximum capacity. It is written by the consumers |
| + // and read by the producer. |
| + alignas(MaxCacheLineSize) std::condition_variable Shrunk; |
| + |
| + // MaxSize and Sequential are read by all and written by none. |
| + alignas(MaxCacheLineSize) const size_t MaxSize; |
| + const bool Sequential; |
| + // IsEnded is read by the consumers, and only written once by the |
| + // producer. |
| + bool IsEnded; |
| +}; |
| + |
| } // end of namespace Ice |
| #endif // SUBZERO_SRC_ICEUTILS_H |