Index: src/core/SkTaskGroup.cpp
diff --git a/src/core/SkTaskGroup.cpp b/src/core/SkTaskGroup.cpp
index 17c61af960b4d02cb572aae26f6191ca4e855ee2..59319c148d649c4d69f1ab6c30af5d5d338c46a5 100644
--- a/src/core/SkTaskGroup.cpp
+++ b/src/core/SkTaskGroup.cpp
@@ -5,12 +5,11 @@
  * found in the LICENSE file.
  */
-#include "SkTaskGroup.h"
-
-#include "SkCondVar.h"
 #include "SkRunnable.h"
+#include "SkSemaphore.h"
+#include "SkSpinlock.h"
 #include "SkTDArray.h"
-#include "SkThread.h"
+#include "SkTaskGroup.h"
 #include "SkThreadUtils.h"
 #if defined(SK_BUILD_FOR_WIN32)
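
The two headers added here replace SkCondVar with a pair of smaller primitives: SkSpinlock guards the work queue, and SkSemaphore counts the work sitting in it. As a mental model for the semaphore semantics the rest of the patch leans on, here is a toy counting semaphore; it is illustrative only, and SkSemaphore's real implementation lives in SkSemaphore.h rather than in this diff:

#include <condition_variable>
#include <mutex>

// Toy counting semaphore: signal(n) adds n permits, wait() consumes exactly one,
// sleeping while none are available.  (Illustration only, not SkSemaphore.)
class ToySemaphore {
public:
    void signal(int n = 1) {
        {
            std::lock_guard<std::mutex> lock(fMutex);
            fCount += n;
        }
        for (int i = 0; i < n; i++) {
            fCondition.notify_one();
        }
    }
    void wait() {
        std::unique_lock<std::mutex> lock(fMutex);
        fCondition.wait(lock, [this] { return fCount > 0; });
        fCount--;  // Claim one permit.
    }
private:
    std::mutex              fMutex;
    std::condition_variable fCondition;
    int                     fCount = 0;
};

This is why add() below signals 1 and batch() signals N: one permit per unit of Work pushed onto fWork.
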
@@ -63,7 +62,10 @@ public:
             // Lend a hand until our SkTaskGroup of interest is done.
             Work work;
             {
-                AutoLock lock(&gGlobal->fReady);
+                // We're stealing work opportunistically,
+                // so we never call fWorkAvailable.wait(), which could sleep us if there's no work.
+                // This means fWorkAvailable is only an upper bound on fWork.count().
+                AutoLock lock(&gGlobal->fWorkLock);
                 if (gGlobal->fWork.isEmpty()) {
                     // Someone has picked up all the work (including ours). How nice of them!
                     // (They may still be working on it, so we can't assert *pending == 0 here.)
@@ -80,10 +82,10 @@ public:
 private:
     struct AutoLock {
-        AutoLock(SkCondVar* c) : fC(c) { fC->lock(); }
-        ~AutoLock() { fC->unlock(); }
+        AutoLock(SkSpinlock* lock) : fLock(lock) { fLock->acquire(); }
+        ~AutoLock() { fLock->release(); }
     private:
-        SkCondVar* fC;
+        SkSpinlock* fLock;
     };
     static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); }
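
AutoLock keeps its RAII shape, but it now wraps a spinlock instead of locking the condition variable. Roughly the contract the patch needs from SkSpinlock, an acquire()/release() pair that busy-waits rather than sleeps, looks like this sketch (illustrative only, not Skia's SkSpinlock):

#include <atomic>

// Minimal spinlock sketch with the same acquire()/release() surface AutoLock uses.
class ToySpinlock {
public:
    void acquire() {
        // Spin until this thread is the one that flips fLocked from false to true.
        while (fLocked.exchange(true, std::memory_order_acquire)) {
            // Busy-wait; the guarded sections in this file are only a few instructions long.
        }
    }
    void release() {
        fLocked.store(false, std::memory_order_release);
    }
private:
    std::atomic<bool> fLocked{false};
};

Busy-waiting is a reasonable trade here because every critical section in this file is just a short push, pop, or append on fWork.
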
@@ -94,7 +96,7 @@ private:
         SkAtomic<int32_t>* pending; // then decrement pending afterwards.
     };
-    explicit ThreadPool(int threads) : fDraining(false) {
+    explicit ThreadPool(int threads) {
         if (threads == -1) {
             threads = num_cores();
         }
@@ -106,11 +108,13 @@ private:
     ~ThreadPool() {
         SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by now.
-        {
-            AutoLock lock(&fReady);
-            fDraining = true;
-            fReady.broadcast();
+
+        // Send a poison pill to each thread.
+        SkAtomic<int> dummy(0);
+        for (int i = 0; i < fThreads.count(); i++) {
+            this->add(NULL, NULL, &dummy);
         }
+        // Wait for them all to swallow the pill and die.
         for (int i = 0; i < fThreads.count(); i++) {
             fThreads[i]->join();
         }
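
Two details make the poison-pill shutdown above work. First, add() always increments the pending counter it is handed, but a worker returns on a NULL fn before the matching decrement, so dummy exists only to give those increments a harmless place to land; nothing ever waits on it. Second, one pill is queued per thread and each worker exits as soon as it swallows one, so every thread consumes exactly one pill. The general shape of the pattern, sketched with hypothetical Task and TaskQueue types (not Skia code):

// Hypothetical types, just enough to make the shape concrete.
struct Task      { void (*fn)(void*); void* arg; };
struct TaskQueue { Task popBlocking(); };  // Blocks until a Task is available.

void consumer_loop(TaskQueue* queue) {
    for (;;) {
        Task task = queue->popBlocking();
        if (!task.fn) {
            return;           // Poison pill: this worker exits; remaining pills stay for its peers.
        }
        task.fn(task.arg);    // Real work.
    }
}
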
@@ -122,50 +126,65 @@ private:
         Work work = { fn, arg, pending };
         pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed.
         {
-            AutoLock lock(&fReady);
+            AutoLock lock(&fWorkLock);
             fWork.push(work);
-            fReady.signal();
         }
+        fWorkAvailable.signal(1);
     }
     void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic<int32_t>* pending) {
         pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed.
         {
-            AutoLock lock(&fReady);
+            AutoLock lock(&fWorkLock);
             Work* batch = fWork.append(N);
             for (int i = 0; i < N; i++) {
                 Work work = { fn, (char*)arg + i*stride, pending };
                 batch[i] = work;
             }
-            fReady.broadcast();
         }
+        fWorkAvailable.signal(N);
     }
     static void Loop(void* arg) {
         ThreadPool* pool = (ThreadPool*)arg;
         Work work;
         while (true) {
+            // Sleep until there's work available, and claim one unit of Work as we wake.
+            pool->fWorkAvailable.wait();
             {
-                AutoLock lock(&pool->fReady);
-                while (pool->fWork.isEmpty()) {
-                    if (pool->fDraining) {
-                        return;
-                    }
-                    pool->fReady.wait();
+                AutoLock lock(&pool->fWorkLock);
+                if (pool->fWork.isEmpty()) {
+                    // Someone in Wait() stole our work (fWorkAvailable is an upper bound).
+                    // Well, that's fine, back to sleep for us.
+                    continue;
                 }
                 pool->fWork.pop(&work);
             }
+            if (!work.fn) {
+                return; // Poison pill. Time... to die.
+            }
             work.fn(work.arg);
             work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait().
         }
     }
-    SkTDArray<Work> fWork;
-    SkTDArray<SkThread*> fThreads;
-    SkCondVar fReady;
-    bool fDraining;
+    // fWorkLock must be held when reading or modifying fWork.
+    SkSpinlock fWorkLock;
+    SkTDArray<Work> fWork;
+    // A thread-safe upper bound for fWork.count().
+    //
+    // We'd have it be an exact count but for the loop in Wait():
+    // we never want that to block, so it can't call fWorkAvailable.wait(),
+    // and that's the only way to decrement fWorkAvailable.
+    // So fWorkAvailable may overcount the actual work available.
+    // We make do, but this means some worker threads may wake spuriously.
+    SkSemaphore fWorkAvailable;
+
+    // These are only changed in a single-threaded context.
+    SkTDArray<SkThread*> fThreads;
     static ThreadPool* gGlobal;
+
     friend struct SkTaskGroup::Enabler;
 };
 ThreadPool* ThreadPool::gGlobal = NULL;
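
For context, the public SkTaskGroup interface sitting on top of this pool is untouched by the patch. A typical caller looks roughly like the sketch below, assuming the batch/wait signatures in SkTaskGroup.h mirror the pool methods above:

#include "SkTaskGroup.h"

// One unit of work: square the int that arg points at.
static void square_in_place(void* arg) {
    int* x = static_cast<int*>(arg);
    *x *= *x;
}

void run_example() {
    int values[100];
    for (int i = 0; i < 100; i++) {
        values[i] = i;
    }

    SkTaskGroup tg;
    tg.batch(square_in_place, values, 100, sizeof(int));  // One Work per int, via batch() above.
    tg.wait();  // Returns once all 100 have run; the calling thread lends a hand via Wait().
}
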
@@ -174,7 +193,7 @@ ThreadPool* ThreadPool::gGlobal = NULL;
 SkTaskGroup::Enabler::Enabler(int threads) {
     SkASSERT(ThreadPool::gGlobal == NULL);
-    if (threads != 0 && SkCondVar::Supported()) {
+    if (threads != 0) {
         ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads));
     }
 }
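
The Enabler remains the only way the global pool comes into existence: a tool creates one near the top of main() and lets its destructor tear the pool down. A sketch of the typical call site follows; the synchronous fallback when no pool exists lives in SkTaskGroup's own code, not in this hunk:

#include "SkTaskGroup.h"

int main(int /*argc*/, char** /*argv*/) {
    // -1 asks for one thread per core (see num_cores()); 0 skips creating the pool entirely,
    // in which case SkTaskGroup work runs synchronously on the calling thread.
    SkTaskGroup::Enabler enabler(-1);

    // ... tool body: any SkTaskGroup created in this scope may fan work out to the pool ...

    return 0;
}   // ~Enabler destroys the pool: one poison pill per worker, then join, as in ~ThreadPool() above.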