Chromium Code Reviews

Unified Diff: src/IceGlobalContext.h

Issue 870653002: Subzero: Initial implementation of multithreaded translation. (Closed) Base URL: https://chromium.googlesource.com/native_client/pnacl-subzero.git@master
Patch Set: Make CfgQueue::Sequential logic more clear. Move IsEnded field. Created 5 years, 11 months ago
Index: src/IceGlobalContext.h
diff --git a/src/IceGlobalContext.h b/src/IceGlobalContext.h
index 1f0a600d485fa8dcba7d3c5915d0bdc936e0428d..a0f088a4f3f3f0766d5a6ed154ee0e0783ec4981 100644
--- a/src/IceGlobalContext.h
+++ b/src/IceGlobalContext.h
@@ -15,8 +15,9 @@
#ifndef SUBZERO_SRC_ICEGLOBALCONTEXT_H
#define SUBZERO_SRC_ICEGLOBALCONTEXT_H
-#include <memory>
#include <mutex>
+#include <queue>
+#include <thread>
#include "IceDefs.h"
#include "IceClFlags.h"
@@ -96,20 +97,112 @@ class GlobalContext {
std::vector<TimerStack> Timers;
};
+ // CfgQueue is the translation work queue. It allows multiple
+ // producers and multiple consumers (though currently only a single
+ // producer is used). The producer adds entries using add(), and
+ // may block if the queue is "full" to control Cfg memory footprint.
+ // The producer uses end() to indicate that no more entries will be
+ // added. The consumer removes an item using get(), which will
+ // return nullptr if end() has been called and the queue is empty.
+ //
+ // The MaxSize ctor arg controls the maximum size the queue can grow
+ // to. The Sequential arg indicates purely sequential execution in
+ // which the single thread should never wait().
+ //
+ // Two condition variables are used in the implementation.
+ // GrewOrEnded signals waiting workers that the producer has changed
+ // the state of the queue. Shrunk signals a blocked producer that a
+ // consumer has changed the state of the queue.
+ //
+ // The methods begin with Sequential-specific code to be most clear.
+ // The lock and condition variables are not used in the Sequential
+ // case.
+ class CfgQueue {
JF 2015/01/25 22:57:00 It's probably better to put this class in its own
Jim Stichnoth 2015/01/26 04:59:43 Done.
+ public:
+ CfgQueue(size_t MaxSize, bool Sequential)
+ : MaxSize(MaxSize), Sequential(Sequential), IsEnded(false) {}
JF 2015/01/25 22:57:00 WorkQueue.reserve(MaxSize)
Jim Stichnoth 2015/01/26 04:59:43 There's no std::queue::reserve() or std::deque::re
JF 2015/01/26 17:54:50 :( I'm still not a fan of using queue, the defaul
Jim Stichnoth 2015/01/27 00:56:18 Done.
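For context, a minimal sketch of the alternative being discussed: a FIFO built on a reserved std::vector, since std::queue and std::deque provide no reserve(). The class and member names below are illustrative assumptions; the follow-up patch set is not shown in this diff.

    #include <cstddef>
    #include <vector>
    // Illustrative only: bounded FIFO whose backing storage is reserved up front.
    template <typename T> class BoundedFifo {
    public:
      explicit BoundedFifo(std::size_t MaxSize) { Buffer.reserve(MaxSize); }
      void push(T Item) { Buffer.push_back(Item); }
      bool empty() const { return Front == Buffer.size(); }
      T pop() {
        T Item = Buffer[Front++];
        if (empty()) { // compact once drained so memory stays bounded
          Buffer.clear();
          Front = 0;
        }
        return Item;
      }
    private:
      std::vector<T> Buffer;
      std::size_t Front = 0;
    };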
+ void add(Cfg *Func) {
JF 2015/01/25 22:57:00 I'd rename to blocking_push or something similar,
Jim Stichnoth 2015/01/26 04:59:43 Done. add() --> blockingPush(), and get() --> blo
+ if (Sequential) {
+ WorkQueue.push(Func);
+ return;
+ }
+ std::unique_lock<GlobalLockType> L(Lock);
+ // If the work queue is already "full", wait for a consumer to
+ // grab an element and shrink the queue.
+ while (WorkQueue.size() >= MaxSize) {
+ Shrunk.wait(L);
+ }
JF 2015/01/25 22:57:00 This code: while (WorkQueue.size() >= MaxSize) {
Jim Stichnoth 2015/01/26 04:59:43 Wow. Done. (my first ever C++ lambda)
+ WorkQueue.push(Func);
+ L.unlock();
+ GrewOrEnded.notify_one();
+ }
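The predicate form of the wait that the reviewer suggests above would look roughly like this (a sketch; it assumes GlobalLockType is std::mutex, as the use of std::condition_variable implies):

    // Replaces the explicit while-loop around Shrunk.wait(L).
    Shrunk.wait(L, [this] { return WorkQueue.size() < MaxSize; });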
+ Cfg *get() {
JF 2015/01/25 22:57:00 I'd rename to wait_and_pop or something similar.
Jim Stichnoth 2015/01/26 04:59:43 Done.
+ if (Sequential) {
+ Cfg *Func = nullptr;
+ if (!WorkQueue.empty()) {
+ Func = WorkQueue.front();
+ WorkQueue.pop();
+ }
+ return Func;
+ }
+ std::unique_lock<GlobalLockType> L(Lock);
+ while (!IsEnded || !WorkQueue.empty()) {
JF 2015/01/25 22:57:00 Similarly here, I'd go with: Cfg *wait_and_pop()
Jim Stichnoth 2015/01/26 04:59:43 Yeah, your rewrite could have the workers shut dow
JF 2015/01/26 17:54:50 I'm in for killing Sequential, it'll make the non-
Jim Stichnoth 2015/01/27 00:56:18 Done.
+ if (!WorkQueue.empty()) {
+ Cfg *Func = WorkQueue.front();
+ WorkQueue.pop();
+ L.unlock();
+ Shrunk.notify_one();
+ return Func;
+ }
+ GrewOrEnded.wait(L);
+ }
+ return nullptr;
+ }
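Similarly, a sketch of the consumer side rewritten with a predicate wait, per the reviewer's suggestion (assumed form only; the rewritten method is not part of this patch set):

    std::unique_lock<GlobalLockType> L(Lock);
    // Block until there is work or the producer has called end().
    GrewOrEnded.wait(L, [this] { return IsEnded || !WorkQueue.empty(); });
    if (WorkQueue.empty())
      return nullptr; // ended and fully drained
    Cfg *Func = WorkQueue.front();
    WorkQueue.pop();
    L.unlock();
    Shrunk.notify_one();
    return Func;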
+ void end() {
+ if (Sequential)
+ return;
+ std::unique_lock<GlobalLockType> L(Lock);
+ IsEnded = true;
+ L.unlock();
JF 2015/01/25 22:57:00 Could you just use a lock_guard and scoping here?
Jim Stichnoth 2015/01/26 04:59:43 I thought about this for all 3 methods, but since
JF 2015/01/26 18:10:40 As discussed offline, lock_guard is simpler than u
Jim Stichnoth 2015/01/27 00:56:18 Done.
+ GrewOrEnded.notify_all();
+ }
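The std::lock_guard variant of end() being discussed would look roughly like this (a sketch; behavior is unchanged, only the locking style differs):

    void end() {
      // Sequential fast path omitted for brevity.
      {
        std::lock_guard<GlobalLockType> L(Lock);
        IsEnded = true;
      } // Lock is released here, before waking the waiters.
      GrewOrEnded.notify_all();
    }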
+
+ private:
JF 2015/01/25 22:57:00 CfGQueue() = delete; CfgQueue(const CfgQueue &) =
Jim Stichnoth 2015/01/26 04:59:43 <shamecube> Done.
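The deleted special members being requested would presumably read as follows (a sketch based on the reviewer's truncated suggestion; the exact set in the follow-up patch is not shown here):

    CfgQueue() = delete;
    CfgQueue(const CfgQueue &) = delete;
    CfgQueue &operator=(const CfgQueue &) = delete;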
+ // WorkQueue and Lock are read/written by all.
+ // TODO(stichnot): Since WorkQueue has an enforced maximum size,
+ // implement it on top of something like std::array to minimize
+ // contention.
+ alignas(MaxCacheLineSize) std::queue<Cfg *> WorkQueue;
+ // Lock guards access to WorkQueue and IsEnded.
+ alignas(MaxCacheLineSize) GlobalLockType Lock;
+
+ // GrewOrEnded is written by the producer and read by the
+ // consumers. It is notified (by the producer) when something is
+ // added to the queue, in case consumers are waiting for a
+ // non-empty queue.
+ alignas(MaxCacheLineSize) std::condition_variable GrewOrEnded;
+
+ // Shrunk is notified (by the consumer) when something is removed
+ // from the queue, in case the producer is waiting for the queue
+ // to drop below maximum capacity. It is written by the consumers
+ // and read by the producer.
+ alignas(MaxCacheLineSize) std::condition_variable Shrunk;
+
+ // MaxSize and Sequential are read by all and written by none.
+ alignas(MaxCacheLineSize) const size_t MaxSize;
+ const bool Sequential;
+ // IsEnded is read by the consumers, and only written once by the
+ // producer.
+ bool IsEnded;
+ };
+
public:
GlobalContext(Ostream *OsDump, Ostream *OsEmit, ELFStreamer *ELFStreamer,
VerboseMask Mask, TargetArch Arch, OptLevel Opt,
IceString TestPrefix, const ClFlags &Flags);
~GlobalContext();
- // Returns true if any of the specified options in the verbose mask
- // are set. If the argument is omitted, it checks if any verbose
- // options at all are set.
VerboseMask getVerbose() const { return VMask; }
- bool isVerbose(VerboseMask Mask = IceV_All) const { return VMask & Mask; }
- void setVerbose(VerboseMask Mask) { VMask = Mask; }
- void addVerbose(VerboseMask Mask) { VMask |= Mask; }
- void subVerbose(VerboseMask Mask) { VMask &= ~Mask; }
// The dump and emit streams need to be used by only one thread at a
// time. This is done by exclusively reserving the streams via
@@ -129,6 +222,9 @@ public:
TargetArch getTargetArch() const { return Arch; }
OptLevel getOptLevel() const { return Opt; }
+ LockedPtr<std::error_code> getErrorStatus() {
+ return LockedPtr<std::error_code>(&ErrorStatus, &ErrorStatusLock);
+ }
// When emitting assembly, we allow a string to be prepended to
// names of translated functions. This makes it easier to create an
@@ -229,25 +325,91 @@ public:
void dumpTimers(TimerStackIdT StackID = TSK_Default,
bool DumpCumulative = true);
+ // Adds a newly parsed and constructed function to the Cfg work
+ // queue. Notifies any idle workers that a new function is
+ // available for translating. May block if the work queue is too
+ // large, in order to control memory footprint.
+ void cfgQueueAdd(Cfg *Func) { CfgQ.add(Func); }
+ // Takes a Cfg from the work queue for translating. May block if
+ // the work queue is currently empty. Returns nullptr if there is
+ // no more work - the queue is empty and either end() has been
+ // called or the Sequential flag was set.
+ Cfg *cfgQueueGet() { return CfgQ.get(); }
+ // Notifies that no more work will be added to the work queue.
+ void cfgQueueEnd() { CfgQ.end(); }
+
+ void startWorkerThreads() {
+ size_t NumWorkers = getFlags().NumTranslationThreads;
+ for (size_t i = 0; i < NumWorkers; ++i) {
+ ThreadContext *WorkerTLS = new ThreadContext();
+ AllThreadContexts.push_back(WorkerTLS);
+ TranslationThreads.push_back(std::thread(
+ &GlobalContext::translateFunctionsWrapper, this, WorkerTLS));
+ }
+ if (NumWorkers) {
+ // TODO(stichnot): start a new thread for the emitter queue worker.
+ }
+ }
+
+ void waitForWorkerThreads() {
+ cfgQueueEnd();
+ // TODO(stichnot): call end() on the emitter work queue.
+ for (std::thread &Worker : TranslationThreads) {
+ Worker.join();
+ }
+ TranslationThreads.clear();
+ // TODO(stichnot): join the emitter thread.
+ }
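Taken together, the intended main-thread flow appears to be: start the workers, push each parsed Cfg, then signal end-of-input and join. A caller-side sketch (the parseNextFunction hook and the Ctx pointer are hypothetical names, not part of this patch):

    Ctx->startWorkerThreads();
    while (Cfg *Func = parseNextFunction()) // hypothetical parser hook
      Ctx->cfgQueueAdd(Func);               // may block if the queue is full
    Ctx->waitForWorkerThreads();            // calls cfgQueueEnd() and joins the workers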
+
+ // Translation thread startup routine.
+ void translateFunctionsWrapper(ThreadContext *MyTLS) {
+ TLS = MyTLS;
+ translateFunctions();
+ }
+ // Translate functions from the Cfg queue until the queue is empty.
+ void translateFunctions();
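The definition lives in IceGlobalContext.cpp (not part of this file's diff); based on the queue semantics above, a minimal sketch of such a worker loop:

    // Sketch only; the real body is in IceGlobalContext.cpp.
    void GlobalContext::translateFunctions() {
      while (Cfg *Func = cfgQueueGet()) { // nullptr means ended and drained
        // ... lower Func to target code, emit it, then free the Cfg ...
      }
    }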
+
+ // Utility function to match a symbol name against a match string.
+ // This is used in a few cases where we want to take some action on
+ // a particular function or symbol based on a command-line argument,
+ // such as changing the verbose level for a particular function. An
+ // empty Match argument means match everything. Returns true if
+ // there is a match.
+ static bool matchSymbolName(const IceString &SymbolName,
+ const IceString &Match) {
+ return Match.empty() || Match == SymbolName;
+ }
+
private:
- // Try to make sure the mutexes are allocated on separate cache
- // lines, assuming the maximum cache line size is 64.
- const static size_t MaxCacheLineSize = 64;
+ // Try to ensure mutexes are allocated on separate cache lines.
+
+ // Managed by getAllocator()
alignas(MaxCacheLineSize) GlobalLockType AllocLock;
+ ArenaAllocator<> Allocator;
+
+ // Managed by getConstantPool()
alignas(MaxCacheLineSize) GlobalLockType ConstPoolLock;
+ std::unique_ptr<ConstantPool> ConstPool;
+
+ // Managed by getErrorStatus()
+ alignas(MaxCacheLineSize) GlobalLockType ErrorStatusLock;
+ std::error_code ErrorStatus;
+
+ // Managed by getStatsCumulative()
alignas(MaxCacheLineSize) GlobalLockType StatsLock;
+ CodeStats StatsCumulative;
+
+ // Managed by getTimers()
alignas(MaxCacheLineSize) GlobalLockType TimerLock;
+ std::vector<TimerStack> Timers;
// StrLock is a global lock on the dump and emit output streams.
typedef std::mutex StrLockType;
- StrLockType StrLock;
-
+ alignas(MaxCacheLineSize) StrLockType StrLock;
Ostream *StrDump; // Stream for dumping / diagnostics
Ostream *StrEmit; // Stream for code emission
- ArenaAllocator<> Allocator;
- VerboseMask VMask;
- std::unique_ptr<ConstantPool> ConstPool;
+ const VerboseMask VMask;
JF 2015/01/25 22:57:00 alignas after the streams.
Jim Stichnoth 2015/01/26 04:59:44 Done. (wish it was easier to add these boundaries
JF 2015/01/26 17:54:50 Your wish is my command: #define ICE_CACHELINE_BO
JF 2015/01/26 19:11:48 A standard-compliant solution pointed out by Richa
Jim Stichnoth 2015/01/27 00:56:18 Done.
Jim Stichnoth 2015/01/27 00:56:18 Cool, thanks!
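Purely as an illustration of the kind of boundary macro being discussed (the actual ICE_CACHELINE_BOUNDARY definition and the standard-compliant variant mentioned above are not shown in this diff), one portable option is a macro that simply attaches alignas to the member that should start a new cache line:

    // Hypothetical sketch only; not necessarily the definition adopted later.
    #define ICE_CACHELINE_BOUNDARY alignas(MaxCacheLineSize)
    // Usage:
    //   ICE_CACHELINE_BOUNDARY GlobalLockType AllocLock;
    //   ArenaAllocator<> Allocator;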
Intrinsics IntrinsicsInfo;
const TargetArch Arch;
const OptLevel Opt;
@@ -255,8 +417,7 @@ private:
const ClFlags &Flags;
RandomNumberGenerator RNG;
JF 2015/01/25 22:57:00 Add a TODO to move the RNG out of this class.
Jim Stichnoth 2015/01/26 04:59:43 Done.
std::unique_ptr<ELFObjectWriter> ObjectWriter;
- CodeStats StatsCumulative;
- std::vector<TimerStack> Timers;
+ CfgQueue CfgQ;
LockedPtr<ArenaAllocator<>> getAllocator() {
return LockedPtr<ArenaAllocator<>>(&Allocator, &AllocLock);
@@ -272,6 +433,7 @@ private:
}
std::vector<ThreadContext *> AllThreadContexts;
+ std::vector<std::thread> TranslationThreads;
// Each thread has its own TLS pointer which is also held in
// AllThreadContexts.
ICE_ATTRIBUTE_TLS static ThreadContext *TLS;