Chromium Code Reviews

| OLD | NEW |
|---|---|
| 1 /* | 1 /* |
| 2 * Copyright 2014 Google Inc. | 2 * Copyright 2014 Google Inc. |
| 3 * | 3 * |
| 4 * Use of this source code is governed by a BSD-style license that can be | 4 * Use of this source code is governed by a BSD-style license that can be |
| 5 * found in the LICENSE file. | 5 * found in the LICENSE file. |
| 6 */ | 6 */ |
| 7 | 7 |
| 8 #include "SkOnce.h" | 8 #include "SkOnce.h" |
| 9 #include "SkRunnable.h" | 9 #include "SkRunnable.h" |
| 10 #include "SkSemaphore.h" | 10 #include "SkSemaphore.h" |
| 11 #include "SkSpinlock.h" | 11 #include "SkSpinlock.h" |
| 12 #include "SkTArray.h" | |
| 12 #include "SkTDArray.h" | 13 #include "SkTDArray.h" |
| 13 #include "SkTaskGroup.h" | 14 #include "SkTaskGroup.h" |
| 14 #include "SkThreadUtils.h" | 15 #include "SkThreadUtils.h" |
| 15 | 16 |
| 16 #if defined(SK_BUILD_FOR_WIN32) | 17 #if defined(SK_BUILD_FOR_WIN32) |
| 17 static void query_num_cores(int* num_cores) { | 18 static void query_num_cores(int* num_cores) { |
| 18 SYSTEM_INFO sysinfo; | 19 SYSTEM_INFO sysinfo; |
| 19 GetNativeSystemInfo(&sysinfo); | 20 GetNativeSystemInfo(&sysinfo); |
| 20 *num_cores = sysinfo.dwNumberOfProcessors; | 21 *num_cores = sysinfo.dwNumberOfProcessors; |
| 21 } | 22 } |
| (...skipping 14 matching lines...) | |
| 36 } | 37 } |
| 37 | 38 |
| 38 namespace { | 39 namespace { |
| 39 | 40 |
| 40 class ThreadPool : SkNoncopyable { | 41 class ThreadPool : SkNoncopyable { |
| 41 public: | 42 public: |
| 42 static void Add(SkRunnable* task, SkAtomic<int32_t>* pending) { | 43 static void Add(SkRunnable* task, SkAtomic<int32_t>* pending) { |
| 43 if (!gGlobal) { // If we have no threads, run synchronously. | 44 if (!gGlobal) { // If we have no threads, run synchronously. |
| 44 return task->run(); | 45 return task->run(); |
| 45 } | 46 } |
| 46 gGlobal->add(&CallRunnable, task, pending); | 47 gGlobal->add([task]() { task->run(); }, pending); |
| 47 } | 48 } |
| 48 | 49 |
| 49 static void Add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) { | 50 static void Add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) { |
| 50 if (!gGlobal) { | 51 if (!gGlobal) { |
| 51 return fn(arg); | 52 return fn(); |
| 52 } | 53 } |
| 53 gGlobal->add(fn, arg, pending); | 54 gGlobal->add(fn, pending); |
| 54 } | 55 } |
| 55 | 56 |
| 56 static void Batch(void (*fn)(void*), void* args, int N, size_t stride, | 57 static void Batch(std::function<void(int)> fn, int N, SkAtomic<int32_t>* pending) { |
| 57 SkAtomic<int32_t>* pending) { | |
| 58 if (!gGlobal) { | 58 if (!gGlobal) { |
| 59 for (int i = 0; i < N; i++) { fn((char*)args + i*stride); } | 59 for (int i = 0; i < N; i++) { fn(i); } |
| 60 return; | 60 return; |
| 61 } | 61 } |
| 62 gGlobal->batch(fn, args, N, stride, pending); | 62 gGlobal->batch(fn, N, pending); |
| 63 } | 63 } |
| 64 | 64 |
| 65 static void Wait(SkAtomic<int32_t>* pending) { | 65 static void Wait(SkAtomic<int32_t>* pending) { |
| 66 if (!gGlobal) { // If we have no threads, the work must already be done. | 66 if (!gGlobal) { // If we have no threads, the work must already be done. |
| 67 SkASSERT(pending->load(sk_memory_order_relaxed) == 0); | 67 SkASSERT(pending->load(sk_memory_order_relaxed) == 0); |
| 68 return; | 68 return; |
| 69 } | 69 } |
| 70 // Acquire pairs with decrement release here or in Loop. | 70 // Acquire pairs with decrement release here or in Loop. |
| 71 while (pending->load(sk_memory_order_acquire) > 0) { | 71 while (pending->load(sk_memory_order_acquire) > 0) { |
| 72 // Lend a hand until our SkTaskGroup of interest is done. | 72 // Lend a hand until our SkTaskGroup of interest is done. |
| 73 Work work; | 73 Work work; |
| 74 { | 74 { |
| 75 // We're stealing work opportunistically, | 75 // We're stealing work opportunistically, |
| 76 // so we never call fWorkAvailable.wait(), which could sleep us if there's no work. | 76 // so we never call fWorkAvailable.wait(), which could sleep us if there's no work. |
| 77 // This means fWorkAvailable is only an upper bound on fWork.count(). | 77 // This means fWorkAvailable is only an upper bound on fWork.count(). |
| 78 AutoLock lock(&gGlobal->fWorkLock); | 78 AutoLock lock(&gGlobal->fWorkLock); |
| 79 if (gGlobal->fWork.isEmpty()) { | 79 if (gGlobal->fWork.empty()) { |
| 80 // Someone has picked up all the work (including ours). How nice of them! | 80 // Someone has picked up all the work (including ours). How nice of them! |
| 81 // (They may still be working on it, so we can't assert *pending == 0 here.) | 81 // (They may still be working on it, so we can't assert *pending == 0 here.) |
| 82 continue; | 82 continue; |
| 83 } | 83 } |
| 84 gGlobal->fWork.pop(&work); | 84 work = gGlobal->fWork.back(); |
| 85 gGlobal->fWork.pop_back(); | |
| 85 } | 86 } |
| 86 // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine. | 87 // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine. |
| 87 // We threads gotta stick together. We're always making forward progress. | 88 // We threads gotta stick together. We're always making forward progress. |
| 88 work.fn(work.arg); | 89 work.fn(); |
| 89 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load above. | 90 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load above. |
| 90 } | 91 } |
| 91 } | 92 } |
| 92 | 93 |
| 93 private: | 94 private: |
| 94 struct AutoLock { | 95 struct AutoLock { |
| 95 AutoLock(SkSpinlock* lock) : fLock(lock) { fLock->acquire(); } | 96 AutoLock(SkSpinlock* lock) : fLock(lock) { fLock->acquire(); } |
| 96 ~AutoLock() { fLock->release(); } | 97 ~AutoLock() { fLock->release(); } |
| 97 private: | 98 private: |
| 98 SkSpinlock* fLock; | 99 SkSpinlock* fLock; |
| 99 }; | 100 }; |
| 100 | 101 |
| 101 static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); } | 102 static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); } |
| 102 | 103 |
| 103 struct Work { | 104 struct Work { |
| 104 void (*fn)(void*); // A function to call, | 105 std::function<void(void)> fn; // A function to call |
| 105 void* arg; // its argument, | |
| 106 SkAtomic<int32_t>* pending; // then decrement pending afterwards. | 106 SkAtomic<int32_t>* pending; // then decrement pending afterwards. |
| 107 }; | 107 }; |
| 108 | 108 |
| 109 explicit ThreadPool(int threads) { | 109 explicit ThreadPool(int threads) { |
| 110 if (threads == -1) { | 110 if (threads == -1) { |
| 111 threads = sk_num_cores(); | 111 threads = sk_num_cores(); |
| 112 } | 112 } |
| 113 for (int i = 0; i < threads; i++) { | 113 for (int i = 0; i < threads; i++) { |
| 114 fThreads.push(new SkThread(&ThreadPool::Loop, this)); | 114 fThreads.push(new SkThread(&ThreadPool::Loop, this)); |
| 115 fThreads.top()->start(); | 115 fThreads.top()->start(); |
| 116 } | 116 } |
| 117 } | 117 } |
| 118 | 118 |
| 119 ~ThreadPool() { | 119 ~ThreadPool() { |
| 120 SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by now. | 120 SkASSERT(fWork.empty()); // All SkTaskGroups should be destroyed by now. |
| 121 | 121 |
| 122 // Send a poison pill to each thread. | 122 // Send a poison pill to each thread. |
| 123 SkAtomic<int> dummy(0); | 123 SkAtomic<int> dummy(0); |
| 124 for (int i = 0; i < fThreads.count(); i++) { | 124 for (int i = 0; i < fThreads.count(); i++) { |
| 125 this->add(nullptr, nullptr, &dummy); | 125 this->add(nullptr, &dummy); |
mtklein
2015/12/10 20:58:35
So does nullptr turn into a std::function<void(void)>?

herb_g
2015/12/10 21:58:35
Yes. Yes it does. I got lucky.
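A minimal standalone sketch of the point under discussion, relying only on standard `<functional>` behavior (the names here are illustrative, not from the CL): constructing a `std::function` from `nullptr` yields an empty function object that tests false, which is exactly what the poison-pill check `if (!work.fn)` in Loop() depends on.

```cpp
#include <cassert>
#include <functional>

int main() {
    // nullptr converts to an empty std::function; its operator bool() is false.
    std::function<void(void)> poison = nullptr;
    assert(!poison);

    // A lambda converts to a non-empty std::function, which tests true.
    std::function<void(void)> task = []() { /* real work would go here */ };
    assert(task);
    return 0;
}
```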
| 126 } | 126 } |
| 127 // Wait for them all to swallow the pill and die. | 127 // Wait for them all to swallow the pill and die. |
| 128 for (int i = 0; i < fThreads.count(); i++) { | 128 for (int i = 0; i < fThreads.count(); i++) { |
| 129 fThreads[i]->join(); | 129 fThreads[i]->join(); |
| 130 } | 130 } |
| 131 SkASSERT(fWork.isEmpty()); // Can't hurt to double check. | 131 SkASSERT(fWork.empty()); // Can't hurt to double check. |
| 132 fThreads.deleteAll(); | 132 fThreads.deleteAll(); |
| 133 } | 133 } |
| 134 | 134 |
| 135 void add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) { | 135 void add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) { |
| 136 Work work = { fn, arg, pending }; | 136 Work work = { fn, pending }; |
| 137 pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed. | 137 pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed. |
| 138 { | 138 { |
| 139 AutoLock lock(&fWorkLock); | 139 AutoLock lock(&fWorkLock); |
| 140 fWork.push(work); | 140 fWork.push_back(work); |
| 141 } | 141 } |
| 142 fWorkAvailable.signal(1); | 142 fWorkAvailable.signal(1); |
| 143 } | 143 } |
| 144 | 144 |
| 145 void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic<int32_t>* pending) { | 145 void batch(std::function<void(int)> fn, int N, SkAtomic<int32_t>* pending) { |
| 146 pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed. | 146 pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed. |
| 147 { | 147 { |
| 148 AutoLock lock(&fWorkLock); | 148 AutoLock lock(&fWorkLock); |
| 149 Work* batch = fWork.append(N); | |
| 150 for (int i = 0; i < N; i++) { | 149 for (int i = 0; i < N; i++) { |
| 151 Work work = { fn, (char*)arg + i*stride, pending }; | 150 Work work = { [i, fn]() { fn(i); }, pending }; |
| 152 batch[i] = work; | 151 fWork.push_back(work); |
| 153 } | 152 } |
| 154 } | 153 } |
| 155 fWorkAvailable.signal(N); | 154 fWorkAvailable.signal(N); |
| 156 } | 155 } |
| 157 | 156 |
| 158 static void Loop(void* arg) { | 157 static void Loop(void* arg) { |
| 159 ThreadPool* pool = (ThreadPool*)arg; | 158 ThreadPool* pool = (ThreadPool*)arg; |
| 160 Work work; | 159 Work work; |
| 161 while (true) { | 160 while (true) { |
| 162 // Sleep until there's work available, and claim one unit of Work as we wake. | 161 // Sleep until there's work available, and claim one unit of Work as we wake. |
| 163 pool->fWorkAvailable.wait(); | 162 pool->fWorkAvailable.wait(); |
| 164 { | 163 { |
| 165 AutoLock lock(&pool->fWorkLock); | 164 AutoLock lock(&pool->fWorkLock); |
| 166 if (pool->fWork.isEmpty()) { | 165 if (pool->fWork.empty()) { |
| 167 // Someone in Wait() stole our work (fWorkAvailable is an upper bound). | 166 // Someone in Wait() stole our work (fWorkAvailable is an upper bound). |
| 168 // Well, that's fine, back to sleep for us. | 167 // Well, that's fine, back to sleep for us. |
| 169 continue; | 168 continue; |
| 170 } | 169 } |
| 171 pool->fWork.pop(&work); | 170 work = pool->fWork.back(); |
| 171 pool->fWork.pop_back(); | |
| 172 } | 172 } |
| 173 if (!work.fn) { | 173 if (!work.fn) { |
| 174 return; // Poison pill. Time... to die. | 174 return; // Poison pill. Time... to die. |
| 175 } | 175 } |
| 176 work.fn(work.arg); | 176 work.fn(); |
| 177 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait(). | 177 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait(). |
| 178 } | 178 } |
| 179 } | 179 } |
| 180 | 180 |
| 181 // fWorkLock must be held when reading or modifying fWork. | 181 // fWorkLock must be held when reading or modifying fWork. |
| 182 SkSpinlock fWorkLock; | 182 SkSpinlock fWorkLock; |
| 183 SkTDArray<Work> fWork; | 183 SkTArray<Work> fWork; |
| 184 | 184 |
| 185 // A thread-safe upper bound for fWork.count(). | 185 // A thread-safe upper bound for fWork.count(). |
| 186 // | 186 // |
| 187 // We'd have it be an exact count but for the loop in Wait(): | 187 // We'd have it be an exact count but for the loop in Wait(): |
| 188 // we never want that to block, so it can't call fWorkAvailable.wait(), | 188 // we never want that to block, so it can't call fWorkAvailable.wait(), |
| 189 // and that's the only way to decrement fWorkAvailable. | 189 // and that's the only way to decrement fWorkAvailable. |
| 190 // So fWorkAvailable may overcount the actual work available. | 190 // So fWorkAvailable may overcount the actual work available. |
| 191 // We make do, but this means some worker threads may wake spuriously. | 191 // We make do, but this means some worker threads may wake spuriously. |
| 192 SkSemaphore fWorkAvailable; | 192 SkSemaphore fWorkAvailable; |
| 193 | 193 |
| (...skipping 14 matching lines...) | |
| 208 ThreadPool::gGlobal = new ThreadPool(threads); | 208 ThreadPool::gGlobal = new ThreadPool(threads); |
| 209 } | 209 } |
| 210 } | 210 } |
| 211 | 211 |
| 212 SkTaskGroup::Enabler::~Enabler() { delete ThreadPool::gGlobal; } | 212 SkTaskGroup::Enabler::~Enabler() { delete ThreadPool::gGlobal; } |
| 213 | 213 |
| 214 SkTaskGroup::SkTaskGroup() : fPending(0) {} | 214 SkTaskGroup::SkTaskGroup() : fPending(0) {} |
| 215 | 215 |
| 216 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending ); } | 216 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending ); } |
| 217 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); } | 217 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); } |
| 218 void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); } | 218 void SkTaskGroup::add(std::function<void(void)> fn) { ThreadPool::Add(fn, &fPending); } |
| 219 void SkTaskGroup::batch (void (*fn)(void*), void* args, int N, size_t stride) { | 219 void SkTaskGroup::batch (std::function<void(int)> fn, int N) { |
| 220 ThreadPool::Batch(fn, args, N, stride, &fPending); | 220 ThreadPool::Batch(fn, N, &fPending); |
| 221 } | 221 } |
| 222 | 222 |
| 223 int sk_parallel_for_thread_count() { | 223 int sk_parallel_for_thread_count() { |
| 224 if (ThreadPool::gGlobal != nullptr) { | 224 if (ThreadPool::gGlobal != nullptr) { |
| 225 return ThreadPool::gGlobal->fThreads.count(); | 225 return ThreadPool::gGlobal->fThreads.count(); |
| 226 } | 226 } |
| 227 return 0; | 227 return 0; |
| 228 } | 228 } |
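For orientation, a hedged usage sketch of the new std::function-based SkTaskGroup API shown in this diff. The method names and signatures (batch, wait) match the NEW column above; the enclosing function, its name, and the work done per index are assumptions made for illustration.

```cpp
#include "SkTaskGroup.h"

// Illustrative only: squares dst[0..n) in parallel using the new batch(fn, N)
// signature, where the callback receives the index i directly rather than a
// (char*)args + i*stride pointer as in the old API.
static void fill_squares(int* dst, int n) {
    SkTaskGroup tg;
    tg.batch([dst](int i) { dst[i] = i * i; }, n);  // enqueue one unit of work per index
    tg.wait();                                      // block until all pending work is done
}
```

If no SkTaskGroup::Enabler is live (gGlobal is null), the diff shows that batch() simply runs each fn(i) synchronously on the calling thread, so a sketch like this behaves the same either way, just without parallelism.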