Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: src/core/SkTaskGroup.cpp

Issue 1519573003: Change SkTaskGroup to use std::function. (Closed) Base URL: https://skia.googlesource.com/skia.git@master
Patch Set: address comments Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « src/core/SkTaskGroup.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2014 Google Inc. 2 * Copyright 2014 Google Inc.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license that can be 4 * Use of this source code is governed by a BSD-style license that can be
5 * found in the LICENSE file. 5 * found in the LICENSE file.
6 */ 6 */
7 7
8 #include "SkOnce.h" 8 #include "SkOnce.h"
9 #include "SkRunnable.h" 9 #include "SkRunnable.h"
10 #include "SkSemaphore.h" 10 #include "SkSemaphore.h"
11 #include "SkSpinlock.h" 11 #include "SkSpinlock.h"
12 #include "SkTArray.h"
12 #include "SkTDArray.h" 13 #include "SkTDArray.h"
13 #include "SkTaskGroup.h" 14 #include "SkTaskGroup.h"
14 #include "SkThreadUtils.h" 15 #include "SkThreadUtils.h"
15 16
16 #if defined(SK_BUILD_FOR_WIN32) 17 #if defined(SK_BUILD_FOR_WIN32)
17 static void query_num_cores(int* num_cores) { 18 static void query_num_cores(int* num_cores) {
18 SYSTEM_INFO sysinfo; 19 SYSTEM_INFO sysinfo;
19 GetNativeSystemInfo(&sysinfo); 20 GetNativeSystemInfo(&sysinfo);
20 *num_cores = sysinfo.dwNumberOfProcessors; 21 *num_cores = sysinfo.dwNumberOfProcessors;
21 } 22 }
(...skipping 14 matching lines...) Expand all
36 } 37 }
37 38
38 namespace { 39 namespace {
39 40
40 class ThreadPool : SkNoncopyable { 41 class ThreadPool : SkNoncopyable {
41 public: 42 public:
42 static void Add(SkRunnable* task, SkAtomic<int32_t>* pending) { 43 static void Add(SkRunnable* task, SkAtomic<int32_t>* pending) {
43 if (!gGlobal) { // If we have no threads, run synchronously. 44 if (!gGlobal) { // If we have no threads, run synchronously.
44 return task->run(); 45 return task->run();
45 } 46 }
46 gGlobal->add(&CallRunnable, task, pending); 47 gGlobal->add([task]() { task->run(); }, pending);
47 } 48 }
48 49
49 static void Add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) { 50 static void Add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) {
50 if (!gGlobal) { 51 if (!gGlobal) {
51 return fn(arg); 52 return fn();
52 } 53 }
53 gGlobal->add(fn, arg, pending); 54 gGlobal->add(fn, pending);
54 } 55 }
55 56
56 static void Batch(void (*fn)(void*), void* args, int N, size_t stride, 57 static void Batch(std::function<void(int)> fn, int N, SkAtomic<int32_t>* pen ding) {
57 SkAtomic<int32_t>* pending) {
58 if (!gGlobal) { 58 if (!gGlobal) {
59 for (int i = 0; i < N; i++) { fn((char*)args + i*stride); } 59 for (int i = 0; i < N; i++) { fn(i); }
60 return; 60 return;
61 } 61 }
62 gGlobal->batch(fn, args, N, stride, pending); 62 gGlobal->batch(fn, N, pending);
63 } 63 }
64 64
65 static void Wait(SkAtomic<int32_t>* pending) { 65 static void Wait(SkAtomic<int32_t>* pending) {
66 if (!gGlobal) { // If we have no threads, the work must already be done . 66 if (!gGlobal) { // If we have no threads, the work must already be done .
67 SkASSERT(pending->load(sk_memory_order_relaxed) == 0); 67 SkASSERT(pending->load(sk_memory_order_relaxed) == 0);
68 return; 68 return;
69 } 69 }
70 // Acquire pairs with decrement release here or in Loop. 70 // Acquire pairs with decrement release here or in Loop.
71 while (pending->load(sk_memory_order_acquire) > 0) { 71 while (pending->load(sk_memory_order_acquire) > 0) {
72 // Lend a hand until our SkTaskGroup of interest is done. 72 // Lend a hand until our SkTaskGroup of interest is done.
73 Work work; 73 Work work;
74 { 74 {
75 // We're stealing work opportunistically, 75 // We're stealing work opportunistically,
76 // so we never call fWorkAvailable.wait(), which could sleep us if there's no work. 76 // so we never call fWorkAvailable.wait(), which could sleep us if there's no work.
77 // This means fWorkAvailable is only an upper bound on fWork.cou nt(). 77 // This means fWorkAvailable is only an upper bound on fWork.cou nt().
78 AutoLock lock(&gGlobal->fWorkLock); 78 AutoLock lock(&gGlobal->fWorkLock);
79 if (gGlobal->fWork.isEmpty()) { 79 if (gGlobal->fWork.empty()) {
80 // Someone has picked up all the work (including ours). How nice of them! 80 // Someone has picked up all the work (including ours). How nice of them!
81 // (They may still be working on it, so we can't assert *pen ding == 0 here.) 81 // (They may still be working on it, so we can't assert *pen ding == 0 here.)
82 continue; 82 continue;
83 } 83 }
84 gGlobal->fWork.pop(&work); 84 work = gGlobal->fWork.back();
85 gGlobal->fWork.pop_back();
85 } 86 }
86 // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine. 87 // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
87 // We threads gotta stick together. We're always making forward pro gress. 88 // We threads gotta stick together. We're always making forward pro gress.
88 work.fn(work.arg); 89 work.fn();
89 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load above. 90 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load above.
90 } 91 }
91 } 92 }
92 93
93 private: 94 private:
94 struct AutoLock { 95 struct AutoLock {
95 AutoLock(SkSpinlock* lock) : fLock(lock) { fLock->acquire(); } 96 AutoLock(SkSpinlock* lock) : fLock(lock) { fLock->acquire(); }
96 ~AutoLock() { fLock->release(); } 97 ~AutoLock() { fLock->release(); }
97 private: 98 private:
98 SkSpinlock* fLock; 99 SkSpinlock* fLock;
99 }; 100 };
100 101
101 static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); } 102 static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); }
102 103
103 struct Work { 104 struct Work {
104 void (*fn)(void*); // A function to call, 105 std::function<void(void)> fn; // A function to call
105 void* arg; // its argument,
106 SkAtomic<int32_t>* pending; // then decrement pending afterwards. 106 SkAtomic<int32_t>* pending; // then decrement pending afterwards.
107 }; 107 };
108 108
109 explicit ThreadPool(int threads) { 109 explicit ThreadPool(int threads) {
110 if (threads == -1) { 110 if (threads == -1) {
111 threads = sk_num_cores(); 111 threads = sk_num_cores();
112 } 112 }
113 for (int i = 0; i < threads; i++) { 113 for (int i = 0; i < threads; i++) {
114 fThreads.push(new SkThread(&ThreadPool::Loop, this)); 114 fThreads.push(new SkThread(&ThreadPool::Loop, this));
115 fThreads.top()->start(); 115 fThreads.top()->start();
116 } 116 }
117 } 117 }
118 118
119 ~ThreadPool() { 119 ~ThreadPool() {
120 SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by n ow. 120 SkASSERT(fWork.empty()); // All SkTaskGroups should be destroyed by now .
121 121
122 // Send a poison pill to each thread. 122 // Send a poison pill to each thread.
123 SkAtomic<int> dummy(0); 123 SkAtomic<int> dummy(0);
124 for (int i = 0; i < fThreads.count(); i++) { 124 for (int i = 0; i < fThreads.count(); i++) {
125 this->add(nullptr, nullptr, &dummy); 125 this->add(nullptr, &dummy);
126 } 126 }
127 // Wait for them all to swallow the pill and die. 127 // Wait for them all to swallow the pill and die.
128 for (int i = 0; i < fThreads.count(); i++) { 128 for (int i = 0; i < fThreads.count(); i++) {
129 fThreads[i]->join(); 129 fThreads[i]->join();
130 } 130 }
131 SkASSERT(fWork.isEmpty()); // Can't hurt to double check. 131 SkASSERT(fWork.empty()); // Can't hurt to double check.
132 fThreads.deleteAll(); 132 fThreads.deleteAll();
133 } 133 }
134 134
135 void add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) { 135 void add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) {
136 Work work = { fn, arg, pending }; 136 Work work = { fn, pending };
137 pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed. 137 pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed.
138 { 138 {
139 AutoLock lock(&fWorkLock); 139 AutoLock lock(&fWorkLock);
140 fWork.push(work); 140 fWork.push_back(work);
141 } 141 }
142 fWorkAvailable.signal(1); 142 fWorkAvailable.signal(1);
143 } 143 }
144 144
145 void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic<int3 2_t>* pending) { 145 void batch(std::function<void(int)> fn, int N, SkAtomic<int32_t>* pending) {
146 pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed. 146 pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed.
147 { 147 {
148 AutoLock lock(&fWorkLock); 148 AutoLock lock(&fWorkLock);
149 Work* batch = fWork.append(N);
150 for (int i = 0; i < N; i++) { 149 for (int i = 0; i < N; i++) {
151 Work work = { fn, (char*)arg + i*stride, pending }; 150 Work work = { [i, fn]() { fn(i); }, pending };
152 batch[i] = work; 151 fWork.push_back(work);
153 } 152 }
154 } 153 }
155 fWorkAvailable.signal(N); 154 fWorkAvailable.signal(N);
156 } 155 }
157 156
158 static void Loop(void* arg) { 157 static void Loop(void* arg) {
159 ThreadPool* pool = (ThreadPool*)arg; 158 ThreadPool* pool = (ThreadPool*)arg;
160 Work work; 159 Work work;
161 while (true) { 160 while (true) {
162 // Sleep until there's work available, and claim one unit of Work as we wake. 161 // Sleep until there's work available, and claim one unit of Work as we wake.
163 pool->fWorkAvailable.wait(); 162 pool->fWorkAvailable.wait();
164 { 163 {
165 AutoLock lock(&pool->fWorkLock); 164 AutoLock lock(&pool->fWorkLock);
166 if (pool->fWork.isEmpty()) { 165 if (pool->fWork.empty()) {
167 // Someone in Wait() stole our work (fWorkAvailable is an up per bound). 166 // Someone in Wait() stole our work (fWorkAvailable is an up per bound).
168 // Well, that's fine, back to sleep for us. 167 // Well, that's fine, back to sleep for us.
169 continue; 168 continue;
170 } 169 }
171 pool->fWork.pop(&work); 170 work = pool->fWork.back();
171 pool->fWork.pop_back();
172 } 172 }
173 if (!work.fn) { 173 if (!work.fn) {
174 return; // Poison pill. Time... to die. 174 return; // Poison pill. Time... to die.
175 } 175 }
176 work.fn(work.arg); 176 work.fn();
177 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait(). 177 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait().
178 } 178 }
179 } 179 }
180 180
181 // fWorkLock must be held when reading or modifying fWork. 181 // fWorkLock must be held when reading or modifying fWork.
182 SkSpinlock fWorkLock; 182 SkSpinlock fWorkLock;
183 SkTDArray<Work> fWork; 183 SkTArray<Work> fWork;
184 184
185 // A thread-safe upper bound for fWork.count(). 185 // A thread-safe upper bound for fWork.count().
186 // 186 //
187 // We'd have it be an exact count but for the loop in Wait(): 187 // We'd have it be an exact count but for the loop in Wait():
188 // we never want that to block, so it can't call fWorkAvailable.wait(), 188 // we never want that to block, so it can't call fWorkAvailable.wait(),
189 // and that's the only way to decrement fWorkAvailable. 189 // and that's the only way to decrement fWorkAvailable.
190 // So fWorkAvailable may overcount actual the work available. 190 // So fWorkAvailable may overcount actual the work available.
191 // We make do, but this means some worker threads may wake spuriously. 191 // We make do, but this means some worker threads may wake spuriously.
192 SkSemaphore fWorkAvailable; 192 SkSemaphore fWorkAvailable;
193 193
(...skipping 14 matching lines...) Expand all
208 ThreadPool::gGlobal = new ThreadPool(threads); 208 ThreadPool::gGlobal = new ThreadPool(threads);
209 } 209 }
210 } 210 }
211 211
212 SkTaskGroup::Enabler::~Enabler() { delete ThreadPool::gGlobal; } 212 SkTaskGroup::Enabler::~Enabler() { delete ThreadPool::gGlobal; }
213 213
214 SkTaskGroup::SkTaskGroup() : fPending(0) {} 214 SkTaskGroup::SkTaskGroup() : fPending(0) {}
215 215
216 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending ); } 216 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending ); }
217 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPe nding); } 217 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPe nding); }
218 void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, & fPending); } 218 void SkTaskGroup::add(std::function<void(void)> fn) { ThreadPool::Add(fn, &fPend ing); }
219 void SkTaskGroup::batch (void (*fn)(void*), void* args, int N, size_t stride) { 219 void SkTaskGroup::batch (std::function<void(int)> fn, int N) {
220 ThreadPool::Batch(fn, args, N, stride, &fPending); 220 ThreadPool::Batch(fn, N, &fPending);
221 } 221 }
222 222
223 int sk_parallel_for_thread_count() { 223 int sk_parallel_for_thread_count() {
224 if (ThreadPool::gGlobal != nullptr) { 224 if (ThreadPool::gGlobal != nullptr) {
225 return ThreadPool::gGlobal->fThreads.count(); 225 return ThreadPool::gGlobal->fThreads.count();
226 } 226 }
227 return 0; 227 return 0;
228 } 228 }
OLDNEW
« no previous file with comments | « src/core/SkTaskGroup.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698