Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1053)

Unified Diff: src/core/SkTaskGroup.cpp

Issue 1192573003: Add and use SkSemaphore (Closed) Base URL: https://skia.googlesource.com/skia.git@master
Patch Set: typo Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « src/core/SkSemaphore.cpp ('k') | src/utils/SkCondVar.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: src/core/SkTaskGroup.cpp
diff --git a/src/core/SkTaskGroup.cpp b/src/core/SkTaskGroup.cpp
index 17c61af960b4d02cb572aae26f6191ca4e855ee2..59319c148d649c4d69f1ab6c30af5d5d338c46a5 100644
--- a/src/core/SkTaskGroup.cpp
+++ b/src/core/SkTaskGroup.cpp
@@ -5,12 +5,11 @@
* found in the LICENSE file.
*/
-#include "SkTaskGroup.h"
-
-#include "SkCondVar.h"
#include "SkRunnable.h"
+#include "SkSemaphore.h"
+#include "SkSpinlock.h"
#include "SkTDArray.h"
-#include "SkThread.h"
+#include "SkTaskGroup.h"
#include "SkThreadUtils.h"
#if defined(SK_BUILD_FOR_WIN32)
@@ -63,7 +62,10 @@ public:
// Lend a hand until our SkTaskGroup of interest is done.
Work work;
{
- AutoLock lock(&gGlobal->fReady);
+ // We're stealing work opportunistically,
+ // so we never call fWorkAvailable.wait(), which could sleep us if there's no work.
+ // This means fWorkAvailable is only an upper bound on fWork.count().
+ AutoLock lock(&gGlobal->fWorkLock);
if (gGlobal->fWork.isEmpty()) {
// Someone has picked up all the work (including ours). How nice of them!
// (They may still be working on it, so we can't assert *pending == 0 here.)
@@ -80,10 +82,10 @@ public:
private:
struct AutoLock {
- AutoLock(SkCondVar* c) : fC(c) { fC->lock(); }
- ~AutoLock() { fC->unlock(); }
+ AutoLock(SkSpinlock* lock) : fLock(lock) { fLock->acquire(); }
+ ~AutoLock() { fLock->release(); }
private:
- SkCondVar* fC;
+ SkSpinlock* fLock;
};
static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); }
@@ -94,7 +96,7 @@ private:
SkAtomic<int32_t>* pending; // then decrement pending afterwards.
};
- explicit ThreadPool(int threads) : fDraining(false) {
+ explicit ThreadPool(int threads) {
if (threads == -1) {
threads = num_cores();
}
@@ -106,11 +108,13 @@ private:
~ThreadPool() {
SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by now.
- {
- AutoLock lock(&fReady);
- fDraining = true;
- fReady.broadcast();
+
+ // Send a poison pill to each thread.
+ SkAtomic<int> dummy(0);
+ for (int i = 0; i < fThreads.count(); i++) {
+ this->add(NULL, NULL, &dummy);
}
+ // Wait for them all to swallow the pill and die.
for (int i = 0; i < fThreads.count(); i++) {
fThreads[i]->join();
}
@@ -122,50 +126,65 @@ private:
Work work = { fn, arg, pending };
pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed.
{
- AutoLock lock(&fReady);
+ AutoLock lock(&fWorkLock);
fWork.push(work);
- fReady.signal();
}
+ fWorkAvailable.signal(1);
}
void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic<int32_t>* pending) {
pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed.
{
- AutoLock lock(&fReady);
+ AutoLock lock(&fWorkLock);
Work* batch = fWork.append(N);
for (int i = 0; i < N; i++) {
Work work = { fn, (char*)arg + i*stride, pending };
batch[i] = work;
}
- fReady.broadcast();
}
+ fWorkAvailable.signal(N);
}
static void Loop(void* arg) {
ThreadPool* pool = (ThreadPool*)arg;
Work work;
while (true) {
+ // Sleep until there's work available, and claim one unit of Work as we wake.
+ pool->fWorkAvailable.wait();
{
- AutoLock lock(&pool->fReady);
- while (pool->fWork.isEmpty()) {
- if (pool->fDraining) {
- return;
- }
- pool->fReady.wait();
+ AutoLock lock(&pool->fWorkLock);
+ if (pool->fWork.isEmpty()) {
+ // Someone in Wait() stole our work (fWorkAvailable is an upper bound).
+ // Well, that's fine, back to sleep for us.
+ continue;
}
pool->fWork.pop(&work);
}
+ if (!work.fn) {
+ return; // Poison pill. Time... to die.
+ }
work.fn(work.arg);
work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait().
}
}
- SkTDArray<Work> fWork;
- SkTDArray<SkThread*> fThreads;
- SkCondVar fReady;
- bool fDraining;
+ // fWorkLock must be held when reading or modifying fWork.
+ SkSpinlock fWorkLock;
+ SkTDArray<Work> fWork;
+ // A thread-safe upper bound for fWork.count().
+ //
+ // We'd have it be an exact count but for the loop in Wait():
+ // we never want that to block, so it can't call fWorkAvailable.wait(),
+ // and that's the only way to decrement fWorkAvailable.
+ // So fWorkAvailable may overcount actual the work available.
+ // We make do, but this means some worker threads may wake spuriously.
+ SkSemaphore fWorkAvailable;
+
+ // These are only changed in a single-threaded context.
+ SkTDArray<SkThread*> fThreads;
static ThreadPool* gGlobal;
+
friend struct SkTaskGroup::Enabler;
};
ThreadPool* ThreadPool::gGlobal = NULL;
@@ -174,7 +193,7 @@ ThreadPool* ThreadPool::gGlobal = NULL;
SkTaskGroup::Enabler::Enabler(int threads) {
SkASSERT(ThreadPool::gGlobal == NULL);
- if (threads != 0 && SkCondVar::Supported()) {
+ if (threads != 0) {
ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads));
}
}
« no previous file with comments | « src/core/SkSemaphore.cpp ('k') | src/utils/SkCondVar.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698