Index: src/IceGlobalContext.h |
diff --git a/src/IceGlobalContext.h b/src/IceGlobalContext.h |
index 1f0a600d485fa8dcba7d3c5915d0bdc936e0428d..a0f088a4f3f3f0766d5a6ed154ee0e0783ec4981 100644 |
--- a/src/IceGlobalContext.h |
+++ b/src/IceGlobalContext.h |
@@ -15,8 +15,9 @@ |
#ifndef SUBZERO_SRC_ICEGLOBALCONTEXT_H |
#define SUBZERO_SRC_ICEGLOBALCONTEXT_H |
-#include <memory> |
#include <mutex> |
+#include <queue> |
+#include <thread> |
#include "IceDefs.h" |
#include "IceClFlags.h" |
@@ -96,20 +97,112 @@ class GlobalContext { |
std::vector<TimerStack> Timers; |
}; |
+ // CfgQueue is the translation work queue. It allows multiple |
+ // producers and multiple consumers (though currently only a single |
+ // producer is used). The producer adds entries using add(), and |
+ // may block if the queue is "full" to control Cfg memory footprint. |
+ // The producer uses end() to indicate that no more entries will be |
+ // added. The consumer removes an item using get(), which will |
+ // return nullptr if end() has been called and the queue is empty. |
+ // |
+ // The MaxSize ctor arg controls the maximum size the queue can grow |
+ // to. The Sequential arg indicates purely sequential execution in |
+ // which the single thread should never wait(). |
+ // |
+ // Two condition variables are used in the implementation. |
+ // GrewOrEnded signals waiting workers that the producer has changed |
+ // the state of the queue. Shrunk signals a blocked producer that a |
+ // consumer has changed the state of the queue. |
+ // |
+ // The methods begin with Sequential-specific code to be most clear. |
+ // The lock and condition variables are not used in the Sequential |
+ // case. |
+ class CfgQueue { |
JF
2015/01/25 22:57:00
It's probably better to put this class in its own
Jim Stichnoth
2015/01/26 04:59:43
Done.
|
+ public: |
+ CfgQueue(size_t MaxSize, bool Sequential) |
+ : MaxSize(MaxSize), Sequential(Sequential), IsEnded(false) {} |
JF
2015/01/25 22:57:00
WorkQueue.reserve(MaxSize)
Jim Stichnoth
2015/01/26 04:59:43
There's no std::queue::reserve() or std::deque::re
JF
2015/01/26 17:54:50
:(
I'm still not a fan of using queue, the defaul
Jim Stichnoth
2015/01/27 00:56:18
Done.
|
+ void add(Cfg *Func) { |
JF
2015/01/25 22:57:00
I'd rename to blocking_push or something similar,
Jim Stichnoth
2015/01/26 04:59:43
Done. add() --> blockingPush(), and get() --> blo
|
+ if (Sequential) { |
+ WorkQueue.push(Func); |
+ return; |
+ } |
+ std::unique_lock<GlobalLockType> L(Lock); |
+ // If the work queue is already "full", wait for a consumer to |
+ // grab an element and shrink the queue. |
+ while (WorkQueue.size() >= MaxSize) { |
+ Shrunk.wait(L); |
+ } |
JF
2015/01/25 22:57:00
This code:
while (WorkQueue.size() >= MaxSize) {
Jim Stichnoth
2015/01/26 04:59:43
Wow. Done.
(my first ever C++ lambda)
|
+ WorkQueue.push(Func); |
+ L.unlock(); |
+ GrewOrEnded.notify_one(); |
+ } |
+ Cfg *get() { |
JF
2015/01/25 22:57:00
I'd rename to wait_and_pop or something similar.
Jim Stichnoth
2015/01/26 04:59:43
Done.
|
+ if (Sequential) { |
+ Cfg *Func = nullptr; |
+ if (!WorkQueue.empty()) { |
+ Func = WorkQueue.front(); |
+ WorkQueue.pop(); |
+ } |
+ return Func; |
+ } |
+ std::unique_lock<GlobalLockType> L(Lock); |
+ while (!IsEnded || !WorkQueue.empty()) { |
JF
2015/01/25 22:57:00
Similarly here, I'd go with:
Cfg *wait_and_pop()
Jim Stichnoth
2015/01/26 04:59:43
Yeah, your rewrite could have the workers shut dow
JF
2015/01/26 17:54:50
I'm in for killing Sequential, it'll make the non-
Jim Stichnoth
2015/01/27 00:56:18
Done.
|
+ if (!WorkQueue.empty()) { |
+ Cfg *Func = WorkQueue.front(); |
+ WorkQueue.pop(); |
+ L.unlock(); |
+ Shrunk.notify_one(); |
+ return Func; |
+ } |
+ GrewOrEnded.wait(L); |
+ } |
+ return nullptr; |
+ } |
+ void end() { |
+ if (Sequential) |
+ return; |
+ std::unique_lock<GlobalLockType> L(Lock); |
+ IsEnded = true; |
+ L.unlock(); |
JF
2015/01/25 22:57:00
Could you just use a lock_guard and scoping here?
Jim Stichnoth
2015/01/26 04:59:43
I thought about this for all 3 methods, but since
JF
2015/01/26 18:10:40
As discussed offline, lock_guard is simpler than u
Jim Stichnoth
2015/01/27 00:56:18
Done.
|
+ GrewOrEnded.notify_all(); |
+ } |
+ |
+ private: |
JF
2015/01/25 22:57:00
CfGQueue() = delete;
CfgQueue(const CfgQueue &) =
Jim Stichnoth
2015/01/26 04:59:43
<shamecube> Done.
|
+ // WorkQueue and Lock are read/written by all. |
+ // TODO(stichnot): Since WorkQueue has an enforced maximum size, |
+ // implement it on top of something like std::array to minimize |
+ // contention. |
+ alignas(MaxCacheLineSize) std::queue<Cfg *> WorkQueue; |
+ // Lock guards access to WorkQueue and IsEnded. |
+ alignas(MaxCacheLineSize) GlobalLockType Lock; |
+ |
+ // GrewOrEnded is written by the producer and read by the |
+ // consumers. It is notified (by the producer) when something is |
+ // added to the queue, in case consumers are waiting for a |
+ // non-empty queue. |
+ alignas(MaxCacheLineSize) std::condition_variable GrewOrEnded; |
+ |
+ // Shrunk is notified (by the consumer) when something is removed |
+ // from the queue, in case the producer is waiting for the queue |
+ // to drop below maximum capacity. It is written by the consumers |
+ // and read by the producer. |
+ alignas(MaxCacheLineSize) std::condition_variable Shrunk; |
+ |
+ // MaxSize and Sequential are read by all and written by none. |
+ alignas(MaxCacheLineSize) const size_t MaxSize; |
+ const bool Sequential; |
+ // IsEnded is read by the consumers, and only written once by the |
+ // producer. |
+ bool IsEnded; |
+ }; |
+ |
public: |
GlobalContext(Ostream *OsDump, Ostream *OsEmit, ELFStreamer *ELFStreamer, |
VerboseMask Mask, TargetArch Arch, OptLevel Opt, |
IceString TestPrefix, const ClFlags &Flags); |
~GlobalContext(); |
- // Returns true if any of the specified options in the verbose mask |
- // are set. If the argument is omitted, it checks if any verbose |
- // options at all are set. |
VerboseMask getVerbose() const { return VMask; } |
- bool isVerbose(VerboseMask Mask = IceV_All) const { return VMask & Mask; } |
- void setVerbose(VerboseMask Mask) { VMask = Mask; } |
- void addVerbose(VerboseMask Mask) { VMask |= Mask; } |
- void subVerbose(VerboseMask Mask) { VMask &= ~Mask; } |
// The dump and emit streams need to be used by only one thread at a |
// time. This is done by exclusively reserving the streams via |
@@ -129,6 +222,9 @@ public: |
TargetArch getTargetArch() const { return Arch; } |
OptLevel getOptLevel() const { return Opt; } |
+ LockedPtr<std::error_code> getErrorStatus() { |
+ return LockedPtr<std::error_code>(&ErrorStatus, &ErrorStatusLock); |
+ } |
// When emitting assembly, we allow a string to be prepended to |
// names of translated functions. This makes it easier to create an |
@@ -229,25 +325,91 @@ public: |
void dumpTimers(TimerStackIdT StackID = TSK_Default, |
bool DumpCumulative = true); |
+ // Adds a newly parsed and constructed function to the Cfg work |
+ // queue. Notifies any idle workers that a new function is |
+ // available for translating. May block if the work queue is too |
+ // large, in order to control memory footprint. |
+ void cfgQueueAdd(Cfg *Func) { CfgQ.add(Func); } |
+ // Takes a Cfg from the work queue for translating. May block if |
+ // the work queue is currently empty. Returns nullptr if there is |
+ // no more work - the queue is empty and either end() has been |
+ // called or the Sequential flag was set. |
+ Cfg *cfgQueueGet() { return CfgQ.get(); } |
+ // Notifies that no more work will be added to the work queue. |
+ void cfgQueueEnd() { CfgQ.end(); } |
+ |
+ void startWorkerThreads() { |
+ size_t NumWorkers = getFlags().NumTranslationThreads; |
+ for (size_t i = 0; i < NumWorkers; ++i) { |
+ ThreadContext *WorkerTLS = new ThreadContext(); |
+ AllThreadContexts.push_back(WorkerTLS); |
+ TranslationThreads.push_back(std::thread( |
+ &GlobalContext::translateFunctionsWrapper, this, WorkerTLS)); |
+ } |
+ if (NumWorkers) { |
+ // TODO(stichnot): start a new thread for the emitter queue worker. |
+ } |
+ } |
+ |
+ void waitForWorkerThreads() { |
+ cfgQueueEnd(); |
+ // TODO(stichnot): call end() on the emitter work queue. |
+ for (std::thread &Worker : TranslationThreads) { |
+ Worker.join(); |
+ } |
+ TranslationThreads.clear(); |
+ // TODO(stichnot): join the emitter thread. |
+ } |
+ |
+ // Translation thread startup routine. |
+ void translateFunctionsWrapper(ThreadContext *MyTLS) { |
+ TLS = MyTLS; |
+ translateFunctions(); |
+ } |
+ // Translate functions from the Cfg queue until the queue is empty. |
+ void translateFunctions(); |
+ |
+ // Utility function to match a symbol name against a match string. |
+ // This is used in a few cases where we want to take some action on |
+ // a particular function or symbol based on a command-line argument, |
+ // such as changing the verbose level for a particular function. An |
+ // empty Match argument means match everything. Returns true if |
+ // there is a match. |
+ static bool matchSymbolName(const IceString &SymbolName, |
+ const IceString &Match) { |
+ return Match.empty() || Match == SymbolName; |
+ } |
+ |
private: |
- // Try to make sure the mutexes are allocated on separate cache |
- // lines, assuming the maximum cache line size is 64. |
- const static size_t MaxCacheLineSize = 64; |
+ // Try to ensure mutexes are allocated on separate cache lines. |
+ |
+ // Managed by getAllocator() |
alignas(MaxCacheLineSize) GlobalLockType AllocLock; |
+ ArenaAllocator<> Allocator; |
+ |
+ // Managed by getConstantPool() |
alignas(MaxCacheLineSize) GlobalLockType ConstPoolLock; |
+ std::unique_ptr<ConstantPool> ConstPool; |
+ |
+ // Managed by getErrorStatus() |
+ alignas(MaxCacheLineSize) GlobalLockType ErrorStatusLock; |
+ std::error_code ErrorStatus; |
+ |
+ // Managed by getStatsCumulative() |
alignas(MaxCacheLineSize) GlobalLockType StatsLock; |
+ CodeStats StatsCumulative; |
+ |
+ // Managed by getTimers() |
alignas(MaxCacheLineSize) GlobalLockType TimerLock; |
+ std::vector<TimerStack> Timers; |
// StrLock is a global lock on the dump and emit output streams. |
typedef std::mutex StrLockType; |
- StrLockType StrLock; |
- |
+ alignas(MaxCacheLineSize) StrLockType StrLock; |
Ostream *StrDump; // Stream for dumping / diagnostics |
Ostream *StrEmit; // Stream for code emission |
- ArenaAllocator<> Allocator; |
- VerboseMask VMask; |
- std::unique_ptr<ConstantPool> ConstPool; |
+ const VerboseMask VMask; |
JF
2015/01/25 22:57:00
alignas after the streams.
Jim Stichnoth
2015/01/26 04:59:44
Done. (wish it was easier to add these boundaries
JF
2015/01/26 17:54:50
Your wish is my command:
#define ICE_CACHELINE_BO
JF
2015/01/26 19:11:48
A standard-compliant solution pointed out by Richa
Jim Stichnoth
2015/01/27 00:56:18
Done.
Jim Stichnoth
2015/01/27 00:56:18
Cool, thanks!
|
Intrinsics IntrinsicsInfo; |
const TargetArch Arch; |
const OptLevel Opt; |
@@ -255,8 +417,7 @@ private: |
const ClFlags &Flags; |
RandomNumberGenerator RNG; |
JF
2015/01/25 22:57:00
Add a TODO to move the out of this class.
Jim Stichnoth
2015/01/26 04:59:43
Done.
|
std::unique_ptr<ELFObjectWriter> ObjectWriter; |
- CodeStats StatsCumulative; |
- std::vector<TimerStack> Timers; |
+ CfgQueue CfgQ; |
LockedPtr<ArenaAllocator<>> getAllocator() { |
return LockedPtr<ArenaAllocator<>>(&Allocator, &AllocLock); |
@@ -272,6 +433,7 @@ private: |
} |
std::vector<ThreadContext *> AllThreadContexts; |
+ std::vector<std::thread> TranslationThreads; |
// Each thread has its own TLS pointer which is also held in |
// AllThreadContexts. |
ICE_ATTRIBUTE_TLS static ThreadContext *TLS; |