Index: src/core/SkTaskGroup.cpp

diff --git a/src/core/SkTaskGroup.cpp b/src/core/SkTaskGroup.cpp
index 17c61af960b4d02cb572aae26f6191ca4e855ee2..59319c148d649c4d69f1ab6c30af5d5d338c46a5 100644
--- a/src/core/SkTaskGroup.cpp
+++ b/src/core/SkTaskGroup.cpp
@@ -5,12 +5,11 @@
  * found in the LICENSE file.
  */
 
-#include "SkTaskGroup.h"
-
-#include "SkCondVar.h"
 #include "SkRunnable.h"
+#include "SkSemaphore.h"
+#include "SkSpinlock.h"
 #include "SkTDArray.h"
-#include "SkThread.h"
+#include "SkTaskGroup.h"
 #include "SkThreadUtils.h"
 
 #if defined(SK_BUILD_FOR_WIN32)
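
The include swap is the change in miniature: SkCondVar, a mutex paired with a condition variable, gives way to SkSemaphore plus SkSpinlock. For readers without the Skia sources handy, here is a rough portable-C++20 stand-in for the two primitives; the real Skia classes differ in detail, but these capture the operations the patch relies on (acquire/release on the lock, signal(n)/wait() on the semaphore).

    #include <atomic>
    #include <semaphore>

    // Stand-in for SkSpinlock: busy-waits, so it's only sensible when the
    // critical section is tiny, here a few array pushes and pops.
    class Spinlock {
    public:
        void acquire() { while (fLocked.test_and_set(std::memory_order_acquire)) {} }
        void release() { fLocked.clear(std::memory_order_release); }
    private:
        std::atomic_flag fLocked;  // default-initializes to clear in C++20
    };

    // Stand-in for SkSemaphore: signal(n) adds n units, wait() claims one,
    // sleeping the caller if none are available.
    class Semaphore {
    public:
        Semaphore() : fSem(0) {}
        void signal(int n) { fSem.release(n); }
        void wait()        { fSem.acquire(); }
    private:
        std::counting_semaphore<> fSem;
    };
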
@@ -63,7 +62,10 @@ public:
             // Lend a hand until our SkTaskGroup of interest is done.
             Work work;
             {
-                AutoLock lock(&gGlobal->fReady);
+                // We're stealing work opportunistically,
+                // so we never call fWorkAvailable.wait(), which could sleep us if there's no work.
+                // This means fWorkAvailable is only an upper bound on fWork.count().
+                AutoLock lock(&gGlobal->fWorkLock);
                 if (gGlobal->fWork.isEmpty()) {
                     // Someone has picked up all the work (including ours). How nice of them!
                     // (They may still be working on it, so we can't assert *pending == 0 here.)
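
This hunk is why the semaphore can only ever be an upper bound on the queue length. A thread blocked in Wait() pitches in on queued work rather than idling, but the work it wants may already be claimed, so it must never call fWorkAvailable.wait(): with nothing left in the queue it could sleep indefinitely. Instead it peeks under the spinlock and walks away empty-handed if need be. A sketch of that non-blocking claim, with the pool reduced to a minimal hypothetical struct, reusing the Spinlock and Semaphore stand-ins above:

    #include <deque>

    struct Work { void (*fn)(void*); void* arg; };

    struct Pool {
        Spinlock         workLock;       // guards work
        std::deque<Work> work;
        Semaphore        workAvailable;  // upper bound on work.size()
    };

    // Non-blocking claim, in the spirit of Wait(): never touches
    // workAvailable, so it can't sleep; if it wins, the semaphore is
    // simply left overcounting until a worker wakes and re-checks.
    bool try_claim(Pool* pool, Work* out) {
        pool->workLock.acquire();
        if (pool->work.empty()) {
            pool->workLock.release();
            return false;  // Someone beat us to it; nothing to do.
        }
        *out = pool->work.back();
        pool->work.pop_back();
        pool->workLock.release();
        return true;
    }
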
@@ -80,10 +82,10 @@ public:
 
 private:
     struct AutoLock {
-        AutoLock(SkCondVar* c) : fC(c) { fC->lock(); }
-        ~AutoLock() { fC->unlock(); }
+        AutoLock(SkSpinlock* lock) : fLock(lock) { fLock->acquire(); }
+        ~AutoLock() { fLock->release(); }
     private:
-        SkCondVar* fC;
+        SkSpinlock* fLock;
     };
 
     static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); }
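
AutoLock keeps its shape across the change: a scope guard that takes the lock in its constructor and releases it in its destructor, so the early continue and return paths in Loop() below unlock automatically. The same idiom against the stand-in Spinlock, with copy operations deleted so a guard can't accidentally be duplicated (the Skia original relies on convention instead):

    class AutoLock {
    public:
        explicit AutoLock(Spinlock* lock) : fLock(lock) { fLock->acquire(); }
        ~AutoLock() { fLock->release(); }

        AutoLock(const AutoLock&) = delete;             // one guard, one release
        AutoLock& operator=(const AutoLock&) = delete;
    private:
        Spinlock* fLock;
    };

With std::mutex this would just be std::lock_guard; the hand-rolled version exists because the lock here is a spinlock, not a mutex.
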
@@ -94,7 +96,7 @@ private:
         SkAtomic<int32_t>* pending;  // then decrement pending afterwards.
     };
 
-    explicit ThreadPool(int threads) : fDraining(false) {
+    explicit ThreadPool(int threads) {
         if (threads == -1) {
             threads = num_cores();
         }
@@ -106,11 +108,13 @@ private:
 
     ~ThreadPool() {
         SkASSERT(fWork.isEmpty());  // All SkTaskGroups should be destroyed by now.
-        {
-            AutoLock lock(&fReady);
-            fDraining = true;
-            fReady.broadcast();
+
+        // Send a poison pill to each thread.
+        SkAtomic<int> dummy(0);
+        for (int i = 0; i < fThreads.count(); i++) {
+            this->add(NULL, NULL, &dummy);
         }
+        // Wait for them all to swallow the pill and die.
         for (int i = 0; i < fThreads.count(); i++) {
             fThreads[i]->join();
         }
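
With the fDraining flag gone, shutdown becomes data-driven: the destructor enqueues one null-fn Work item per worker, and each worker exits the moment it dequeues one. Each pill is a real queue entry with a real signal() behind it, so every sleeping worker is woken exactly once to take its pill, and no worker can consume two. A compressed sketch of the handshake against the hypothetical Pool above:

    #include <thread>
    #include <vector>

    // Shutdown in the style of ~ThreadPool(): one pill per worker, then join.
    // Workers treat fn == nullptr as "stop" (see the loop sketch below).
    void shutdown(Pool* pool, std::vector<std::thread>* threads) {
        for (size_t i = 0; i < threads->size(); i++) {
            pool->workLock.acquire();
            pool->work.push_back(Work{nullptr, nullptr});  // poison pill
            pool->workLock.release();
            pool->workAvailable.signal(1);  // wake one worker to eat it
        }
        for (std::thread& t : *threads) {
            t.join();  // returns once that worker has swallowed its pill
        }
    }
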
@@ -122,50 +126,65 @@ private:
         Work work = { fn, arg, pending };
         pending->fetch_add(+1, sk_memory_order_relaxed);  // No barrier needed.
         {
-            AutoLock lock(&fReady);
+            AutoLock lock(&fWorkLock);
             fWork.push(work);
-            fReady.signal();
         }
+        fWorkAvailable.signal(1);
     }
 
     void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic<int32_t>* pending) {
         pending->fetch_add(+N, sk_memory_order_relaxed);  // No barrier needed.
         {
-            AutoLock lock(&fReady);
+            AutoLock lock(&fWorkLock);
             Work* batch = fWork.append(N);
             for (int i = 0; i < N; i++) {
                 Work work = { fn, (char*)arg + i*stride, pending };
                 batch[i] = work;
             }
-            fReady.broadcast();
         }
+        fWorkAvailable.signal(N);
     }
 
     static void Loop(void* arg) {
         ThreadPool* pool = (ThreadPool*)arg;
         Work work;
         while (true) {
+            // Sleep until there's work available, and claim one unit of Work as we wake.
+            pool->fWorkAvailable.wait();
             {
-                AutoLock lock(&pool->fReady);
-                while (pool->fWork.isEmpty()) {
-                    if (pool->fDraining) {
-                        return;
-                    }
-                    pool->fReady.wait();
+                AutoLock lock(&pool->fWorkLock);
+                if (pool->fWork.isEmpty()) {
+                    // Someone in Wait() stole our work (fWorkAvailable is an upper bound).
+                    // Well, that's fine, back to sleep for us.
+                    continue;
                 }
                 pool->fWork.pop(&work);
             }
+            if (!work.fn) {
+                return;  // Poison pill. Time... to die.
+            }
             work.fn(work.arg);
             work.pending->fetch_add(-1, sk_memory_order_release);  // Pairs with load in Wait().
         }
     }
 
-    SkTDArray<Work> fWork;
-    SkTDArray<SkThread*> fThreads;
-    SkCondVar fReady;
-    bool fDraining;
+    // fWorkLock must be held when reading or modifying fWork.
+    SkSpinlock fWorkLock;
+    SkTDArray<Work> fWork;
 
+    // A thread-safe upper bound for fWork.count().
+    //
+    // We'd have it be an exact count but for the loop in Wait():
+    // we never want that to block, so it can't call fWorkAvailable.wait(),
+    // and that's the only way to decrement fWorkAvailable.
+    // So fWorkAvailable may overcount the actual work available.
+    // We make do, but this means some worker threads may wake spuriously.
+    SkSemaphore fWorkAvailable;
+
+    // These are only changed in a single-threaded context.
+    SkTDArray<SkThread*> fThreads;
     static ThreadPool* gGlobal;
+
     friend struct SkTaskGroup::Enabler;
 };
 ThreadPool* ThreadPool::gGlobal = NULL;
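
The worker loop is the design in a nutshell. fWorkAvailable.wait() claims one unit and sleeps the thread if there are none, but because a thread in Wait() may have stolen the matching queue entry, waking to an empty queue is normal: the worker just loops and sleeps again. Note also that add() and batch() now signal() after dropping the spinlock, keeping the critical section down to the queue manipulation itself. A sketch of the loop against the hypothetical Pool and try_claim() above (the per-group pending counter is omitted for brevity):

    // Worker body in the shape of Loop(): semaphore first, then spinlock,
    // with the poison-pill check done outside the lock.
    void loop(Pool* pool) {
        Work work;
        while (true) {
            pool->workAvailable.wait();   // claim one unit; sleep if none
            if (!try_claim(pool, &work)) {
                continue;                 // stolen out from under us; back to sleep
            }
            if (!work.fn) {
                return;                   // poison pill
            }
            work.fn(work.arg);
        }
    }
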
@@ -174,7 +193,7 @@ ThreadPool* ThreadPool::gGlobal = NULL;
 
 SkTaskGroup::Enabler::Enabler(int threads) {
     SkASSERT(ThreadPool::gGlobal == NULL);
-    if (threads != 0 && SkCondVar::Supported()) {
+    if (threads != 0) {
         ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads));
     }
 }
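
One quiet payoff lands here: SkCondVar::Supported() existed because condition variables weren't available on every build target, and when that check failed the pool was simply never created. Semaphores and spinlocks carry no such caveat, so threads != 0 is the only question left. Per the signatures in this diff, enabling the pool looks like this (a hypothetical usage sketch):

    SkTaskGroup::Enabler enabled(-1);  // -1 asks for one worker per core, via num_cores()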