| OLD | NEW |
| 1 /* |
| 2 * Copyright 2014 Google Inc. |
| 3 * |
| 4 * Use of this source code is governed by a BSD-style license that can be |
| 5 * found in the LICENSE file. |
| 6 */ |
| 7 |
| 1 #include "SkTaskGroup.h" | 8 #include "SkTaskGroup.h" |
| 2 | 9 |
| 3 #include "SkCondVar.h" | 10 #include "SkCondVar.h" |
| 4 #include "SkRunnable.h" | 11 #include "SkRunnable.h" |
| 5 #include "SkTDArray.h" | 12 #include "SkTDArray.h" |
| 6 #include "SkThread.h" | 13 #include "SkThread.h" |
| 7 #include "SkThreadUtils.h" | 14 #include "SkThreadUtils.h" |
| 8 | 15 |
| 9 #if defined(SK_BUILD_FOR_WIN32) | 16 #if defined(SK_BUILD_FOR_WIN32) |
| 10 static inline int num_cores() { | 17 static inline int num_cores() { |
| 11 SYSTEM_INFO sysinfo; | 18 SYSTEM_INFO sysinfo; |
| 12 GetSystemInfo(&sysinfo); | 19 GetSystemInfo(&sysinfo); |
| 13 return sysinfo.dwNumberOfProcessors; | 20 return sysinfo.dwNumberOfProcessors; |
| 14 } | 21 } |
| 15 #else | 22 #else |
| 16 #include <unistd.h> | 23 #include <unistd.h> |
| 17 static inline int num_cores() { | 24 static inline int num_cores() { |
| 18 return (int) sysconf(_SC_NPROCESSORS_ONLN); | 25 return (int) sysconf(_SC_NPROCESSORS_ONLN); |
| 19 } | 26 } |
| 20 #endif | 27 #endif |
| 21 | 28 |
| 22 namespace { | 29 namespace { |
| 23 | 30 |
| 24 class ThreadPool : SkNoncopyable { | 31 class ThreadPool : SkNoncopyable { |
| 25 public: | 32 public: |
| 26 static void Add(SkRunnable* task, int32_t* pending) { | 33 static void Add(SkRunnable* task, SkAtomic<int32_t>* pending) { |
| 27 if (!gGlobal) { // If we have no threads, run synchronously. | 34 if (!gGlobal) { // If we have no threads, run synchronously. |
| 28 return task->run(); | 35 return task->run(); |
| 29 } | 36 } |
| 30 gGlobal->add(&CallRunnable, task, pending); | 37 gGlobal->add(&CallRunnable, task, pending); |
| 31 } | 38 } |
| 32 | 39 |
| 33 static void Add(void (*fn)(void*), void* arg, int32_t* pending) { | 40 static void Add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) { |
| 34 if (!gGlobal) { | 41 if (!gGlobal) { |
| 35 return fn(arg); | 42 return fn(arg); |
| 36 } | 43 } |
| 37 gGlobal->add(fn, arg, pending); | 44 gGlobal->add(fn, arg, pending); |
| 38 } | 45 } |
| 39 | 46 |
| 40 static void Batch(void (*fn)(void*), void* args, int N, size_t stride, int32_t* pending) { | 47 static void Batch(void (*fn)(void*), void* args, int N, size_t stride, |
| 48 SkAtomic<int32_t>* pending) { |
| 41 if (!gGlobal) { | 49 if (!gGlobal) { |
| 42 for (int i = 0; i < N; i++) { fn((char*)args + i*stride); } | 50 for (int i = 0; i < N; i++) { fn((char*)args + i*stride); } |
| 43 return; | 51 return; |
| 44 } | 52 } |
| 45 gGlobal->batch(fn, args, N, stride, pending); | 53 gGlobal->batch(fn, args, N, stride, pending); |
| 46 } | 54 } |
| 47 | 55 |
| 48 static void Wait(int32_t* pending) { | 56 static void Wait(SkAtomic<int32_t>* pending) { |
| 49 if (!gGlobal) { // If we have no threads, the work must already be done. | 57 if (!gGlobal) { // If we have no threads, the work must already be done. |
| 50 SkASSERT(*pending == 0); | 58 SkASSERT(pending->load(sk_memory_order_relaxed) == 0); |
| 51 return; | 59 return; |
| 52 } | 60 } |
| 53 while (sk_acquire_load(pending) > 0) { // Pairs with sk_atomic_dec here or in Loop. | 61 // Acquire pairs with decrement release here or in Loop. |
| 62 while (pending->load(sk_memory_order_acquire) > 0) { |
| 54 // Lend a hand until our SkTaskGroup of interest is done. | 63 // Lend a hand until our SkTaskGroup of interest is done. |
| 55 Work work; | 64 Work work; |
| 56 { | 65 { |
| 57 AutoLock lock(&gGlobal->fReady); | 66 AutoLock lock(&gGlobal->fReady); |
| 58 if (gGlobal->fWork.isEmpty()) { | 67 if (gGlobal->fWork.isEmpty()) { |
| 59 // Someone has picked up all the work (including ours). How nice of them! | 68 // Someone has picked up all the work (including ours). How nice of them! |
| 60 // (They may still be working on it, so we can't assert *pending == 0 here.) | 69 // (They may still be working on it, so we can't assert *pending == 0 here.) |
| 61 continue; | 70 continue; |
| 62 } | 71 } |
| 63 gGlobal->fWork.pop(&work); | 72 gGlobal->fWork.pop(&work); |
| 64 } | 73 } |
| 65 // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine. | 74 // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine. |
| 66 // We threads gotta stick together. We're always making forward progress. | 75 // We threads gotta stick together. We're always making forward progress. |
| 67 work.fn(work.arg); | 76 work.fn(work.arg); |
| 68 sk_atomic_dec(work.pending); // Release pairs with the sk_acquire_load() just above. | 77 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load above. |
| 69 } | 78 } |
| 70 } | 79 } |
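
Note on the Wait() change above: the raw sk_acquire_load / sk_atomic_dec pair becomes an explicit acquire load of *pending plus a release fetch_add(-1) by whichever thread finishes the work, so the waiter sees the task's side effects once the count reaches zero. A minimal sketch of that same pairing, written against std::atomic as an assumption (SkAtomic's sk_memory_order_* values mirror the std::memory_order semantics):

    #include <atomic>
    #include <cassert>
    #include <cstdint>
    #include <thread>

    static std::atomic<int32_t> pending{1};
    static int result = 0;  // plain data written by the task, read by the waiter

    static void run_task() {
        result = 42;                                       // the task's work
        pending.fetch_add(-1, std::memory_order_release);  // publish: pairs with the acquire load below
    }

    int main() {
        std::thread worker(run_task);
        while (pending.load(std::memory_order_acquire) > 0) {
            // Spin; the real Wait() pops and runs other queued work here instead.
        }
        assert(result == 42);  // visible because the acquire load observed the release decrement
        worker.join();
    }
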
| 71 | 80 |
| 72 private: | 81 private: |
| 73 struct AutoLock { | 82 struct AutoLock { |
| 74 AutoLock(SkCondVar* c) : fC(c) { fC->lock(); } | 83 AutoLock(SkCondVar* c) : fC(c) { fC->lock(); } |
| 75 ~AutoLock() { fC->unlock(); } | 84 ~AutoLock() { fC->unlock(); } |
| 76 private: | 85 private: |
| 77 SkCondVar* fC; | 86 SkCondVar* fC; |
| 78 }; | 87 }; |
| 79 | 88 |
| 80 static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); } | 89 static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); } |
| 81 | 90 |
| 82 struct Work { | 91 struct Work { |
| 83 void (*fn)(void*); // A function to call, | 92 void (*fn)(void*); // A function to call, |
| 84 void* arg; // its argument, | 93 void* arg; // its argument, |
| 85 int32_t* pending; // then sk_atomic_dec(pending) afterwards. | 94 SkAtomic<int32_t>* pending; // then decrement pending afterwards. |
| 86 }; | 95 }; |
| 87 | 96 |
| 88 explicit ThreadPool(int threads) : fDraining(false) { | 97 explicit ThreadPool(int threads) : fDraining(false) { |
| 89 if (threads == -1) { | 98 if (threads == -1) { |
| 90 threads = num_cores(); | 99 threads = num_cores(); |
| 91 } | 100 } |
| 92 for (int i = 0; i < threads; i++) { | 101 for (int i = 0; i < threads; i++) { |
| 93 fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this))); | 102 fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this))); |
| 94 fThreads.top()->start(); | 103 fThreads.top()->start(); |
| 95 } | 104 } |
| 96 } | 105 } |
| 97 | 106 |
| 98 ~ThreadPool() { | 107 ~ThreadPool() { |
| 99 SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by now. | 108 SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by now. |
| 100 { | 109 { |
| 101 AutoLock lock(&fReady); | 110 AutoLock lock(&fReady); |
| 102 fDraining = true; | 111 fDraining = true; |
| 103 fReady.broadcast(); | 112 fReady.broadcast(); |
| 104 } | 113 } |
| 105 for (int i = 0; i < fThreads.count(); i++) { | 114 for (int i = 0; i < fThreads.count(); i++) { |
| 106 fThreads[i]->join(); | 115 fThreads[i]->join(); |
| 107 } | 116 } |
| 108 SkASSERT(fWork.isEmpty()); // Can't hurt to double check. | 117 SkASSERT(fWork.isEmpty()); // Can't hurt to double check. |
| 109 fThreads.deleteAll(); | 118 fThreads.deleteAll(); |
| 110 } | 119 } |
| 111 | 120 |
| 112 void add(void (*fn)(void*), void* arg, int32_t* pending) { | 121 void add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) { |
| 113 Work work = { fn, arg, pending }; | 122 Work work = { fn, arg, pending }; |
| 114 sk_atomic_inc(pending); // No barrier needed. | 123 pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed. |
| 115 { | 124 { |
| 116 AutoLock lock(&fReady); | 125 AutoLock lock(&fReady); |
| 117 fWork.push(work); | 126 fWork.push(work); |
| 118 fReady.signal(); | 127 fReady.signal(); |
| 119 } | 128 } |
| 120 } | 129 } |
| 121 | 130 |
| 122 void batch(void (*fn)(void*), void* arg, int N, size_t stride, int32_t* pending) { | 131 void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic<int32_t>* pending) { |
| 123 sk_atomic_add(pending, N); // No barrier needed. | 132 pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed. |
| 124 { | 133 { |
| 125 AutoLock lock(&fReady); | 134 AutoLock lock(&fReady); |
| 126 Work* batch = fWork.append(N); | 135 Work* batch = fWork.append(N); |
| 127 for (int i = 0; i < N; i++) { | 136 for (int i = 0; i < N; i++) { |
| 128 Work work = { fn, (char*)arg + i*stride, pending }; | 137 Work work = { fn, (char*)arg + i*stride, pending }; |
| 129 batch[i] = work; | 138 batch[i] = work; |
| 130 } | 139 } |
| 131 fReady.broadcast(); | 140 fReady.broadcast(); |
| 132 } | 141 } |
| 133 } | 142 } |
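
Note on add()/batch(): the pending counter is bumped with a relaxed fetch_add, which is enough on the producer side because the Work items themselves are handed off under the SkCondVar lock; only the downward path needs the acquire/release pairing shown in the note above. batch() computes each task's argument as (char*)args + i*stride over a contiguous array. A hedged sketch of that strided fan-out, using a hypothetical payload type:

    #include <cstddef>
    #include <cstdio>

    struct Tile { int x, y; };  // hypothetical payload, laid out contiguously

    static void process(void* arg) {  // matches the void (*)(void*) signature the pool takes
        const Tile* t = static_cast<const Tile*>(arg);
        std::printf("tile %d,%d\n", t->x, t->y);
    }

    int main() {
        Tile tiles[3] = {{0, 0}, {1, 0}, {2, 0}};
        // The same addressing batch() applies to (args, N, stride), run serially here:
        for (int i = 0; i < 3; i++) {
            process((char*)tiles + i * sizeof(Tile));
        }
    }
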
| 134 | 143 |
| 135 static void Loop(void* arg) { | 144 static void Loop(void* arg) { |
| 136 ThreadPool* pool = (ThreadPool*)arg; | 145 ThreadPool* pool = (ThreadPool*)arg; |
| 137 Work work; | 146 Work work; |
| 138 while (true) { | 147 while (true) { |
| 139 { | 148 { |
| 140 AutoLock lock(&pool->fReady); | 149 AutoLock lock(&pool->fReady); |
| 141 while (pool->fWork.isEmpty()) { | 150 while (pool->fWork.isEmpty()) { |
| 142 if (pool->fDraining) { | 151 if (pool->fDraining) { |
| 143 return; | 152 return; |
| 144 } | 153 } |
| 145 pool->fReady.wait(); | 154 pool->fReady.wait(); |
| 146 } | 155 } |
| 147 pool->fWork.pop(&work); | 156 pool->fWork.pop(&work); |
| 148 } | 157 } |
| 149 work.fn(work.arg); | 158 work.fn(work.arg); |
| 150 sk_atomic_dec(work.pending); // Release pairs with sk_acquire_load() in Wait(). | 159 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait(). |
| 151 } | 160 } |
| 152 } | 161 } |
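
Note on Loop(): this is the usual condition-variable consumer shape -- wait while the queue is empty, bail out once fDraining is set, otherwise pop under the lock and run the work outside it. A compact sketch of that wait/drain pattern using std::mutex and std::condition_variable (an assumption; SkCondVar bundles the same mutex-plus-condvar pair):

    #include <condition_variable>
    #include <cstdio>
    #include <deque>
    #include <functional>
    #include <mutex>

    struct WorkQueue {
        std::mutex mu;
        std::condition_variable ready;
        std::deque<std::function<void()>> work;
        bool draining = false;

        // Returns false only when draining and empty; otherwise hands back one item.
        bool next(std::function<void()>* out) {
            std::unique_lock<std::mutex> lock(mu);
            while (work.empty()) {
                if (draining) { return false; }
                ready.wait(lock);
            }
            *out = std::move(work.front());
            work.pop_front();
            return true;  // caller runs *out after the lock is released
        }
    };

    int main() {
        WorkQueue q;
        q.work.push_back([] { std::puts("hi"); });
        q.draining = true;              // single-threaded demo: nothing else will be pushed
        std::function<void()> fn;
        while (q.next(&fn)) { fn(); }   // runs the one item, then returns false
    }
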
| 153 | 162 |
| 154 SkTDArray<Work> fWork; | 163 SkTDArray<Work> fWork; |
| 155 SkTDArray<SkThread*> fThreads; | 164 SkTDArray<SkThread*> fThreads; |
| 156 SkCondVar fReady; | 165 SkCondVar fReady; |
| 157 bool fDraining; | 166 bool fDraining; |
| 158 | 167 |
| 159 static ThreadPool* gGlobal; | 168 static ThreadPool* gGlobal; |
| 160 friend struct SkTaskGroup::Enabler; | 169 friend struct SkTaskGroup::Enabler; |
| (...skipping 15 matching lines...) |
| 176 | 185 |
| 177 SkTaskGroup::SkTaskGroup() : fPending(0) {} | 186 SkTaskGroup::SkTaskGroup() : fPending(0) {} |
| 178 | 187 |
| 179 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); } | 188 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); } |
| 180 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); } | 189 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); } |
| 181 void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); } | 190 void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); } |
| 182 void SkTaskGroup::batch (void (*fn)(void*), void* args, int N, size_t stride) { | 191 void SkTaskGroup::batch (void (*fn)(void*), void* args, int N, size_t stride) { |
| 183 ThreadPool::Batch(fn, args, N, stride, &fPending); | 192 ThreadPool::Batch(fn, args, N, stride, &fPending); |
| 184 } | 193 } |
| 185 | 194 |
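
For reference, the public SkTaskGroup surface is unchanged by this CL: construct a group, add or batch work, then wait. A hedged usage sketch (assuming a SkTaskGroup::Enabler is alive somewhere so the global pool exists; without one, everything runs inline):

    #include "SkTaskGroup.h"

    static void bump(void* arg) {
        int* slot = static_cast<int*>(arg);  // hypothetical per-task payload; each task owns one slot
        *slot += 1;
    }

    void example() {
        int slots[8] = {0};
        SkTaskGroup tg;
        tg.batch(bump, slots, 8, sizeof(int));  // fan bump() out over the 8 ints
        tg.wait();                              // blocks (and helps run queued work) until all 8 are done
    }
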