Chromium Code Reviews| Index: src/IceGlobalContext.h |
| diff --git a/src/IceGlobalContext.h b/src/IceGlobalContext.h |
| index 751147ee48502811e0f37eaf21c92d0dae0ad17c..52a082ecdf8843731730994102f8aa7740d0f2c1 100644 |
| --- a/src/IceGlobalContext.h |
| +++ b/src/IceGlobalContext.h |
| @@ -15,8 +15,9 @@ |
| #ifndef SUBZERO_SRC_ICEGLOBALCONTEXT_H |
| #define SUBZERO_SRC_ICEGLOBALCONTEXT_H |
| -#include <memory> |
| #include <mutex> |
| +#include <queue> |
| +#include <thread> |
| #include "IceDefs.h" |
| #include "IceClFlags.h" |
| @@ -96,20 +97,86 @@ class GlobalContext { |
| std::vector<TimerStack> Timers; |
| }; |
| + // CfgQueue is the translation work queue. It allows multiple |
| + // producers and multiple consumers (though currently only a single |
| + // producer is used). The producer adds entries using add(), and |
| + // may block if the queue is "full" to control Cfg memory footprint. |
| + // The producer uses end() to indicate that no more entries will be |
| + // added. The consumer removes an item using get(), which will |
| + // return nullptr if end() has been called and the queue is empty. |
| + // |
| + // The MaxSize ctor arg controls the maximum size the queue can grow |
| + // to. The Sequential arg indicates purely sequential execution in |
| + // which the single thread should never wait(). |
| + // |
| + // Two condition variables are used in the implementation. |
| + // GrewOrEnded signals waiting workers that the producer has changed |
| + // the state of the queue. Shrunk signals a blocked producer that a |
| + // consumer has changed the state of the queue. |
| + class CfgQueue { |
| + public: |
| + CfgQueue(size_t MaxSize, bool Sequential) |
| + : MaxSize(MaxSize), Sequential(Sequential), IsEnded(false) {} |
| + void add(Cfg *Func) { |
| + std::unique_lock<GlobalLockType> L(Lock); |
| + // If the work queue is already "full", wait for a consumer to |
| + // grab an element and shrink the queue. |
| + while (!Sequential && WorkQueue.size() >= MaxSize) { |
| + Shrunk.wait(L); |
| + } |
| + WorkQueue.push(Func); |
| + L.unlock(); |
| + GrewOrEnded.notify_one(); |
| + } |
| + Cfg *get() { |
| + std::unique_lock<GlobalLockType> L(Lock); |
| + while (!IsEnded || !WorkQueue.empty()) { |
| + if (!WorkQueue.empty()) { |
| + Cfg *Func = WorkQueue.front(); |
| + WorkQueue.pop(); |
| + L.unlock(); |
| + Shrunk.notify_one(); |
| + return Func; |
| + } |
| + // If the work queue is empty, and this is pure sequential |
| + // execution, then return nullptr. |
| + if (Sequential) |
| + return nullptr; |
| + GrewOrEnded.wait(L); |
| + } |
| + return nullptr; |
| + } |
| + void end() { |
| + std::unique_lock<GlobalLockType> L(Lock); |
| + IsEnded = true; |
| + L.unlock(); |
| + GrewOrEnded.notify_all(); |
| + } |
| + |
| + private: |
| + std::queue<Cfg *> WorkQueue; |
| + // Lock guards access to WorkQueue and IsEnded. |
| + GlobalLockType Lock; |
| + // GrewOrEnded is notified (by the producer) when something is |
| + // added to the queue, in case consumers are waiting for a |
| + // non-empty queue. |
| + std::condition_variable GrewOrEnded; |
| + // Shrunk is notified (by the consumer) when something is removed |
| + // from the queue, in case the producer is waiting for the queue |
| + // to drop below maximum capacity. |
| + std::condition_variable Shrunk; |
| + const size_t MaxSize; |
| + const bool Sequential; |
| + bool IsEnded; |
|
JF
2015/01/23 17:22:43
I think it's worth reordering and aligning to cach
Jim Stichnoth
2015/01/23 21:54:02
Done. And added a TODO about considering std::arr
|
| + }; |
| + |
| public: |
| GlobalContext(Ostream *OsDump, Ostream *OsEmit, ELFStreamer *ELFStreamer, |
| VerboseMask Mask, TargetArch Arch, OptLevel Opt, |
| IceString TestPrefix, const ClFlags &Flags); |
| ~GlobalContext(); |
| - // Returns true if any of the specified options in the verbose mask |
| - // are set. If the argument is omitted, it checks if any verbose |
| - // options at all are set. |
| VerboseMask getVerbose() const { return VMask; } |
| - bool isVerbose(VerboseMask Mask = IceV_All) const { return VMask & Mask; } |
| - void setVerbose(VerboseMask Mask) { VMask = Mask; } |
| - void addVerbose(VerboseMask Mask) { VMask |= Mask; } |
| - void subVerbose(VerboseMask Mask) { VMask &= ~Mask; } |
| // The dump and emit streams need to be used by only one thread at a |
| // time. This is done by exclusively reserving the streams via |
| @@ -129,6 +196,7 @@ public: |
| TargetArch getTargetArch() const { return Arch; } |
| OptLevel getOptLevel() const { return Opt; } |
| + std::error_code getErrorStatus() const { return ErrorStatus; } |
| // When emitting assembly, we allow a string to be prepended to |
| // names of translated functions. This makes it easier to create an |
| @@ -229,6 +297,61 @@ public: |
| void dumpTimers(TimerStackIdT StackID = TSK_Default, |
| bool DumpCumulative = true); |
| + // Adds a newly parsed and constructed function to the Cfg work |
| + // queue. Notifies any idle workers that a new function is |
| + // available for translating. May block if the work queue is too |
| + // large, in order to control memory footprint. |
| + void cfgQueueAdd(Cfg *Func) { CfgQ.add(Func); } |
| + // Takes a Cfg from the work queue for translating. May block if |
| + // the work queue is currently empty. Returns nullptr if there is |
| + // no more work - the queue is empty and either end() has been |
| + // called or the Sequential flag was set. |
| + Cfg *cfgQueueGet() { return CfgQ.get(); } |
| + // Notifies that no more work will be added to the work queue. |
| + void cfgQueueEnd() { CfgQ.end(); } |
| + |
| + void startWorkerThreads() { |
| + size_t NumWorkers = getFlags().NumTranslationThreads; |
| + for (size_t i = 0; i < NumWorkers; ++i) { |
| + ThreadContext *WorkerTLS = new ThreadContext(); |
| + AllThreadContexts.push_back(WorkerTLS); |
| + TranslationThreads.push_back(std::thread( |
| + &GlobalContext::translateFunctionsWrapper, this, WorkerTLS)); |
| + } |
| + if (NumWorkers) { |
| + // TODO(stichnot): start a new thread for the emitter queue worker. |
| + } |
| + } |
| + |
| + void waitForWorkerThreads() { |
| + cfgQueueEnd(); |
| + // TODO(stichnot): call end() on the emitter work queue. |
| + for (std::thread &Worker : TranslationThreads) { |
| + Worker.join(); |
| + } |
| + TranslationThreads.clear(); |
| + // TODO(stichnot): join the emitter thread. |
| + } |
| + |
| + // Translation thread startup routine. |
| + void translateFunctionsWrapper(ThreadContext *MyTLS) { |
| + TLS = MyTLS; |
| + translateFunctions(); |
| + } |
| + // Translate functions from the Cfg queue until the queue is empty. |
| + void translateFunctions(); |
| + |
| + // Utility function to match a symbol name against a match string. |
| + // This is used in a few cases where we want to take some action on |
| + // a particular function or symbol based on a command-line argument, |
| + // such as changing the verbose level for a particular function. An |
| + // empty Match argument means match everything. Returns true if |
| + // there is a match. |
| + static bool matchSymbolName(const IceString &SymbolName, |
| + const IceString &Match) { |
| + return Match.empty() || Match == SymbolName; |
| + } |
| + |
| private: |
| // Try to make sure the mutexes are allocated on separate cache |
| // lines, assuming the maximum cache line size is 64. |
| @@ -257,6 +380,8 @@ private: |
| std::unique_ptr<ELFObjectWriter> ObjectWriter; |
| CodeStats StatsCumulative; |
| std::vector<TimerStack> Timers; |
| + CfgQueue CfgQ; |
| + std::error_code ErrorStatus; |
| LockedPtr<ArenaAllocator<>> getAllocator() { |
| return LockedPtr<ArenaAllocator<>>(&Allocator, &AllocLock); |
| @@ -272,6 +397,7 @@ private: |
| } |
| std::vector<ThreadContext *> AllThreadContexts; |
| + std::vector<std::thread> TranslationThreads; |
| // Each thread has its own TLS pointer which is also held in |
| // AllThreadContexts. |
| thread_local static ThreadContext *TLS; |