OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2014 Google Inc. | 2 * Copyright 2014 Google Inc. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license that can be | 4 * Use of this source code is governed by a BSD-style license that can be |
5 * found in the LICENSE file. | 5 * found in the LICENSE file. |
6 */ | 6 */ |
7 | 7 |
8 #include "SkOnce.h" | 8 #include "SkOnce.h" |
9 #include "SkRunnable.h" | 9 #include "SkRunnable.h" |
10 #include "SkSemaphore.h" | 10 #include "SkSemaphore.h" |
11 #include "SkSpinlock.h" | 11 #include "SkSpinlock.h" |
| 12 #include "SkTArray.h" |
12 #include "SkTDArray.h" | 13 #include "SkTDArray.h" |
13 #include "SkTaskGroup.h" | 14 #include "SkTaskGroup.h" |
14 #include "SkThreadUtils.h" | 15 #include "SkThreadUtils.h" |
15 | 16 |
16 #if defined(SK_BUILD_FOR_WIN32) | 17 #if defined(SK_BUILD_FOR_WIN32) |
17 static void query_num_cores(int* num_cores) { | 18 static void query_num_cores(int* num_cores) { |
18 SYSTEM_INFO sysinfo; | 19 SYSTEM_INFO sysinfo; |
19 GetNativeSystemInfo(&sysinfo); | 20 GetNativeSystemInfo(&sysinfo); |
20 *num_cores = sysinfo.dwNumberOfProcessors; | 21 *num_cores = sysinfo.dwNumberOfProcessors; |
21 } | 22 } |
(...skipping 14 matching lines...) Expand all Loading... |
36 } | 37 } |
37 | 38 |
38 namespace { | 39 namespace { |
39 | 40 |
40 class ThreadPool : SkNoncopyable { | 41 class ThreadPool : SkNoncopyable { |
41 public: | 42 public: |
42 static void Add(SkRunnable* task, SkAtomic<int32_t>* pending) { | 43 static void Add(SkRunnable* task, SkAtomic<int32_t>* pending) { |
43 if (!gGlobal) { // If we have no threads, run synchronously. | 44 if (!gGlobal) { // If we have no threads, run synchronously. |
44 return task->run(); | 45 return task->run(); |
45 } | 46 } |
46 gGlobal->add(&CallRunnable, task, pending); | 47 gGlobal->add([task]() { task->run(); }, pending); |
47 } | 48 } |
48 | 49 |
49 static void Add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) { | 50 static void Add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) { |
50 if (!gGlobal) { | 51 if (!gGlobal) { |
51 return fn(arg); | 52 return fn(); |
52 } | 53 } |
53 gGlobal->add(fn, arg, pending); | 54 gGlobal->add(fn, pending); |
54 } | 55 } |
55 | 56 |
56 static void Batch(void (*fn)(void*), void* args, int N, size_t stride, | 57 static void Batch(std::function<void(int)> fn, int N, SkAtomic<int32_t>* pen
ding) { |
57 SkAtomic<int32_t>* pending) { | |
58 if (!gGlobal) { | 58 if (!gGlobal) { |
59 for (int i = 0; i < N; i++) { fn((char*)args + i*stride); } | 59 for (int i = 0; i < N; i++) { fn(i); } |
60 return; | 60 return; |
61 } | 61 } |
62 gGlobal->batch(fn, args, N, stride, pending); | 62 gGlobal->batch(fn, N, pending); |
63 } | 63 } |
64 | 64 |
65 static void Wait(SkAtomic<int32_t>* pending) { | 65 static void Wait(SkAtomic<int32_t>* pending) { |
66 if (!gGlobal) { // If we have no threads, the work must already be done
. | 66 if (!gGlobal) { // If we have no threads, the work must already be done
. |
67 SkASSERT(pending->load(sk_memory_order_relaxed) == 0); | 67 SkASSERT(pending->load(sk_memory_order_relaxed) == 0); |
68 return; | 68 return; |
69 } | 69 } |
70 // Acquire pairs with decrement release here or in Loop. | 70 // Acquire pairs with decrement release here or in Loop. |
71 while (pending->load(sk_memory_order_acquire) > 0) { | 71 while (pending->load(sk_memory_order_acquire) > 0) { |
72 // Lend a hand until our SkTaskGroup of interest is done. | 72 // Lend a hand until our SkTaskGroup of interest is done. |
73 Work work; | 73 Work work; |
74 { | 74 { |
75 // We're stealing work opportunistically, | 75 // We're stealing work opportunistically, |
76 // so we never call fWorkAvailable.wait(), which could sleep us
if there's no work. | 76 // so we never call fWorkAvailable.wait(), which could sleep us
if there's no work. |
77 // This means fWorkAvailable is only an upper bound on fWork.cou
nt(). | 77 // This means fWorkAvailable is only an upper bound on fWork.cou
nt(). |
78 AutoLock lock(&gGlobal->fWorkLock); | 78 AutoLock lock(&gGlobal->fWorkLock); |
79 if (gGlobal->fWork.isEmpty()) { | 79 if (gGlobal->fWork.empty()) { |
80 // Someone has picked up all the work (including ours). How
nice of them! | 80 // Someone has picked up all the work (including ours). How
nice of them! |
81 // (They may still be working on it, so we can't assert *pen
ding == 0 here.) | 81 // (They may still be working on it, so we can't assert *pen
ding == 0 here.) |
82 continue; | 82 continue; |
83 } | 83 } |
84 gGlobal->fWork.pop(&work); | 84 work = gGlobal->fWork.back(); |
| 85 gGlobal->fWork.pop_back(); |
85 } | 86 } |
86 // This Work isn't necessarily part of our SkTaskGroup of interest,
but that's fine. | 87 // This Work isn't necessarily part of our SkTaskGroup of interest,
but that's fine. |
87 // We threads gotta stick together. We're always making forward pro
gress. | 88 // We threads gotta stick together. We're always making forward pro
gress. |
88 work.fn(work.arg); | 89 work.fn(); |
89 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with
load above. | 90 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with
load above. |
90 } | 91 } |
91 } | 92 } |
92 | 93 |
93 private: | 94 private: |
94 struct AutoLock { | 95 struct AutoLock { |
95 AutoLock(SkSpinlock* lock) : fLock(lock) { fLock->acquire(); } | 96 AutoLock(SkSpinlock* lock) : fLock(lock) { fLock->acquire(); } |
96 ~AutoLock() { fLock->release(); } | 97 ~AutoLock() { fLock->release(); } |
97 private: | 98 private: |
98 SkSpinlock* fLock; | 99 SkSpinlock* fLock; |
99 }; | 100 }; |
100 | 101 |
101 static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run();
} | 102 static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run();
} |
102 | 103 |
103 struct Work { | 104 struct Work { |
104 void (*fn)(void*); // A function to call, | 105 std::function<void(void)> fn; // A function to call |
105 void* arg; // its argument, | |
106 SkAtomic<int32_t>* pending; // then decrement pending afterwards. | 106 SkAtomic<int32_t>* pending; // then decrement pending afterwards. |
107 }; | 107 }; |
108 | 108 |
109 explicit ThreadPool(int threads) { | 109 explicit ThreadPool(int threads) { |
110 if (threads == -1) { | 110 if (threads == -1) { |
111 threads = sk_num_cores(); | 111 threads = sk_num_cores(); |
112 } | 112 } |
113 for (int i = 0; i < threads; i++) { | 113 for (int i = 0; i < threads; i++) { |
114 fThreads.push(new SkThread(&ThreadPool::Loop, this)); | 114 fThreads.push(new SkThread(&ThreadPool::Loop, this)); |
115 fThreads.top()->start(); | 115 fThreads.top()->start(); |
116 } | 116 } |
117 } | 117 } |
118 | 118 |
119 ~ThreadPool() { | 119 ~ThreadPool() { |
120 SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by n
ow. | 120 SkASSERT(fWork.empty()); // All SkTaskGroups should be destroyed by now
. |
121 | 121 |
122 // Send a poison pill to each thread. | 122 // Send a poison pill to each thread. |
123 SkAtomic<int> dummy(0); | 123 SkAtomic<int> dummy(0); |
124 for (int i = 0; i < fThreads.count(); i++) { | 124 for (int i = 0; i < fThreads.count(); i++) { |
125 this->add(nullptr, nullptr, &dummy); | 125 this->add(nullptr, &dummy); |
126 } | 126 } |
127 // Wait for them all to swallow the pill and die. | 127 // Wait for them all to swallow the pill and die. |
128 for (int i = 0; i < fThreads.count(); i++) { | 128 for (int i = 0; i < fThreads.count(); i++) { |
129 fThreads[i]->join(); | 129 fThreads[i]->join(); |
130 } | 130 } |
131 SkASSERT(fWork.isEmpty()); // Can't hurt to double check. | 131 SkASSERT(fWork.empty()); // Can't hurt to double check. |
132 fThreads.deleteAll(); | 132 fThreads.deleteAll(); |
133 } | 133 } |
134 | 134 |
135 void add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) { | 135 void add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) { |
136 Work work = { fn, arg, pending }; | 136 Work work = { fn, pending }; |
137 pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed. | 137 pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed. |
138 { | 138 { |
139 AutoLock lock(&fWorkLock); | 139 AutoLock lock(&fWorkLock); |
140 fWork.push(work); | 140 fWork.push_back(work); |
141 } | 141 } |
142 fWorkAvailable.signal(1); | 142 fWorkAvailable.signal(1); |
143 } | 143 } |
144 | 144 |
145 void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic<int3
2_t>* pending) { | 145 void batch(std::function<void(int)> fn, int N, SkAtomic<int32_t>* pending) { |
146 pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed. | 146 pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed. |
147 { | 147 { |
148 AutoLock lock(&fWorkLock); | 148 AutoLock lock(&fWorkLock); |
149 Work* batch = fWork.append(N); | |
150 for (int i = 0; i < N; i++) { | 149 for (int i = 0; i < N; i++) { |
151 Work work = { fn, (char*)arg + i*stride, pending }; | 150 Work work = { [i, fn]() { fn(i); }, pending }; |
152 batch[i] = work; | 151 fWork.push_back(work); |
153 } | 152 } |
154 } | 153 } |
155 fWorkAvailable.signal(N); | 154 fWorkAvailable.signal(N); |
156 } | 155 } |
157 | 156 |
158 static void Loop(void* arg) { | 157 static void Loop(void* arg) { |
159 ThreadPool* pool = (ThreadPool*)arg; | 158 ThreadPool* pool = (ThreadPool*)arg; |
160 Work work; | 159 Work work; |
161 while (true) { | 160 while (true) { |
162 // Sleep until there's work available, and claim one unit of Work as
we wake. | 161 // Sleep until there's work available, and claim one unit of Work as
we wake. |
163 pool->fWorkAvailable.wait(); | 162 pool->fWorkAvailable.wait(); |
164 { | 163 { |
165 AutoLock lock(&pool->fWorkLock); | 164 AutoLock lock(&pool->fWorkLock); |
166 if (pool->fWork.isEmpty()) { | 165 if (pool->fWork.empty()) { |
167 // Someone in Wait() stole our work (fWorkAvailable is an up
per bound). | 166 // Someone in Wait() stole our work (fWorkAvailable is an up
per bound). |
168 // Well, that's fine, back to sleep for us. | 167 // Well, that's fine, back to sleep for us. |
169 continue; | 168 continue; |
170 } | 169 } |
171 pool->fWork.pop(&work); | 170 work = pool->fWork.back(); |
| 171 pool->fWork.pop_back(); |
172 } | 172 } |
173 if (!work.fn) { | 173 if (!work.fn) { |
174 return; // Poison pill. Time... to die. | 174 return; // Poison pill. Time... to die. |
175 } | 175 } |
176 work.fn(work.arg); | 176 work.fn(); |
177 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with
load in Wait(). | 177 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with
load in Wait(). |
178 } | 178 } |
179 } | 179 } |
180 | 180 |
181 // fWorkLock must be held when reading or modifying fWork. | 181 // fWorkLock must be held when reading or modifying fWork. |
182 SkSpinlock fWorkLock; | 182 SkSpinlock fWorkLock; |
183 SkTDArray<Work> fWork; | 183 SkTArray<Work> fWork; |
184 | 184 |
185 // A thread-safe upper bound for fWork.count(). | 185 // A thread-safe upper bound for fWork.count(). |
186 // | 186 // |
187 // We'd have it be an exact count but for the loop in Wait(): | 187 // We'd have it be an exact count but for the loop in Wait(): |
188 // we never want that to block, so it can't call fWorkAvailable.wait(), | 188 // we never want that to block, so it can't call fWorkAvailable.wait(), |
189 // and that's the only way to decrement fWorkAvailable. | 189 // and that's the only way to decrement fWorkAvailable. |
190 // So fWorkAvailable may overcount the actual work available. | 190 // So fWorkAvailable may overcount the actual work available. |
191 // We make do, but this means some worker threads may wake spuriously. | 191 // We make do, but this means some worker threads may wake spuriously. |
192 SkSemaphore fWorkAvailable; | 192 SkSemaphore fWorkAvailable; |
193 | 193 |
(...skipping 14 matching lines...) Expand all Loading... |
208 ThreadPool::gGlobal = new ThreadPool(threads); | 208 ThreadPool::gGlobal = new ThreadPool(threads); |
209 } | 209 } |
210 } | 210 } |
211 | 211 |
212 SkTaskGroup::Enabler::~Enabler() { delete ThreadPool::gGlobal; } | 212 SkTaskGroup::Enabler::~Enabler() { delete ThreadPool::gGlobal; } |
213 | 213 |
214 SkTaskGroup::SkTaskGroup() : fPending(0) {} | 214 SkTaskGroup::SkTaskGroup() : fPending(0) {} |
215 | 215 |
216 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending
); } | 216 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending
); } |
217 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPe
nding); } | 217 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPe
nding); } |
218 void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &
fPending); } | 218 void SkTaskGroup::add(std::function<void(void)> fn) { ThreadPool::Add(fn, &fPend
ing); } |
219 void SkTaskGroup::batch (void (*fn)(void*), void* args, int N, size_t stride) { | 219 void SkTaskGroup::batch (std::function<void(int)> fn, int N) { |
220 ThreadPool::Batch(fn, args, N, stride, &fPending); | 220 ThreadPool::Batch(fn, N, &fPending); |
221 } | 221 } |
222 | 222 |
223 int sk_parallel_for_thread_count() { | 223 int sk_parallel_for_thread_count() { |
224 if (ThreadPool::gGlobal != nullptr) { | 224 if (ThreadPool::gGlobal != nullptr) { |
225 return ThreadPool::gGlobal->fThreads.count(); | 225 return ThreadPool::gGlobal->fThreads.count(); |
226 } | 226 } |
227 return 0; | 227 return 0; |
228 } | 228 } |
OLD | NEW |