Index: src/core/SkTaskGroup.cpp
diff --git a/src/core/SkTaskGroup.cpp b/src/core/SkTaskGroup.cpp
index 863195cfd3feea11c8db574ddd7e917d90130c5e..e6b8532bb046e5175f4854bee5c2da147aa105e9 100644
--- a/src/core/SkTaskGroup.cpp
+++ b/src/core/SkTaskGroup.cpp
@@ -9,6 +9,7 @@
 #include "SkRunnable.h"
 #include "SkSemaphore.h"
 #include "SkSpinlock.h"
+#include "SkTArray.h"
 #include "SkTDArray.h"
 #include "SkTaskGroup.h"
 #include "SkThreadUtils.h"
@@ -43,23 +44,22 @@ public:
         if (!gGlobal) { // If we have no threads, run synchronously.
             return task->run();
         }
-        gGlobal->add(&CallRunnable, task, pending);
+        gGlobal->add([task]() { task->run(); }, pending);
     }
 
-    static void Add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) {
+    static void Add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) {
         if (!gGlobal) {
-            return fn(arg);
+            return fn();
         }
-        gGlobal->add(fn, arg, pending);
+        gGlobal->add(fn, pending);
     }
 
-    static void Batch(void (*fn)(void*), void* args, int N, size_t stride,
-                      SkAtomic<int32_t>* pending) {
+    static void Batch(std::function<void(int)> fn, int N, SkAtomic<int32_t>* pending) {
         if (!gGlobal) {
-            for (int i = 0; i < N; i++) { fn((char*)args + i*stride); }
+            for (int i = 0; i < N; i++) { fn(i); }
             return;
         }
-        gGlobal->batch(fn, args, N, stride, pending);
+        gGlobal->batch(fn, N, pending);
     }
 
     static void Wait(SkAtomic<int32_t>* pending) {
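
The shape of the change is easiest to see from the caller's side, through the public SkTaskGroup wrappers at the bottom of this patch: the old interface took a function pointer plus a base pointer, count, and stride, while the new one takes an index-accepting closure. A minimal sketch, assuming a hypothetical Item type and callsite (none of these names are in the patch):

    #include "SkTaskGroup.h"

    struct Item { float value; };  // Hypothetical payload type.

    void halveItems(Item* items, int n) {
        SkTaskGroup tg;
        // Old interface: tg.batch(&HalveOne, items, n, sizeof(Item)), with a
        // hypothetical HalveOne(void*) that casts the pointer and halves one Item.
        // New interface: the closure receives an index and does its own lookup.
        tg.batch([items](int i) { items[i].value *= 0.5f; }, n);
        tg.wait();
    }
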
@@ -76,16 +76,17 @@ public:
                 // so we never call fWorkAvailable.wait(), which could sleep us if there's no work.
                 // This means fWorkAvailable is only an upper bound on fWork.count().
                 AutoLock lock(&gGlobal->fWorkLock);
-                if (gGlobal->fWork.isEmpty()) {
+                if (gGlobal->fWork.empty()) {
                     // Someone has picked up all the work (including ours). How nice of them!
                     // (They may still be working on it, so we can't assert *pending == 0 here.)
                     continue;
                 }
-                gGlobal->fWork.pop(&work);
+                work = gGlobal->fWork.back();
+                gGlobal->fWork.pop_back();
             }
             // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
             // We threads gotta stick together. We're always making forward progress.
-            work.fn(work.arg);
+            work.fn();
             work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load above.
         }
     }
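
Wait() is the interesting half of the threading story here: rather than sleeping, a waiter opportunistically drains the shared queue until its own count reaches zero. A standalone sketch of that pattern, using std::atomic and std::mutex as stand-ins for SkAtomic and SkSpinlock, and assuming a single task group for brevity:

    #include <atomic>
    #include <cstdint>
    #include <deque>
    #include <functional>
    #include <mutex>

    std::mutex gWorkLock;
    std::deque<std::function<void()>> gWork;  // Guarded by gWorkLock.

    void waitAndHelp(std::atomic<int32_t>* pending) {
        while (pending->load(std::memory_order_acquire) > 0) {  // Pairs with the release below.
            std::function<void()> fn;
            {
                std::lock_guard<std::mutex> lock(gWorkLock);
                if (gWork.empty()) { continue; }  // Someone beat us to it; re-check the count.
                fn = std::move(gWork.back());
                gWork.pop_back();
            }
            fn();
            pending->fetch_add(-1, std::memory_order_release);  // Lets the waiter's load see zero.
        }
    }

As in the patch, the loop deliberately busy-checks instead of waiting on the semaphore, so a waiter can never be put to sleep while its own work is still pending.
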
@@ -101,8 +102,7 @@ private:
     static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); }
 
     struct Work {
-        void (*fn)(void*);            // A function to call,
-        void* arg;                    // its argument,
+        std::function<void(void)> fn; // A function to call,
         SkAtomic<int32_t>* pending;   // then decrement pending afterwards.
     };
 
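
The two-field fn/arg pair collapses into one std::function because a capturing lambda carries its own state. A tiny standalone illustration (not from the patch):

    #include <cstdio>
    #include <functional>

    int main() {
        int arg = 42;
        // Before, Work carried a raw void (*fn)(void*) plus the void* to hand it.
        // A capturing lambda bundles the state into the callable itself:
        std::function<void(void)> fn = [arg]() { std::printf("%d\n", arg); };
        fn();  // Prints 42; no separate argument pointer to thread through.
    }
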
@@ -117,39 +117,38 @@ private:
     }
 
     ~ThreadPool() {
-        SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by now.
+        SkASSERT(fWork.empty()); // All SkTaskGroups should be destroyed by now.
         // Send a poison pill to each thread.
         SkAtomic<int> dummy(0);
         for (int i = 0; i < fThreads.count(); i++) {
-            this->add(nullptr, nullptr, &dummy);
+            this->add(nullptr, &dummy);
         }
 
         // Wait for them all to swallow the pill and die.
         for (int i = 0; i < fThreads.count(); i++) {
             fThreads[i]->join();
         }
-        SkASSERT(fWork.isEmpty()); // Can't hurt to double check.
+        SkASSERT(fWork.empty()); // Can't hurt to double check.
         fThreads.deleteAll();
     }
 
-    void add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) {
-        Work work = { fn, arg, pending };
+    void add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) {
+        Work work = { fn, pending };
         pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed.
         {
             AutoLock lock(&fWorkLock);
-            fWork.push(work);
+            fWork.push_back(work);
         }
         fWorkAvailable.signal(1);
     }
 
-    void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic<int32_t>* pending) {
+    void batch(std::function<void(int)> fn, int N, SkAtomic<int32_t>* pending) {
         pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed.
         {
             AutoLock lock(&fWorkLock);
-            Work* batch = fWork.append(N);
             for (int i = 0; i < N; i++) {
-                Work work = { fn, (char*)arg + i*stride, pending };
-                batch[i] = work;
+                Work work = { [i, fn]() { fn(i); }, pending };
+                fWork.push_back(work);
             }
         }
         fWorkAvailable.signal(N);
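
Note how batch() wraps fn once per index: i is captured by value, so each queued closure remembers its own index even though the loop variable is long gone by the time a worker runs it. A standalone illustration of that capture:

    #include <cstdio>
    #include <functional>
    #include <vector>

    int main() {
        std::function<void(int)> fn = [](int i) { std::printf("item %d\n", i); };
        std::vector<std::function<void(void)>> queue;
        for (int i = 0; i < 3; i++) {
            queue.push_back([i, fn]() { fn(i); });  // Copies i and fn into each closure.
        }
        for (const auto& work : queue) { work(); }  // Prints item 0, item 1, item 2.
    }
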
@@ -163,24 +162,25 @@ private:
             pool->fWorkAvailable.wait();
             {
                 AutoLock lock(&pool->fWorkLock);
-                if (pool->fWork.isEmpty()) {
+                if (pool->fWork.empty()) {
                     // Someone in Wait() stole our work (fWorkAvailable is an upper bound).
                     // Well, that's fine, back to sleep for us.
                     continue;
                 }
-                pool->fWork.pop(&work);
+                work = pool->fWork.back();
+                pool->fWork.pop_back();
             }
             if (!work.fn) {
                 return; // Poison pill. Time... to die.
             }
-            work.fn(work.arg);
+            work.fn();
             work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait().
         }
     }
 
     // fWorkLock must be held when reading or modifying fWork.
     SkSpinlock fWorkLock;
-    SkTDArray<Work> fWork;
+    SkTArray<Work> fWork;
 
     // A thread-safe upper bound for fWork.count().
     //
@@ -215,9 +215,9 @@ SkTaskGroup::SkTaskGroup() : fPending(0) {}
 
 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); }
 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); }
-void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); }
-void SkTaskGroup::batch (void (*fn)(void*), void* args, int N, size_t stride) {
-    ThreadPool::Batch(fn, args, N, stride, &fPending);
+void SkTaskGroup::add(std::function<void(void)> fn) { ThreadPool::Add(fn, &fPending); }
+void SkTaskGroup::batch (std::function<void(int)> fn, int N) {
+    ThreadPool::Batch(fn, N, &fPending);
 }
 
 int sk_parallel_for_thread_count() {
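
With the patch applied, add() accepts any callable, so per-task state rides inside the closure instead of being paired with a void*. A sketch of a hypothetical callsite (the names here are illustrative, not from the patch):

    #include <cstdio>

    #include "SkTaskGroup.h"

    void example() {
        int volume = 11;
        SkTaskGroup tg;
        // State is captured by value; there is no separate argument to thread through.
        tg.add([volume] { std::printf("turned up to %d\n", volume); });
        tg.wait();  // Blocks until the task has run, helping the pool along the way.
    }
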