Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(183)

Unified Diff: src/IceUtils.h

Issue 870653002: Subzero: Initial implementation of multithreaded translation. (Closed) Base URL: https://chromium.googlesource.com/native_client/pnacl-subzero.git@master
Patch Set: clang-format Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: src/IceUtils.h
diff --git a/src/IceUtils.h b/src/IceUtils.h
index ffeb792e80617ed7793d4f1d1695b86058fca705..7e66ba7ee350ebd7a611d129c9ca0be84a33fe12 100644
--- a/src/IceUtils.h
+++ b/src/IceUtils.h
@@ -54,6 +54,111 @@ public:
}
};
+// BoundedProducerConsumerQueue is a work queue that allows multiple
+// producers and multiple consumers. The producer adds entries using
+// blockingPush(), and may block if the queue is "full". The producer
+// uses end() to indicate that no more entries will be added. The
+// consumer removes an item using blockingPop(), which will return
+// nullptr if end() has been called and the queue is empty (it never
+// returns nullptr if the queue contained any items).
+//
+// The MaxSize ctor arg controls the maximum size the queue can grow
+// to. The Sequential arg indicates purely sequential execution in
+// which the single thread should never wait().
+//
+// Two condition variables are used in the implementation.
+// GrewOrEnded signals a waiting worker that the producer has changed
+// the state of the queue. Shrunk signals a blocked producer that a
+// consumer has changed the state of the queue.
+//
+// The methods begin with Sequential-specific code to be most clear.
+// The lock and condition variables are not used in the Sequential
+// case.
+template <typename T> class BoundedProducerConsumerQueue {
+ BoundedProducerConsumerQueue() = delete;
+ BoundedProducerConsumerQueue(const BoundedProducerConsumerQueue &) = delete;
+ BoundedProducerConsumerQueue &
+ operator=(const BoundedProducerConsumerQueue &) = delete;
+
+public:
+ BoundedProducerConsumerQueue(size_t MaxSize, bool Sequential)
+ : MaxSize(MaxSize), Sequential(Sequential), IsEnded(false) {
+ // Do WorkQueue.reserve(MaxSize) if the underlying container
+ // supports it.
JF 2015/01/26 17:54:51 TODO
Jim Stichnoth 2015/01/27 00:56:18 Implemented a circular buffer instead.
+ }
+ void blockingPush(T *Func) {
+ if (Sequential) {
+ WorkQueue.push(Func);
+ return;
+ }
+ {
+ std::unique_lock<GlobalLockType> L(Lock);
+ // If the work queue is already "full", wait for a consumer to
+ // grab an element and shrink the queue.
+ Shrunk.wait(L, [this] { return WorkQueue.size() < MaxSize; });
+ WorkQueue.push(Func);
+ }
+ GrewOrEnded.notify_one();
+ }
+ T *blockingPop() {
+ if (Sequential) {
+ T *Func = nullptr;
+ if (!WorkQueue.empty()) {
+ Func = WorkQueue.front();
+ WorkQueue.pop();
+ }
+ return Func;
+ }
+ std::unique_lock<GlobalLockType> L(Lock);
+ GrewOrEnded.wait(L, [this] { return IsEnded || !WorkQueue.empty(); });
+ T *Func = nullptr;
+ if (!WorkQueue.empty()) {
+ Func = WorkQueue.front();
+ WorkQueue.pop();
+ L.unlock();
+ Shrunk.notify_one();
+ }
+ return Func;
JF 2015/01/26 18:05:26 How about this: T *Func = nullptr; bool ShouldNot
Jim Stichnoth 2015/01/27 00:56:18 Done.
+ }
+ void end() {
+ if (Sequential)
+ return;
+ {
+ std::unique_lock<GlobalLockType> L(Lock);
+ IsEnded = true;
+ }
+ GrewOrEnded.notify_all();
+ }
+
+private:
+ // WorkQueue and Lock are read/written by all.
+ // TODO(stichnot): Since WorkQueue has an enforced maximum size,
+ // implement it on top of something like std::array to minimize
+ // contention.
+ alignas(MaxCacheLineSize) std::queue<T *> WorkQueue;
+ // Lock guards access to WorkQueue and IsEnded.
+ alignas(MaxCacheLineSize) GlobalLockType Lock;
+
+ // GrewOrEnded is written by the producer and read by the
+ // consumers. It is notified (by the producer) when something is
+ // added to the queue, in case consumers are waiting for a
+ // non-empty queue.
+ alignas(MaxCacheLineSize) std::condition_variable GrewOrEnded;
+
+ // Shrunk is notified (by the consumer) when something is removed
+ // from the queue, in case the producer is waiting for the queue
+ // to drop below maximum capacity. It is written by the consumers
+ // and read by the producer.
+ alignas(MaxCacheLineSize) std::condition_variable Shrunk;
+
+ // MaxSize and Sequential are read by all and written by none.
+ alignas(MaxCacheLineSize) const size_t MaxSize;
+ const bool Sequential;
+ // IsEnded is read by the consumers, and only written once by the
+ // producer.
+ bool IsEnded;
+};
+
} // end of namespace Ice
#endif // SUBZERO_SRC_ICEUTILS_H

Powered by Google App Engine
This is Rietveld 408576698