| OLD | NEW |
| 1 /* | 1 /* |
| 2 * Copyright 2014 Google Inc. | 2 * Copyright 2014 Google Inc. |
| 3 * | 3 * |
| 4 * Use of this source code is governed by a BSD-style license that can be | 4 * Use of this source code is governed by a BSD-style license that can be |
| 5 * found in the LICENSE file. | 5 * found in the LICENSE file. |
| 6 */ | 6 */ |
| 7 | 7 |
| 8 #include "SkRunnable.h" |
| 9 #include "SkSemaphore.h" |
| 10 #include "SkSpinlock.h" |
| 11 #include "SkTDArray.h" |
| 8 #include "SkTaskGroup.h" | 12 #include "SkTaskGroup.h" |
| 9 | |
| 10 #include "SkCondVar.h" | |
| 11 #include "SkRunnable.h" | |
| 12 #include "SkTDArray.h" | |
| 13 #include "SkThread.h" | |
| 14 #include "SkThreadUtils.h" | 13 #include "SkThreadUtils.h" |
| 15 | 14 |
| 16 #if defined(SK_BUILD_FOR_WIN32) | 15 #if defined(SK_BUILD_FOR_WIN32) |
| 17 static inline int num_cores() { | 16 static inline int num_cores() { |
| 18 SYSTEM_INFO sysinfo; | 17 SYSTEM_INFO sysinfo; |
| 19 GetSystemInfo(&sysinfo); | 18 GetSystemInfo(&sysinfo); |
| 20 return sysinfo.dwNumberOfProcessors; | 19 return sysinfo.dwNumberOfProcessors; |
| 21 } | 20 } |
| 22 #else | 21 #else |
| 23 #include <unistd.h> | 22 #include <unistd.h> |
| (...skipping 32 matching lines...) |
| 56 static void Wait(SkAtomic<int32_t>* pending) { | 55 static void Wait(SkAtomic<int32_t>* pending) { |
| 57 if (!gGlobal) { // If we have no threads, the work must already be done. | 56 if (!gGlobal) { // If we have no threads, the work must already be done. |
| 58 SkASSERT(pending->load(sk_memory_order_relaxed) == 0); | 57 SkASSERT(pending->load(sk_memory_order_relaxed) == 0); |
| 59 return; | 58 return; |
| 60 } | 59 } |
| 61 // Acquire pairs with decrement release here or in Loop. | 60 // Acquire pairs with decrement release here or in Loop. |
| 62 while (pending->load(sk_memory_order_acquire) > 0) { | 61 while (pending->load(sk_memory_order_acquire) > 0) { |
| 63 // Lend a hand until our SkTaskGroup of interest is done. | 62 // Lend a hand until our SkTaskGroup of interest is done. |
| 64 Work work; | 63 Work work; |
| 65 { | 64 { |
| 66 AutoLock lock(&gGlobal->fReady); | 65 // We're stealing work opportunistically, |
| 66 // so we never call fWorkAvailable.wait(), which could sleep us if there's no work. |
| 67 // This means fWorkAvailable is only an upper bound on fWork.count(). |
| 68 AutoLock lock(&gGlobal->fWorkLock); |
| 67 if (gGlobal->fWork.isEmpty()) { | 69 if (gGlobal->fWork.isEmpty()) { |
| 68 // Someone has picked up all the work (including ours). How nice of them! | 70 // Someone has picked up all the work (including ours). How nice of them! |
| 69 // (They may still be working on it, so we can't assert *pending == 0 here.) | 71 // (They may still be working on it, so we can't assert *pending == 0 here.) |
| 70 continue; | 72 continue; |
| 71 } | 73 } |
| 72 gGlobal->fWork.pop(&work); | 74 gGlobal->fWork.pop(&work); |
| 73 } | 75 } |
| 74 // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine. | 76 // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine. |
| 75 // We threads gotta stick together. We're always making forward progress. | 77 // We threads gotta stick together. We're always making forward progress. |
| 76 work.fn(work.arg); | 78 work.fn(work.arg); |
| 77 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load above. | 79 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load above. |
| 78 } | 80 } |
| 79 } | 81 } |
| 80 | 82 |
| 81 private: | 83 private: |
| 82 struct AutoLock { | 84 struct AutoLock { |
| 83 AutoLock(SkCondVar* c) : fC(c) { fC->lock(); } | 85 AutoLock(SkSpinlock* lock) : fLock(lock) { fLock->acquire(); } |
| 84 ~AutoLock() { fC->unlock(); } | 86 ~AutoLock() { fLock->release(); } |
| 85 private: | 87 private: |
| 86 SkCondVar* fC; | 88 SkSpinlock* fLock; |
| 87 }; | 89 }; |
| 88 | 90 |
| 89 static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); } | 91 static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); } |
| 90 | 92 |
| 91 struct Work { | 93 struct Work { |
| 92 void (*fn)(void*); // A function to call, | 94 void (*fn)(void*); // A function to call, |
| 93 void* arg; // its argument, | 95 void* arg; // its argument, |
| 94 SkAtomic<int32_t>* pending; // then decrement pending afterwards. | 96 SkAtomic<int32_t>* pending; // then decrement pending afterwards. |
| 95 }; | 97 }; |
| 96 | 98 |
| 97 explicit ThreadPool(int threads) : fDraining(false) { | 99 explicit ThreadPool(int threads) { |
| 98 if (threads == -1) { | 100 if (threads == -1) { |
| 99 threads = num_cores(); | 101 threads = num_cores(); |
| 100 } | 102 } |
| 101 for (int i = 0; i < threads; i++) { | 103 for (int i = 0; i < threads; i++) { |
| 102 fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this))); | 104 fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this))); |
| 103 fThreads.top()->start(); | 105 fThreads.top()->start(); |
| 104 } | 106 } |
| 105 } | 107 } |
| 106 | 108 |
| 107 ~ThreadPool() { | 109 ~ThreadPool() { |
| 108 SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by now. | 110 SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by now. |
| 109 { | 111 |
| 110 AutoLock lock(&fReady); | 112 // Send a poison pill to each thread. |
| 111 fDraining = true; | 113 SkAtomic<int> dummy(0); |
| 112 fReady.broadcast(); | 114 for (int i = 0; i < fThreads.count(); i++) { |
| 115 this->add(NULL, NULL, &dummy); |
| 113 } | 116 } |
| 117 // Wait for them all to swallow the pill and die. |
| 114 for (int i = 0; i < fThreads.count(); i++) { | 118 for (int i = 0; i < fThreads.count(); i++) { |
| 115 fThreads[i]->join(); | 119 fThreads[i]->join(); |
| 116 } | 120 } |
| 117 SkASSERT(fWork.isEmpty()); // Can't hurt to double check. | 121 SkASSERT(fWork.isEmpty()); // Can't hurt to double check. |
| 118 fThreads.deleteAll(); | 122 fThreads.deleteAll(); |
| 119 } | 123 } |
| 120 | 124 |
| 121 void add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) { | 125 void add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) { |
| 122 Work work = { fn, arg, pending }; | 126 Work work = { fn, arg, pending }; |
| 123 pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed. | 127 pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed. |
| 124 { | 128 { |
| 125 AutoLock lock(&fReady); | 129 AutoLock lock(&fWorkLock); |
| 126 fWork.push(work); | 130 fWork.push(work); |
| 127 fReady.signal(); | |
| 128 } | 131 } |
| 132 fWorkAvailable.signal(1); |
| 129 } | 133 } |
| 130 | 134 |
| 131 void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic<int32_t>* pending) { | 135 void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic<int32_t>* pending) { |
| 132 pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed. | 136 pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed. |
| 133 { | 137 { |
| 134 AutoLock lock(&fReady); | 138 AutoLock lock(&fWorkLock); |
| 135 Work* batch = fWork.append(N); | 139 Work* batch = fWork.append(N); |
| 136 for (int i = 0; i < N; i++) { | 140 for (int i = 0; i < N; i++) { |
| 137 Work work = { fn, (char*)arg + i*stride, pending }; | 141 Work work = { fn, (char*)arg + i*stride, pending }; |
| 138 batch[i] = work; | 142 batch[i] = work; |
| 139 } | 143 } |
| 140 fReady.broadcast(); | |
| 141 } | 144 } |
| 145 fWorkAvailable.signal(N); |
| 142 } | 146 } |
| 143 | 147 |
| 144 static void Loop(void* arg) { | 148 static void Loop(void* arg) { |
| 145 ThreadPool* pool = (ThreadPool*)arg; | 149 ThreadPool* pool = (ThreadPool*)arg; |
| 146 Work work; | 150 Work work; |
| 147 while (true) { | 151 while (true) { |
| 152 // Sleep until there's work available, and claim one unit of Work as we wake. |
| 153 pool->fWorkAvailable.wait(); |
| 148 { | 154 { |
| 149 AutoLock lock(&pool->fReady); | 155 AutoLock lock(&pool->fWorkLock); |
| 150 while (pool->fWork.isEmpty()) { | 156 if (pool->fWork.isEmpty()) { |
| 151 if (pool->fDraining) { | 157 // Someone in Wait() stole our work (fWorkAvailable is an upper bound). |
| 152 return; | 158 // Well, that's fine, back to sleep for us. |
| 153 } | 159 continue; |
| 154 pool->fReady.wait(); | |
| 155 } | 160 } |
| 156 pool->fWork.pop(&work); | 161 pool->fWork.pop(&work); |
| 157 } | 162 } |
| 163 if (!work.fn) { |
| 164 return; // Poison pill. Time... to die. |
| 165 } |
| 158 work.fn(work.arg); | 166 work.fn(work.arg); |
| 159 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait(). | 167 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait(). |
| 160 } | 168 } |
| 161 } | 169 } |
| 162 | 170 |
| 163 SkTDArray<Work> fWork; | 171 // fWorkLock must be held when reading or modifying fWork. |
| 172 SkSpinlock fWorkLock; |
| 173 SkTDArray<Work> fWork; |
| 174 |
| 175 // A thread-safe upper bound for fWork.count(). |
| 176 // |
| 177 // We'd have it be an exact count but for the loop in Wait(): |
| 178 // we never want that to block, so it can't call fWorkAvailable.wait(), |
| 179 // and that's the only way to decrement fWorkAvailable. |
| 180 // So fWorkAvailable may overcount actual the work available. |
| 181 // We make do, but this means some worker threads may wake spuriously. |
| 182 SkSemaphore fWorkAvailable; |
| 183 |
| 184 // These are only changed in a single-threaded context. |
| 164 SkTDArray<SkThread*> fThreads; | 185 SkTDArray<SkThread*> fThreads; |
| 165 SkCondVar fReady; | 186 static ThreadPool* gGlobal; |
| 166 bool fDraining; | |
| 167 | 187 |
| 168 static ThreadPool* gGlobal; | |
| 169 friend struct SkTaskGroup::Enabler; | 188 friend struct SkTaskGroup::Enabler; |
| 170 }; | 189 }; |
| 171 ThreadPool* ThreadPool::gGlobal = NULL; | 190 ThreadPool* ThreadPool::gGlobal = NULL; |
| 172 | 191 |
| 173 } // namespace | 192 } // namespace |
| 174 | 193 |
| 175 SkTaskGroup::Enabler::Enabler(int threads) { | 194 SkTaskGroup::Enabler::Enabler(int threads) { |
| 176 SkASSERT(ThreadPool::gGlobal == NULL); | 195 SkASSERT(ThreadPool::gGlobal == NULL); |
| 177 if (threads != 0 && SkCondVar::Supported()) { | 196 if (threads != 0) { |
| 178 ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads)); | 197 ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads)); |
| 179 } | 198 } |
| 180 } | 199 } |
| 181 | 200 |
| 182 SkTaskGroup::Enabler::~Enabler() { | 201 SkTaskGroup::Enabler::~Enabler() { |
| 183 SkDELETE(ThreadPool::gGlobal); | 202 SkDELETE(ThreadPool::gGlobal); |
| 184 } | 203 } |
| 185 | 204 |
| 186 SkTaskGroup::SkTaskGroup() : fPending(0) {} | 205 SkTaskGroup::SkTaskGroup() : fPending(0) {} |
| 187 | 206 |
| 188 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); } | 207 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); } |
| 189 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); } | 208 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); } |
| 190 void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); } | 209 void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); } |
| 191 void SkTaskGroup::batch (void (*fn)(void*), void* args, int N, size_t stride) { | 210 void SkTaskGroup::batch (void (*fn)(void*), void* args, int N, size_t stride) { |
| 192 ThreadPool::Batch(fn, args, N, stride, &fPending); | 211 ThreadPool::Batch(fn, args, N, stride, &fPending); |
| 193 } | 212 } |
| 194 | 213 |
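And a sketch of how the public API at the bottom of the diff is meant to be exercised. This assumes a Skia checkout from around the time of this CL; square_in_place and its data are made up for illustration, and per the Wait() comment above, a thread count of 0 leaves the pool disabled so work effectively runs synchronously.

#include "SkTaskGroup.h"

// A toy task: square the int its argument points to.
static void square_in_place(void* arg) {
    int* x = static_cast<int*>(arg);
    *x *= *x;
}

void example() {
    // One Enabler per program; -1 asks for one thread per core, 0 disables the pool.
    SkTaskGroup::Enabler enabled(-1);

    int lone = 7;
    int values[64];
    for (int i = 0; i < 64; i++) { values[i] = i; }

    SkTaskGroup tg;
    tg.add(square_in_place, &lone);                            // One task.
    tg.batch(square_in_place, values, 64, sizeof(values[0]));  // 64 tasks, strided over values.
    tg.wait();                                                 // Lend a hand until every task has run.
}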