OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2012 Google Inc. | 2 * Copyright 2012 Google Inc. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license that can be | 4 * Use of this source code is governed by a BSD-style license that can be |
5 * found in the LICENSE file. | 5 * found in the LICENSE file. |
6 */ | 6 */ |
7 | 7 |
8 #ifndef SkThreadPool_DEFINED | 8 #ifndef ThreadPool_DEFINED |
9 #define SkThreadPool_DEFINED | 9 #define ThreadPool_DEFINED |
10 | 10 |
11 #include "SkCondVar.h" | 11 #include "CondVar.h" |
12 #include "SkRunnable.h" | 12 #include "Runnable.h" |
13 #include "SkTDArray.h" | 13 #include "SkTDArray.h" |
14 #include "SkTInternalLList.h" | 14 #include "SkTInternalLList.h" |
15 #include "SkThreadUtils.h" | 15 #include "SkThreadUtils.h" |
16 #include "SkTypes.h" | 16 #include "SkTypes.h" |
17 | 17 |
18 #if defined(SK_BUILD_FOR_UNIX) || defined(SK_BUILD_FOR_MAC) || defined(SK_BUILD_
FOR_ANDROID) | 18 #if defined(SK_BUILD_FOR_UNIX) || defined(SK_BUILD_FOR_MAC) || defined(SK_BUILD_
FOR_ANDROID) |
19 # include <unistd.h> | 19 # include <unistd.h> |
20 #endif | 20 #endif |
21 | 21 |
22 // Returns the number of cores on this machine. | 22 // Returns the number of cores on this machine. |
23 static inline int num_cores() { | 23 static inline int num_cores() { |
24 #if defined(SK_BUILD_FOR_WIN32) | 24 #if defined(SK_BUILD_FOR_WIN32) |
25 SYSTEM_INFO sysinfo; | 25 SYSTEM_INFO sysinfo; |
26 GetSystemInfo(&sysinfo); | 26 GetSystemInfo(&sysinfo); |
27 return sysinfo.dwNumberOfProcessors; | 27 return sysinfo.dwNumberOfProcessors; |
28 #elif defined(SK_BUILD_FOR_UNIX) || defined(SK_BUILD_FOR_MAC) || defined(SK_BUIL
D_FOR_ANDROID) | 28 #elif defined(SK_BUILD_FOR_UNIX) || defined(SK_BUILD_FOR_MAC) || defined(SK_BUIL
D_FOR_ANDROID) |
29 return (int) sysconf(_SC_NPROCESSORS_ONLN); | 29 return (int) sysconf(_SC_NPROCESSORS_ONLN); |
30 #else | 30 #else |
31 return 1; | 31 return 1; |
32 #endif | 32 #endif |
33 } | 33 } |
34 | 34 |
35 template <typename T> | 35 template <typename T> |
36 class SkTThreadPool { | 36 class TThreadPool { |
37 public: | 37 public: |
38 /** | 38 /** |
39 * Create a threadpool with count threads, or one thread per core if kThread
PerCore. | 39 * Create a threadpool with count threads, or one thread per core if kThread
PerCore. |
40 */ | 40 */ |
41 static const int kThreadPerCore = -1; | 41 static const int kThreadPerCore = -1; |
42 explicit SkTThreadPool(int count); | 42 explicit TThreadPool(int count); |
43 ~SkTThreadPool(); | 43 ~TThreadPool(); |
44 | 44 |
45 /** | 45 /** |
46 * Queues up an SkRunnable to run when a thread is available, or synchronous
ly if count is 0. | 46 * Queues up an Runnable to run when a thread is available, or synchronously
if count is 0. |
47 * Does not take ownership. NULL is a safe no-op. If T is not void, the run
nable will be passed | 47 * Does not take ownership. NULL is a safe no-op. If T is not void, the run
nable will be passed |
48 * a reference to a T on the thread's local stack. | 48 * a reference to a T on the thread's local stack. |
49 */ | 49 */ |
50 void add(SkTRunnable<T>*); | 50 void add(TRunnable<T>*); |
51 | 51 |
52 /** | 52 /** |
53 * Same as add, but adds the runnable as the very next to run rather than en
queueing it. | 53 * Same as add, but adds the runnable as the very next to run rather than en
queueing it. |
54 */ | 54 */ |
55 void addNext(SkTRunnable<T>*); | 55 void addNext(TRunnable<T>*); |
56 | 56 |
57 /** | 57 /** |
58 * Block until all added SkRunnables have completed. Once called, calling a
dd() is undefined. | 58 * Block until all added Runnables have completed. Once called, calling add
() is undefined. |
59 */ | 59 */ |
60 void wait(); | 60 void wait(); |
61 | 61 |
62 private: | 62 private: |
63 struct LinkedRunnable { | 63 struct LinkedRunnable { |
64 SkTRunnable<T>* fRunnable; // Unowned. | 64 TRunnable<T>* fRunnable; // Unowned. |
65 SK_DECLARE_INTERNAL_LLIST_INTERFACE(LinkedRunnable); | 65 SK_DECLARE_INTERNAL_LLIST_INTERFACE(LinkedRunnable); |
66 }; | 66 }; |
67 | 67 |
68 enum State { | 68 enum State { |
69 kRunning_State, // Normal case. We've been constructed and no one has
called wait(). | 69 kRunning_State, // Normal case. We've been constructed and no one has
called wait(). |
70 kWaiting_State, // wait has been called, but there still might be work
to do or being done. | 70 kWaiting_State, // wait has been called, but there still might be work
to do or being done. |
71 kHalting_State, // There's no work to do and no thread is busy. All th
reads can shut down. | 71 kHalting_State, // There's no work to do and no thread is busy. All th
reads can shut down. |
72 }; | 72 }; |
73 | 73 |
74 void addSomewhere(SkTRunnable<T>* r, | 74 void addSomewhere(TRunnable<T>* r, |
75 void (SkTInternalLList<LinkedRunnable>::*)(LinkedRunnable*
)); | 75 void (SkTInternalLList<LinkedRunnable>::*)(LinkedRunnable*
)); |
76 | 76 |
77 SkTInternalLList<LinkedRunnable> fQueue; | 77 SkTInternalLList<LinkedRunnable> fQueue; |
78 SkCondVar fReady; | 78 CondVar fReady; |
79 SkTDArray<SkThread*> fThreads; | 79 SkTDArray<SkThread*> fThreads; |
80 State fState; | 80 State fState; |
81 int fBusyThreads; | 81 int fBusyThreads; |
82 | 82 |
83 static void Loop(void*); // Static because we pass in this. | 83 static void Loop(void*); // Static because we pass in this. |
84 }; | 84 }; |
85 | 85 |
86 template <typename T> | 86 template <typename T> |
87 SkTThreadPool<T>::SkTThreadPool(int count) : fState(kRunning_State), fBusyThread
s(0) { | 87 TThreadPool<T>::TThreadPool(int count) : fState(kRunning_State), fBusyThreads(0)
{ |
88 if (count < 0) { | 88 if (count < 0) { |
89 count = num_cores(); | 89 count = num_cores(); |
90 } | 90 } |
91 // Create count threads, all running SkTThreadPool::Loop. | 91 // Create count threads, all running TThreadPool::Loop. |
92 for (int i = 0; i < count; i++) { | 92 for (int i = 0; i < count; i++) { |
93 SkThread* thread = SkNEW_ARGS(SkThread, (&SkTThreadPool::Loop, this)); | 93 SkThread* thread = SkNEW_ARGS(SkThread, (&TThreadPool::Loop, this)); |
94 *fThreads.append() = thread; | 94 *fThreads.append() = thread; |
95 thread->start(); | 95 thread->start(); |
96 } | 96 } |
97 } | 97 } |
98 | 98 |
99 template <typename T> | 99 template <typename T> |
100 SkTThreadPool<T>::~SkTThreadPool() { | 100 TThreadPool<T>::~TThreadPool() { |
101 if (kRunning_State == fState) { | 101 if (kRunning_State == fState) { |
102 this->wait(); | 102 this->wait(); |
103 } | 103 } |
104 } | 104 } |
105 | 105 |
106 namespace SkThreadPoolPrivate { | 106 namespace ThreadPoolPrivate { |
107 | 107 |
108 template <typename T> | 108 template <typename T> |
109 struct ThreadLocal { | 109 struct ThreadLocal { |
110 void run(SkTRunnable<T>* r) { r->run(data); } | 110 void run(TRunnable<T>* r) { r->run(data); } |
111 T data; | 111 T data; |
112 }; | 112 }; |
113 | 113 |
114 template <> | 114 template <> |
115 struct ThreadLocal<void> { | 115 struct ThreadLocal<void> { |
116 void run(SkTRunnable<void>* r) { r->run(); } | 116 void run(TRunnable<void>* r) { r->run(); } |
117 }; | 117 }; |
118 | 118 |
119 } // namespace SkThreadPoolPrivate | 119 } // namespace ThreadPoolPrivate |
120 | 120 |
121 template <typename T> | 121 template <typename T> |
122 void SkTThreadPool<T>::addSomewhere(SkTRunnable<T>* r, | 122 void TThreadPool<T>::addSomewhere(TRunnable<T>* r, |
123 void (SkTInternalLList<LinkedRunnable>::* f)
(LinkedRunnable*)) { | 123 void (SkTInternalLList<LinkedRunnable>::* f)
(LinkedRunnable*)) { |
124 if (r == NULL) { | 124 if (r == NULL) { |
125 return; | 125 return; |
126 } | 126 } |
127 | 127 |
128 if (fThreads.isEmpty()) { | 128 if (fThreads.isEmpty()) { |
129 SkThreadPoolPrivate::ThreadLocal<T> threadLocal; | 129 ThreadPoolPrivate::ThreadLocal<T> threadLocal; |
130 threadLocal.run(r); | 130 threadLocal.run(r); |
131 return; | 131 return; |
132 } | 132 } |
133 | 133 |
134 LinkedRunnable* linkedRunnable = SkNEW(LinkedRunnable); | 134 LinkedRunnable* linkedRunnable = SkNEW(LinkedRunnable); |
135 linkedRunnable->fRunnable = r; | 135 linkedRunnable->fRunnable = r; |
136 fReady.lock(); | 136 fReady.lock(); |
137 SkASSERT(fState != kHalting_State); // Shouldn't be able to add work when w
e're halting. | 137 SkASSERT(fState != kHalting_State); // Shouldn't be able to add work when w
e're halting. |
138 (fQueue.*f)(linkedRunnable); | 138 (fQueue.*f)(linkedRunnable); |
139 fReady.signal(); | 139 fReady.signal(); |
140 fReady.unlock(); | 140 fReady.unlock(); |
141 } | 141 } |
142 | 142 |
143 template <typename T> | 143 template <typename T> |
144 void SkTThreadPool<T>::add(SkTRunnable<T>* r) { | 144 void TThreadPool<T>::add(TRunnable<T>* r) { |
145 this->addSomewhere(r, &SkTInternalLList<LinkedRunnable>::addToTail); | 145 this->addSomewhere(r, &SkTInternalLList<LinkedRunnable>::addToTail); |
146 } | 146 } |
147 | 147 |
148 template <typename T> | 148 template <typename T> |
149 void SkTThreadPool<T>::addNext(SkTRunnable<T>* r) { | 149 void TThreadPool<T>::addNext(TRunnable<T>* r) { |
150 this->addSomewhere(r, &SkTInternalLList<LinkedRunnable>::addToHead); | 150 this->addSomewhere(r, &SkTInternalLList<LinkedRunnable>::addToHead); |
151 } | 151 } |
152 | 152 |
153 | 153 |
154 template <typename T> | 154 template <typename T> |
155 void SkTThreadPool<T>::wait() { | 155 void TThreadPool<T>::wait() { |
156 fReady.lock(); | 156 fReady.lock(); |
157 fState = kWaiting_State; | 157 fState = kWaiting_State; |
158 fReady.broadcast(); | 158 fReady.broadcast(); |
159 fReady.unlock(); | 159 fReady.unlock(); |
160 | 160 |
161 // Wait for all threads to stop. | 161 // Wait for all threads to stop. |
162 for (int i = 0; i < fThreads.count(); i++) { | 162 for (int i = 0; i < fThreads.count(); i++) { |
163 fThreads[i]->join(); | 163 fThreads[i]->join(); |
164 SkDELETE(fThreads[i]); | 164 SkDELETE(fThreads[i]); |
165 } | 165 } |
166 SkASSERT(fQueue.isEmpty()); | 166 SkASSERT(fQueue.isEmpty()); |
167 } | 167 } |
168 | 168 |
169 template <typename T> | 169 template <typename T> |
170 /*static*/ void SkTThreadPool<T>::Loop(void* arg) { | 170 /*static*/ void TThreadPool<T>::Loop(void* arg) { |
171 // The SkTThreadPool passes itself as arg to each thread as they're created. | 171 // The TThreadPool passes itself as arg to each thread as they're created. |
172 SkTThreadPool<T>* pool = static_cast<SkTThreadPool<T>*>(arg); | 172 TThreadPool<T>* pool = static_cast<TThreadPool<T>*>(arg); |
173 SkThreadPoolPrivate::ThreadLocal<T> threadLocal; | 173 ThreadPoolPrivate::ThreadLocal<T> threadLocal; |
174 | 174 |
175 while (true) { | 175 while (true) { |
176 // We have to be holding the lock to read the queue and to call wait. | 176 // We have to be holding the lock to read the queue and to call wait. |
177 pool->fReady.lock(); | 177 pool->fReady.lock(); |
178 while(pool->fQueue.isEmpty()) { | 178 while(pool->fQueue.isEmpty()) { |
179 // Does the client want to stop and are all the threads ready to sto
p? | 179 // Does the client want to stop and are all the threads ready to sto
p? |
180 // If so, we move into the halting state, and whack all the threads
so they notice. | 180 // If so, we move into the halting state, and whack all the threads
so they notice. |
181 if (kWaiting_State == pool->fState && pool->fBusyThreads == 0) { | 181 if (kWaiting_State == pool->fState && pool->fBusyThreads == 0) { |
182 pool->fState = kHalting_State; | 182 pool->fState = kHalting_State; |
183 pool->fReady.broadcast(); | 183 pool->fReady.broadcast(); |
184 } | 184 } |
185 // Any time we find ourselves in the halting state, it's quitting ti
me. | 185 // Any time we find ourselves in the halting state, it's quitting ti
me. |
186 if (kHalting_State == pool->fState) { | 186 if (kHalting_State == pool->fState) { |
187 pool->fReady.unlock(); | 187 pool->fReady.unlock(); |
188 return; | 188 return; |
189 } | 189 } |
190 // wait yields the lock while waiting, but will have it again when a
woken. | 190 // wait yields the lock while waiting, but will have it again when a
woken. |
191 pool->fReady.wait(); | 191 pool->fReady.wait(); |
192 } | 192 } |
193 // We've got the lock back here, no matter if we ran wait or not. | 193 // We've got the lock back here, no matter if we ran wait or not. |
194 | 194 |
195 // The queue is not empty, so we have something to run. Claim it. | 195 // The queue is not empty, so we have something to run. Claim it. |
196 LinkedRunnable* r = pool->fQueue.head(); | 196 LinkedRunnable* r = pool->fQueue.head(); |
197 | 197 |
198 pool->fQueue.remove(r); | 198 pool->fQueue.remove(r); |
199 | 199 |
200 // Having claimed our SkRunnable, we now give up the lock while we run i
t. | 200 // Having claimed our Runnable, we now give up the lock while we run it. |
201 // Otherwise, we'd only ever do work on one thread at a time, which rath
er | 201 // Otherwise, we'd only ever do work on one thread at a time, which rath
er |
202 // defeats the point of this code. | 202 // defeats the point of this code. |
203 pool->fBusyThreads++; | 203 pool->fBusyThreads++; |
204 pool->fReady.unlock(); | 204 pool->fReady.unlock(); |
205 | 205 |
206 // OK, now really do the work. | 206 // OK, now really do the work. |
207 threadLocal.run(r->fRunnable); | 207 threadLocal.run(r->fRunnable); |
208 SkDELETE(r); | 208 SkDELETE(r); |
209 | 209 |
210 // Let everyone know we're not busy. | 210 // Let everyone know we're not busy. |
211 pool->fReady.lock(); | 211 pool->fReady.lock(); |
212 pool->fBusyThreads--; | 212 pool->fBusyThreads--; |
213 pool->fReady.unlock(); | 213 pool->fReady.unlock(); |
214 } | 214 } |
215 | 215 |
216 SkASSERT(false); // Unreachable. The only exit happens when pool->fState is
kHalting_State. | 216 SkASSERT(false); // Unreachable. The only exit happens when pool->fState is
kHalting_State. |
217 } | 217 } |
218 | 218 |
219 typedef SkTThreadPool<void> SkThreadPool; | 219 typedef TThreadPool<void> ThreadPool; |
220 | 220 |
221 #endif | 221 #endif |
OLD | NEW |