Index: src/IceGlobalContext.h |
diff --git a/src/IceGlobalContext.h b/src/IceGlobalContext.h |
index 1f0a600d485fa8dcba7d3c5915d0bdc936e0428d..5cec34bfe623294a5f632d7cb184da4f1b3535dd 100644 |
--- a/src/IceGlobalContext.h |
+++ b/src/IceGlobalContext.h |
@@ -15,8 +15,9 @@ |
#ifndef SUBZERO_SRC_ICEGLOBALCONTEXT_H |
#define SUBZERO_SRC_ICEGLOBALCONTEXT_H |
-#include <memory> |
#include <mutex> |
+#include <queue> |
+#include <thread> |
#include "IceDefs.h" |
#include "IceClFlags.h" |
@@ -96,20 +97,97 @@ class GlobalContext { |
std::vector<TimerStack> Timers; |
}; |
+ // CfgQueue is the translation work queue. It allows multiple |
+ // producers and multiple consumers (though currently only a single |
+ // producer is used). The producer adds entries using add(), and |
+ // may block if the queue is "full" to control Cfg memory footprint. |
+ // The producer uses end() to indicate that no more entries will be |
+ // added. The consumer removes an item using get(), which will |
+ // return nullptr if end() has been called and the queue is empty. |
+ // |
+ // The MaxSize ctor arg controls the maximum size the queue can grow |
+ // to. The Sequential arg indicates purely sequential execution in |
+ // which the single thread should never wait(). |
JF
2015/01/23 23:01:47
Shouldn't the locks be entirely bypassed when sequ
Jim Stichnoth
2015/01/25 07:29:38
Nice, done.
|
+ // |
+ // Two condition variables are used in the implementation. |
+ // GrewOrEnded signals waiting workers that the producer has changed |
+ // the state of the queue. Shrunk signals a blocked producer that a |
+ // consumer has changed the state of the queue. |
+ class CfgQueue { |
+ public: |
+ CfgQueue(size_t MaxSize, bool Sequential) |
+ : IsEnded(false), MaxSize(MaxSize), Sequential(Sequential) {} |
JF
2015/01/23 23:01:47
Should the CfgQueue assert that the queue is empty
Jim Stichnoth
2015/01/25 07:29:38
I don't see how the WorkQueue could ever be non-em
|
+ void add(Cfg *Func) { |
+ std::unique_lock<GlobalLockType> L(Lock); |
+ // If the work queue is already "full", wait for a consumer to |
+ // grab an element and shrink the queue. |
+ while (!Sequential && WorkQueue.size() >= MaxSize) { |
+ Shrunk.wait(L); |
+ } |
+ WorkQueue.push(Func); |
+ L.unlock(); |
+ GrewOrEnded.notify_one(); |
+ } |
+ Cfg *get() { |
+ std::unique_lock<GlobalLockType> L(Lock); |
+ while (!IsEnded || !WorkQueue.empty()) { |
+ if (!WorkQueue.empty()) { |
+ Cfg *Func = WorkQueue.front(); |
+ WorkQueue.pop(); |
+ L.unlock(); |
+ Shrunk.notify_one(); |
+ return Func; |
+ } |
+ // If the work queue is empty, and this is pure sequential |
+ // execution, then return nullptr. |
+ if (Sequential) |
+ return nullptr; |
+ GrewOrEnded.wait(L); |
+ } |
+ return nullptr; |
+ } |
+ void end() { |
+ std::unique_lock<GlobalLockType> L(Lock); |
+ IsEnded = true; |
+ L.unlock(); |
+ GrewOrEnded.notify_all(); |
+ } |
+ |
+ private: |
+ // WorkQueue and Lock are read/written by all. |
+ // TODO(stichnot): Since WorkQueue has an enforced maximum size, |
+ // implement it on top of something like std::array to minimize |
+ // contention. |
+ alignas(MaxCacheLineSize) std::queue<Cfg *> WorkQueue; |
+ // Lock guards access to WorkQueue and IsEnded. |
+ alignas(MaxCacheLineSize) GlobalLockType Lock; |
+ |
+ // IsEnded and GrewOrEnded are written by the producer and read by |
+ // the consumers. |
+ alignas(MaxCacheLineSize) bool IsEnded; |
JF
2015/01/23 22:22:11
Move to end with Sequential: it's only written to
Jim Stichnoth
2015/01/25 07:29:38
Done.
|
+ // GrewOrEnded is notified (by the producer) when something is |
+ // added to the queue, in case consumers are waiting for a |
+ // non-empty queue. |
+ std::condition_variable GrewOrEnded; |
+ |
+ // Shrunk is notified (by the consumer) when something is removed |
+ // from the queue, in case the producer is waiting for the queue |
+ // to drop below maximum capacity. It is written by the consumers |
+ // and read by the producer. |
+ alignas(MaxCacheLineSize) std::condition_variable Shrunk; |
+ |
+ // MaxSize and Sequential are read by all and written by none. |
+ alignas(MaxCacheLineSize) const size_t MaxSize; |
+ const bool Sequential; |
+ }; |
+ |
public: |
GlobalContext(Ostream *OsDump, Ostream *OsEmit, ELFStreamer *ELFStreamer, |
VerboseMask Mask, TargetArch Arch, OptLevel Opt, |
IceString TestPrefix, const ClFlags &Flags); |
~GlobalContext(); |
- // Returns true if any of the specified options in the verbose mask |
- // are set. If the argument is omitted, it checks if any verbose |
- // options at all are set. |
VerboseMask getVerbose() const { return VMask; } |
- bool isVerbose(VerboseMask Mask = IceV_All) const { return VMask & Mask; } |
- void setVerbose(VerboseMask Mask) { VMask = Mask; } |
- void addVerbose(VerboseMask Mask) { VMask |= Mask; } |
- void subVerbose(VerboseMask Mask) { VMask &= ~Mask; } |
// The dump and emit streams need to be used by only one thread at a |
// time. This is done by exclusively reserving the streams via |
@@ -129,6 +207,7 @@ public: |
TargetArch getTargetArch() const { return Arch; } |
OptLevel getOptLevel() const { return Opt; } |
+ std::error_code getErrorStatus() const { return ErrorStatus; } |
// When emitting assembly, we allow a string to be prepended to |
// names of translated functions. This makes it easier to create an |
@@ -229,10 +308,63 @@ public: |
void dumpTimers(TimerStackIdT StackID = TSK_Default, |
bool DumpCumulative = true); |
+ // Adds a newly parsed and constructed function to the Cfg work |
+ // queue. Notifies any idle workers that a new function is |
+ // available for translating. May block if the work queue is too |
+ // large, in order to control memory footprint. |
+ void cfgQueueAdd(Cfg *Func) { CfgQ.add(Func); } |
+ // Takes a Cfg from the work queue for translating. May block if |
+ // the work queue is currently empty. Returns nullptr if there is |
+ // no more work - the queue is empty and either end() has been |
+ // called or the Sequential flag was set. |
+ Cfg *cfgQueueGet() { return CfgQ.get(); } |
+ // Notifies that no more work will be added to the work queue. |
+ void cfgQueueEnd() { CfgQ.end(); } |
+ |
+ void startWorkerThreads() { |
+ size_t NumWorkers = getFlags().NumTranslationThreads; |
+ for (size_t i = 0; i < NumWorkers; ++i) { |
+ ThreadContext *WorkerTLS = new ThreadContext(); |
+ AllThreadContexts.push_back(WorkerTLS); |
+ TranslationThreads.push_back(std::thread( |
+ &GlobalContext::translateFunctionsWrapper, this, WorkerTLS)); |
+ } |
+ if (NumWorkers) { |
+ // TODO(stichnot): start a new thread for the emitter queue worker. |
+ } |
+ } |
+ |
+ void waitForWorkerThreads() { |
+ cfgQueueEnd(); |
+ // TODO(stichnot): call end() on the emitter work queue. |
+ for (std::thread &Worker : TranslationThreads) { |
+ Worker.join(); |
+ } |
+ TranslationThreads.clear(); |
+ // TODO(stichnot): join the emitter thread. |
+ } |
+ |
+ // Translation thread startup routine. |
+ void translateFunctionsWrapper(ThreadContext *MyTLS) { |
+ TLS = MyTLS; |
+ translateFunctions(); |
+ } |
+ // Translate functions from the Cfg queue until the queue is empty. |
+ void translateFunctions(); |
+ |
+ // Utility function to match a symbol name against a match string. |
+ // This is used in a few cases where we want to take some action on |
+ // a particular function or symbol based on a command-line argument, |
+ // such as changing the verbose level for a particular function. An |
+ // empty Match argument means match everything. Returns true if |
+ // there is a match. |
+ static bool matchSymbolName(const IceString &SymbolName, |
+ const IceString &Match) { |
+ return Match.empty() || Match == SymbolName; |
+ } |
+ |
private: |
- // Try to make sure the mutexes are allocated on separate cache |
- // lines, assuming the maximum cache line size is 64. |
- const static size_t MaxCacheLineSize = 64; |
+ // Try to ensure mutexes are allocated on separate cache lines. |
alignas(MaxCacheLineSize) GlobalLockType AllocLock; |
alignas(MaxCacheLineSize) GlobalLockType ConstPoolLock; |
alignas(MaxCacheLineSize) GlobalLockType StatsLock; |
@@ -257,6 +389,8 @@ private: |
std::unique_ptr<ELFObjectWriter> ObjectWriter; |
CodeStats StatsCumulative; |
std::vector<TimerStack> Timers; |
+ CfgQueue CfgQ; |
+ std::error_code ErrorStatus; |
LockedPtr<ArenaAllocator<>> getAllocator() { |
return LockedPtr<ArenaAllocator<>>(&Allocator, &AllocLock); |
@@ -272,6 +406,7 @@ private: |
} |
std::vector<ThreadContext *> AllThreadContexts; |
+ std::vector<std::thread> TranslationThreads; |
// Each thread has its own TLS pointer which is also held in |
// AllThreadContexts. |
ICE_ATTRIBUTE_TLS static ThreadContext *TLS; |