OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2014 Google Inc. | 2 * Copyright 2014 Google Inc. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license that can be | 4 * Use of this source code is governed by a BSD-style license that can be |
5 * found in the LICENSE file. | 5 * found in the LICENSE file. |
6 */ | 6 */ |
7 | 7 |
| 8 #include "SkRunnable.h" |
| 9 #include "SkSemaphore.h" |
| 10 #include "SkSpinlock.h" |
| 11 #include "SkTDArray.h" |
8 #include "SkTaskGroup.h" | 12 #include "SkTaskGroup.h" |
9 | |
10 #include "SkCondVar.h" | |
11 #include "SkRunnable.h" | |
12 #include "SkTDArray.h" | |
13 #include "SkThread.h" | |
14 #include "SkThreadUtils.h" | 13 #include "SkThreadUtils.h" |
15 | 14 |
16 #if defined(SK_BUILD_FOR_WIN32) | 15 #if defined(SK_BUILD_FOR_WIN32) |
17 static inline int num_cores() { | 16 static inline int num_cores() { |
18 SYSTEM_INFO sysinfo; | 17 SYSTEM_INFO sysinfo; |
19 GetSystemInfo(&sysinfo); | 18 GetSystemInfo(&sysinfo); |
20 return sysinfo.dwNumberOfProcessors; | 19 return sysinfo.dwNumberOfProcessors; |
21 } | 20 } |
22 #else | 21 #else |
23 #include <unistd.h> | 22 #include <unistd.h> |
(...skipping 32 matching lines...) |
56 static void Wait(SkAtomic<int32_t>* pending) { | 55 static void Wait(SkAtomic<int32_t>* pending) { |
57 if (!gGlobal) { // If we have no threads, the work must already be done. | 56 if (!gGlobal) { // If we have no threads, the work must already be done. |
58 SkASSERT(pending->load(sk_memory_order_relaxed) == 0); | 57 SkASSERT(pending->load(sk_memory_order_relaxed) == 0); |
59 return; | 58 return; |
60 } | 59 } |
61 // Acquire pairs with decrement release here or in Loop. | 60 // Acquire pairs with decrement release here or in Loop. |
62 while (pending->load(sk_memory_order_acquire) > 0) { | 61 while (pending->load(sk_memory_order_acquire) > 0) { |
63 // Lend a hand until our SkTaskGroup of interest is done. | 62 // Lend a hand until our SkTaskGroup of interest is done. |
64 Work work; | 63 Work work; |
65 { | 64 { |
66 AutoLock lock(&gGlobal->fReady); | 65 // We're stealing work opportunistically, |
| 66 // so we never call fWorkAvailable.wait(), which could sleep us if there's no work. |
| 67 // This means fWorkAvailable is only an upper bound on fWork.count(). |
| 68 AutoLock lock(&gGlobal->fWorkLock); |
67 if (gGlobal->fWork.isEmpty()) { | 69 if (gGlobal->fWork.isEmpty()) { |
68 // Someone has picked up all the work (including ours). How nice of them! | 70 // Someone has picked up all the work (including ours). How nice of them! |
69 // (They may still be working on it, so we can't assert *pending == 0 here.) | 71 // (They may still be working on it, so we can't assert *pending == 0 here.) |
70 continue; | 72 continue; |
71 } | 73 } |
72 gGlobal->fWork.pop(&work); | 74 gGlobal->fWork.pop(&work); |
73 } | 75 } |
74 // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine. | 76 // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine. |
75 // We threads gotta stick together. We're always making forward progress. | 77 // We threads gotta stick together. We're always making forward progress. |
76 work.fn(work.arg); | 78 work.fn(work.arg); |
77 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load above. | 79 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load above. |
78 } | 80 } |
79 } | 81 } |
80 | 82 |
81 private: | 83 private: |
82 struct AutoLock { | 84 struct AutoLock { |
83 AutoLock(SkCondVar* c) : fC(c) { fC->lock(); } | 85 AutoLock(SkSpinlock* lock) : fLock(lock) { fLock->acquire(); } |
84 ~AutoLock() { fC->unlock(); } | 86 ~AutoLock() { fLock->release(); } |
85 private: | 87 private: |
86 SkCondVar* fC; | 88 SkSpinlock* fLock; |
87 }; | 89 }; |
88 | 90 |
89 static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); } | 91 static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); } |
90 | 92 |
91 struct Work { | 93 struct Work { |
92 void (*fn)(void*); // A function to call, | 94 void (*fn)(void*); // A function to call, |
93 void* arg; // its argument, | 95 void* arg; // its argument, |
94 SkAtomic<int32_t>* pending; // then decrement pending afterwards. | 96 SkAtomic<int32_t>* pending; // then decrement pending afterwards. |
95 }; | 97 }; |
96 | 98 |
97 explicit ThreadPool(int threads) : fDraining(false) { | 99 explicit ThreadPool(int threads) { |
98 if (threads == -1) { | 100 if (threads == -1) { |
99 threads = num_cores(); | 101 threads = num_cores(); |
100 } | 102 } |
101 for (int i = 0; i < threads; i++) { | 103 for (int i = 0; i < threads; i++) { |
102 fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this))); | 104 fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this))); |
103 fThreads.top()->start(); | 105 fThreads.top()->start(); |
104 } | 106 } |
105 } | 107 } |
106 | 108 |
107 ~ThreadPool() { | 109 ~ThreadPool() { |
108 SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by now. | 110 SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by now. |
109 { | 111 |
110 AutoLock lock(&fReady); | 112 // Send a poison pill to each thread. |
111 fDraining = true; | 113 SkAtomic<int> dummy(0); |
112 fReady.broadcast(); | 114 for (int i = 0; i < fThreads.count(); i++) { |
| 115 this->add(NULL, NULL, &dummy); |
113 } | 116 } |
| 117 // Wait for them all to swallow the pill and die. |
114 for (int i = 0; i < fThreads.count(); i++) { | 118 for (int i = 0; i < fThreads.count(); i++) { |
115 fThreads[i]->join(); | 119 fThreads[i]->join(); |
116 } | 120 } |
117 SkASSERT(fWork.isEmpty()); // Can't hurt to double check. | 121 SkASSERT(fWork.isEmpty()); // Can't hurt to double check. |
118 fThreads.deleteAll(); | 122 fThreads.deleteAll(); |
119 } | 123 } |
120 | 124 |
121 void add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) { | 125 void add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) { |
122 Work work = { fn, arg, pending }; | 126 Work work = { fn, arg, pending }; |
123 pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed. | 127 pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed. |
124 { | 128 { |
125 AutoLock lock(&fReady); | 129 AutoLock lock(&fWorkLock); |
126 fWork.push(work); | 130 fWork.push(work); |
127 fReady.signal(); | |
128 } | 131 } |
| 132 fWorkAvailable.signal(1); |
129 } | 133 } |
130 | 134 |
131 void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic<int32_t>* pending) { | 135 void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic<int32_t>* pending) { |
132 pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed. | 136 pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed. |
133 { | 137 { |
134 AutoLock lock(&fReady); | 138 AutoLock lock(&fWorkLock); |
135 Work* batch = fWork.append(N); | 139 Work* batch = fWork.append(N); |
136 for (int i = 0; i < N; i++) { | 140 for (int i = 0; i < N; i++) { |
137 Work work = { fn, (char*)arg + i*stride, pending }; | 141 Work work = { fn, (char*)arg + i*stride, pending }; |
138 batch[i] = work; | 142 batch[i] = work; |
139 } | 143 } |
140 fReady.broadcast(); | |
141 } | 144 } |
| 145 fWorkAvailable.signal(N); |
142 } | 146 } |
143 | 147 |
144 static void Loop(void* arg) { | 148 static void Loop(void* arg) { |
145 ThreadPool* pool = (ThreadPool*)arg; | 149 ThreadPool* pool = (ThreadPool*)arg; |
146 Work work; | 150 Work work; |
147 while (true) { | 151 while (true) { |
| 152 // Sleep until there's work available, and claim one unit of Work as we wake. |
| 153 pool->fWorkAvailable.wait(); |
148 { | 154 { |
149 AutoLock lock(&pool->fReady); | 155 AutoLock lock(&pool->fWorkLock); |
150 while (pool->fWork.isEmpty()) { | 156 if (pool->fWork.isEmpty()) { |
151 if (pool->fDraining) { | 157 // Someone in Wait() stole our work (fWorkAvailable is an upper bound). |
152 return; | 158 // Well, that's fine, back to sleep for us. |
153 } | 159 continue; |
154 pool->fReady.wait(); | |
155 } | 160 } |
156 pool->fWork.pop(&work); | 161 pool->fWork.pop(&work); |
157 } | 162 } |
| 163 if (!work.fn) { |
| 164 return; // Poison pill. Time... to die. |
| 165 } |
158 work.fn(work.arg); | 166 work.fn(work.arg); |
159 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait(). | 167 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait(). |
160 } | 168 } |
161 } | 169 } |
162 | 170 |
163 SkTDArray<Work> fWork; | 171 // fWorkLock must be held when reading or modifying fWork. |
| 172 SkSpinlock fWorkLock; |
| 173 SkTDArray<Work> fWork; |
| 174 |
| 175 // A thread-safe upper bound for fWork.count(). |
| 176 // |
| 177 // We'd have it be an exact count but for the loop in Wait(): |
| 178 // we never want that to block, so it can't call fWorkAvailable.wait(), |
| 179 // and that's the only way to decrement fWorkAvailable. |
| 180 // So fWorkAvailable may overcount the actual work available. |
| 181 // We make do, but this means some worker threads may wake spuriously. |
| 182 SkSemaphore fWorkAvailable; |
| 183 |
| 184 // These are only changed in a single-threaded context. |
164 SkTDArray<SkThread*> fThreads; | 185 SkTDArray<SkThread*> fThreads; |
165 SkCondVar fReady; | 186 static ThreadPool* gGlobal; |
166 bool fDraining; | |
167 | 187 |
168 static ThreadPool* gGlobal; | |
169 friend struct SkTaskGroup::Enabler; | 188 friend struct SkTaskGroup::Enabler; |
170 }; | 189 }; |
171 ThreadPool* ThreadPool::gGlobal = NULL; | 190 ThreadPool* ThreadPool::gGlobal = NULL; |
172 | 191 |
173 } // namespace | 192 } // namespace |
174 | 193 |
175 SkTaskGroup::Enabler::Enabler(int threads) { | 194 SkTaskGroup::Enabler::Enabler(int threads) { |
176 SkASSERT(ThreadPool::gGlobal == NULL); | 195 SkASSERT(ThreadPool::gGlobal == NULL); |
177 if (threads != 0 && SkCondVar::Supported()) { | 196 if (threads != 0) { |
178 ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads)); | 197 ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads)); |
179 } | 198 } |
180 } | 199 } |
181 | 200 |
182 SkTaskGroup::Enabler::~Enabler() { | 201 SkTaskGroup::Enabler::~Enabler() { |
183 SkDELETE(ThreadPool::gGlobal); | 202 SkDELETE(ThreadPool::gGlobal); |
184 } | 203 } |
185 | 204 |
186 SkTaskGroup::SkTaskGroup() : fPending(0) {} | 205 SkTaskGroup::SkTaskGroup() : fPending(0) {} |
187 | 206 |
188 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); } | 207 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); } |
189 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); } | 208 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); } |
190 void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); } | 209 void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); } |
191 void SkTaskGroup::batch (void (*fn)(void*), void* args, int N, size_t stride) { | 210 void SkTaskGroup::batch (void (*fn)(void*), void* args, int N, size_t stride) { |
192 ThreadPool::Batch(fn, args, N, stride, &fPending); | 211 ThreadPool::Batch(fn, args, N, stride, &fPending); |
193 } | 212 } |
194 | 213 |
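A minimal usage sketch of the SkTaskGroup API exercised by this change, assuming only what's shown in the diff above; the task function square(), the array values, and example() are illustrative names, not part of the CL:

#include "SkTaskGroup.h"

// Hypothetical task: square one int in place.
static void square(void* arg) {
    int* x = static_cast<int*>(arg);
    *x *= *x;
}

void example() {
    // -1 asks for one worker thread per core (see num_cores() above).
    SkTaskGroup::Enabler enabler(-1);

    int values[64];
    for (int i = 0; i < 64; i++) {
        values[i] = i;
    }

    SkTaskGroup tg;
    tg.batch(square, values, 64, sizeof(int));  // One lock + one signal(64) enqueues 64 work items.
    tg.wait();                                  // Lend a hand stealing work until all 64 have run.
}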