Chromium Code Reviews| Index: src/IceUtils.h |
| diff --git a/src/IceUtils.h b/src/IceUtils.h |
| index ffeb792e80617ed7793d4f1d1695b86058fca705..197527aba9810d237e8bcb27de35ef0315b68897 100644 |
| --- a/src/IceUtils.h |
| +++ b/src/IceUtils.h |
| @@ -54,6 +54,135 @@ public: |
| } |
| }; |
| +// BoundedProducerConsumerQueue is a work queue that allows multiple |
| +// producers and multiple consumers. The producer adds entries using |
|
JF
2015/01/27 01:53:34
You should probably pluralize "producer" everywher
Jim Stichnoth
2015/01/27 05:35:11
Changed to "a producer" / "a consumer".
|
| +// blockingPush(), and may block if the queue is "full". The producer |
| +// uses notifyEnd() to indicate that no more entries will be added. |
| +// The consumer removes an item using blockingPop(), which will return |
| +// nullptr if notifyEnd() has been called and the queue is empty (it |
| +// never returns nullptr if the queue contained any items). |
| +// |
| +// The MaxSize ctor arg controls the maximum size the queue can grow |
| +// to (subject to a hard limit of MaxStaticSize-1). The Sequential |
| +// arg indicates purely sequential execution in which the single |
| +// thread should never wait(). |
| +// |
| +// Two condition variables are used in the implementation. |
| +// GrewOrEnded signals a waiting worker that the producer has changed |
| +// the state of the queue. Shrunk signals a blocked producer that a |
| +// consumer has changed the state of the queue. |
| +// |
| +// The methods begin with Sequential-specific code to be most clear. |
| +// The lock and condition variables are not used in the Sequential |
| +// case. |
| +// |
| +// Internally, the queue is implemented as a circular array of size |
| +// MaxStaticSize, where the queue boundaries are denoted by the Front |
| +// and Back fields. Front==Back indicates an empty queue, and this |
| +// implies that the maximum queue size is actually MaxStaticSize-1. |
| +template <typename T, size_t MaxStaticSize = 128> |
| +class BoundedProducerConsumerQueue { |
| + BoundedProducerConsumerQueue() = delete; |
| + BoundedProducerConsumerQueue(const BoundedProducerConsumerQueue &) = delete; |
| + BoundedProducerConsumerQueue & |
| + operator=(const BoundedProducerConsumerQueue &) = delete; |
| + |
| +public: |
| + BoundedProducerConsumerQueue(size_t MaxSize, bool Sequential) |
| + : Back(0), Front(0), MaxSize(std::min(MaxSize, MaxStaticSize - 1)), |
| + Sequential(Sequential), IsEnded(false) {} |
| + void blockingPush(T *Item) { |
| + { |
| + std::unique_lock<GlobalLockType> L(Lock); |
| + // If the work queue is already "full", wait for a consumer to |
| + // grab an element and shrink the queue. |
| + Shrunk.wait(L, [this] { return size() < MaxSize || Sequential; }); |
| + push(Item); |
| + } |
| + GrewOrEnded.notify_one(); |
| + } |
| + T *blockingPop() { |
| + T *Item = nullptr; |
| + bool ShouldNotifyProducer = false; |
| + { |
| + std::unique_lock<GlobalLockType> L(Lock); |
| + GrewOrEnded.wait(L, [this] { return IsEnded || !empty() || Sequential; }); |
| + if (!empty()) { |
| + Item = pop(); |
| + ShouldNotifyProducer = !IsEnded; |
| + } |
| + } |
| + if (ShouldNotifyProducer) |
| + Shrunk.notify_one(); |
| + return Item; |
| + } |
| + void notifyEnd() { |
| + { |
| + std::lock_guard<GlobalLockType> L(Lock); |
| + IsEnded = true; |
| + } |
| + GrewOrEnded.notify_all(); |
| + } |
| + |
| +private: |
| + const static size_t MaxStaticSizeMask = MaxStaticSize - 1; |
| + static_assert(!(MaxStaticSize & (MaxStaticSize - 1)), |
| + "MaxStaticSize must be a power of 2"); |
| + |
| + // WorkItems and Lock are read/written by all. |
| + ICE_CACHELINE_BOUNDARY; |
| + T *WorkItems[MaxStaticSize]; |
| + ICE_CACHELINE_BOUNDARY; |
| + // Lock guards access to WorkItems, Front, Back, and IsEnded. |
| + GlobalLockType Lock; |
| + |
| + ICE_CACHELINE_BOUNDARY; |
| + // GrewOrEnded is written by the producer and read by the |
| + // consumers. It is notified (by the producer) when something is |
| + // added to the queue, in case consumers are waiting for a |
| + // non-empty queue. |
| + std::condition_variable GrewOrEnded; |
| + // Back is the index into WorkItems[] of where the next element will |
| + // be pushed. |
|
JF
2015/01/27 01:53:33
Written by the producer.
Jim Stichnoth
2015/01/27 05:35:11
Done.
|
| + size_t Back; |
| + |
| + ICE_CACHELINE_BOUNDARY; |
| + // Shrunk is notified (by the consumer) when something is removed |
| + // from the queue, in case the producer is waiting for the queue |
| + // to drop below maximum capacity. It is written by the consumers |
| + // and read by the producer. |
| + std::condition_variable Shrunk; |
| + // Front is the index into WorkItems[] of the oldest element, |
| + // i.e. the next to be popped. |
|
JF
2015/01/27 01:53:33
Written by the consumer.
Jim Stichnoth
2015/01/27 05:35:11
Done.
|
| + size_t Front; |
| + |
| + ICE_CACHELINE_BOUNDARY; |
| + |
| + // MaxSize and Sequential are read by all and written by none. |
| + const size_t MaxSize; |
| + const bool Sequential; |
| + // IsEnded is read by the consumers, and only written once by the |
| + // producer. |
| + bool IsEnded; |
| + |
| + // The lock must be held when the following methods are called. |
| + bool empty() const { return Front == Back; } |
| + size_t size() const { return (Back - Front) & MaxStaticSizeMask; } |
|
JF
2015/01/27 01:53:34
It took me a moment to convince myself that this w
Jim Stichnoth
2015/01/27 05:35:11
Me too, because it's too simple, right?
Actually,
JF
2015/01/27 06:04:36
Oh yeah that's way better!
|
| + void push(T *Item) { |
| + WorkItems[Back] = Item; |
| + Back = (Back + 1) & MaxStaticSizeMask; |
| + // If too many items are pushed, it "rolls over" and appears to be |
| + // empty. |
| + assert(!empty()); |
| + } |
| + T *pop() { |
| + assert(!empty()); |
| + T *Item = WorkItems[Front]; |
| + Front = (Front + 1) & MaxStaticSizeMask; |
| + return Item; |
| + } |
| +}; |
| + |
| } // end of namespace Ice |
| #endif // SUBZERO_SRC_ICEUTILS_H |