Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(78)

Unified Diff: src/IceGlobalContext.h

Issue 870653002: Subzero: Initial implementation of multithreaded translation. (Closed) Base URL: https://chromium.googlesource.com/native_client/pnacl-subzero.git@master
Patch Set: Code review changes, continued Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « src/IceDefs.h ('k') | src/IceGlobalContext.cpp » ('j') | src/IceTranslator.h » ('J')
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: src/IceGlobalContext.h
diff --git a/src/IceGlobalContext.h b/src/IceGlobalContext.h
index 1f0a600d485fa8dcba7d3c5915d0bdc936e0428d..5cec34bfe623294a5f632d7cb184da4f1b3535dd 100644
--- a/src/IceGlobalContext.h
+++ b/src/IceGlobalContext.h
@@ -15,8 +15,9 @@
#ifndef SUBZERO_SRC_ICEGLOBALCONTEXT_H
#define SUBZERO_SRC_ICEGLOBALCONTEXT_H
-#include <memory>
#include <mutex>
+#include <queue>
+#include <thread>
#include "IceDefs.h"
#include "IceClFlags.h"
@@ -96,20 +97,97 @@ class GlobalContext {
std::vector<TimerStack> Timers;
};
+ // CfgQueue is the translation work queue. It allows multiple
+ // producers and multiple consumers (though currently only a single
+ // producer is used). The producer adds entries using add(), and
+ // may block if the queue is "full" to control Cfg memory footprint.
+ // The producer uses end() to indicate that no more entries will be
+ // added. The consumer removes an item using get(), which will
+ // return nullptr if end() has been called and the queue is empty.
+ //
+ // The MaxSize ctor arg controls the maximum size the queue can grow
+ // to. The Sequential arg indicates purely sequential execution in
+ // which the single thread should never wait().
JF 2015/01/23 23:01:47 Shouldn't the locks be entirely bypassed when sequ
Jim Stichnoth 2015/01/25 07:29:38 Nice, done.
+ //
+ // Two condition variables are used in the implementation.
+ // GrewOrEnded signals waiting workers that the producer has changed
+ // the state of the queue. Shrunk signals a blocked producer that a
+ // consumer has changed the state of the queue.
+ class CfgQueue {
+ public:
+ CfgQueue(size_t MaxSize, bool Sequential)
+ : IsEnded(false), MaxSize(MaxSize), Sequential(Sequential) {}
JF 2015/01/23 23:01:47 Should the CfgQueue assert that the queue is empty
Jim Stichnoth 2015/01/25 07:29:38 I don't see how the WorkQueue could ever be non-em
+ void add(Cfg *Func) {
+ std::unique_lock<GlobalLockType> L(Lock);
+ // If the work queue is already "full", wait for a consumer to
+ // grab an element and shrink the queue.
+ while (!Sequential && WorkQueue.size() >= MaxSize) {
+ Shrunk.wait(L);
+ }
+ WorkQueue.push(Func);
+ L.unlock();
+ GrewOrEnded.notify_one();
+ }
+ Cfg *get() {
+ std::unique_lock<GlobalLockType> L(Lock);
+ while (!IsEnded || !WorkQueue.empty()) {
+ if (!WorkQueue.empty()) {
+ Cfg *Func = WorkQueue.front();
+ WorkQueue.pop();
+ L.unlock();
+ Shrunk.notify_one();
+ return Func;
+ }
+ // If the work queue is empty, and this is pure sequential
+ // execution, then return nullptr.
+ if (Sequential)
+ return nullptr;
+ GrewOrEnded.wait(L);
+ }
+ return nullptr;
+ }
+ void end() {
+ std::unique_lock<GlobalLockType> L(Lock);
+ IsEnded = true;
+ L.unlock();
+ GrewOrEnded.notify_all();
+ }
+
+ private:
+ // WorkQueue and Lock are read/written by all.
+ // TODO(stichnot): Since WorkQueue has an enforced maximum size,
+ // implement it on top of something like std::array to minimize
+ // contention.
+ alignas(MaxCacheLineSize) std::queue<Cfg *> WorkQueue;
+ // Lock guards access to WorkQueue and IsEnded.
+ alignas(MaxCacheLineSize) GlobalLockType Lock;
+
+ // IsEnded and GrewOrEnded are written by the producer and read by
+ // the consumers.
+ alignas(MaxCacheLineSize) bool IsEnded;
JF 2015/01/23 22:22:11 Move to end with Sequential: it's only written to
Jim Stichnoth 2015/01/25 07:29:38 Done.
+ // GrewOrEnded is notified (by the producer) when something is
+ // added to the queue, in case consumers are waiting for a
+ // non-empty queue.
+ std::condition_variable GrewOrEnded;
+
+ // Shrunk is notified (by the consumer) when something is removed
+ // from the queue, in case the producer is waiting for the queue
+ // to drop below maximum capacity. It is written by the consumers
+ // and read by the producer.
+ alignas(MaxCacheLineSize) std::condition_variable Shrunk;
+
+ // MaxSize and Sequential are read by all and written by none.
+ alignas(MaxCacheLineSize) const size_t MaxSize;
+ const bool Sequential;
+ };
+
public:
GlobalContext(Ostream *OsDump, Ostream *OsEmit, ELFStreamer *ELFStreamer,
VerboseMask Mask, TargetArch Arch, OptLevel Opt,
IceString TestPrefix, const ClFlags &Flags);
~GlobalContext();
- // Returns true if any of the specified options in the verbose mask
- // are set. If the argument is omitted, it checks if any verbose
- // options at all are set.
VerboseMask getVerbose() const { return VMask; }
- bool isVerbose(VerboseMask Mask = IceV_All) const { return VMask & Mask; }
- void setVerbose(VerboseMask Mask) { VMask = Mask; }
- void addVerbose(VerboseMask Mask) { VMask |= Mask; }
- void subVerbose(VerboseMask Mask) { VMask &= ~Mask; }
// The dump and emit streams need to be used by only one thread at a
// time. This is done by exclusively reserving the streams via
@@ -129,6 +207,7 @@ public:
TargetArch getTargetArch() const { return Arch; }
OptLevel getOptLevel() const { return Opt; }
+ std::error_code getErrorStatus() const { return ErrorStatus; }
// When emitting assembly, we allow a string to be prepended to
// names of translated functions. This makes it easier to create an
@@ -229,10 +308,63 @@ public:
void dumpTimers(TimerStackIdT StackID = TSK_Default,
bool DumpCumulative = true);
+ // Adds a newly parsed and constructed function to the Cfg work
+ // queue. Notifies any idle workers that a new function is
+ // available for translating. May block if the work queue is too
+ // large, in order to control memory footprint.
+ void cfgQueueAdd(Cfg *Func) { CfgQ.add(Func); }
+ // Takes a Cfg from the work queue for translating. May block if
+ // the work queue is currently empty. Returns nullptr if there is
+ // no more work - the queue is empty and either end() has been
+ // called or the Sequential flag was set.
+ Cfg *cfgQueueGet() { return CfgQ.get(); }
+ // Notifies that no more work will be added to the work queue.
+ void cfgQueueEnd() { CfgQ.end(); }
+
+ void startWorkerThreads() {
+ size_t NumWorkers = getFlags().NumTranslationThreads;
+ for (size_t i = 0; i < NumWorkers; ++i) {
+ ThreadContext *WorkerTLS = new ThreadContext();
+ AllThreadContexts.push_back(WorkerTLS);
+ TranslationThreads.push_back(std::thread(
+ &GlobalContext::translateFunctionsWrapper, this, WorkerTLS));
+ }
+ if (NumWorkers) {
+ // TODO(stichnot): start a new thread for the emitter queue worker.
+ }
+ }
+
+ void waitForWorkerThreads() {
+ cfgQueueEnd();
+ // TODO(stichnot): call end() on the emitter work queue.
+ for (std::thread &Worker : TranslationThreads) {
+ Worker.join();
+ }
+ TranslationThreads.clear();
+ // TODO(stichnot): join the emitter thread.
+ }
+
+ // Translation thread startup routine.
+ void translateFunctionsWrapper(ThreadContext *MyTLS) {
+ TLS = MyTLS;
+ translateFunctions();
+ }
+ // Translate functions from the Cfg queue until the queue is empty.
+ void translateFunctions();
+
+ // Utility function to match a symbol name against a match string.
+ // This is used in a few cases where we want to take some action on
+ // a particular function or symbol based on a command-line argument,
+ // such as changing the verbose level for a particular function. An
+ // empty Match argument means match everything. Returns true if
+ // there is a match.
+ static bool matchSymbolName(const IceString &SymbolName,
+ const IceString &Match) {
+ return Match.empty() || Match == SymbolName;
+ }
+
private:
- // Try to make sure the mutexes are allocated on separate cache
- // lines, assuming the maximum cache line size is 64.
- const static size_t MaxCacheLineSize = 64;
+ // Try to ensure mutexes are allocated on separate cache lines.
alignas(MaxCacheLineSize) GlobalLockType AllocLock;
alignas(MaxCacheLineSize) GlobalLockType ConstPoolLock;
alignas(MaxCacheLineSize) GlobalLockType StatsLock;
@@ -257,6 +389,8 @@ private:
std::unique_ptr<ELFObjectWriter> ObjectWriter;
CodeStats StatsCumulative;
std::vector<TimerStack> Timers;
+ CfgQueue CfgQ;
+ std::error_code ErrorStatus;
LockedPtr<ArenaAllocator<>> getAllocator() {
return LockedPtr<ArenaAllocator<>>(&Allocator, &AllocLock);
@@ -272,6 +406,7 @@ private:
}
std::vector<ThreadContext *> AllThreadContexts;
+ std::vector<std::thread> TranslationThreads;
// Each thread has its own TLS pointer which is also held in
// AllThreadContexts.
ICE_ATTRIBUTE_TLS static ThreadContext *TLS;
« no previous file with comments | « src/IceDefs.h ('k') | src/IceGlobalContext.cpp » ('j') | src/IceTranslator.h » ('J')

Powered by Google App Engine
This is Rietveld 408576698