OLD | NEW |
| (Empty) |
1 /* | |
2 * Copyright 2012 Google Inc. | |
3 * | |
4 * Use of this source code is governed by a BSD-style license that can be | |
5 * found in the LICENSE file. | |
6 */ | |
7 | |
8 #ifndef SkThreadPool_DEFINED | |
9 #define SkThreadPool_DEFINED | |
10 | |
11 #include "SkCondVar.h" | |
12 #include "SkRunnable.h" | |
13 #include "SkTDArray.h" | |
14 #include "SkTInternalLList.h" | |
15 #include "SkThreadUtils.h" | |
16 #include "SkTypes.h" | |
17 | |
18 #if defined(SK_BUILD_FOR_UNIX) || defined(SK_BUILD_FOR_MAC) || defined(SK_BUILD_
FOR_ANDROID) | |
19 # include <unistd.h> | |
20 #endif | |
21 | |
22 // Returns the number of cores on this machine. | |
23 static inline int num_cores() { | |
24 #if defined(SK_BUILD_FOR_WIN32) | |
25 SYSTEM_INFO sysinfo; | |
26 GetSystemInfo(&sysinfo); | |
27 return sysinfo.dwNumberOfProcessors; | |
28 #elif defined(SK_BUILD_FOR_UNIX) || defined(SK_BUILD_FOR_MAC) || defined(SK_BUIL
D_FOR_ANDROID) | |
29 return (int) sysconf(_SC_NPROCESSORS_ONLN); | |
30 #else | |
31 return 1; | |
32 #endif | |
33 } | |
34 | |
35 template <typename T> | |
36 class SkTThreadPool { | |
37 public: | |
38 /** | |
39 * Create a threadpool with count threads, or one thread per core if kThread
PerCore. | |
40 */ | |
41 static const int kThreadPerCore = -1; | |
42 explicit SkTThreadPool(int count); | |
43 ~SkTThreadPool(); | |
44 | |
45 /** | |
46 * Queues up an SkRunnable to run when a thread is available, or synchronous
ly if count is 0. | |
47 * Does not take ownership. NULL is a safe no-op. If T is not void, the run
nable will be passed | |
48 * a reference to a T on the thread's local stack. | |
49 */ | |
50 void add(SkTRunnable<T>*); | |
51 | |
52 /** | |
53 * Same as add, but adds the runnable as the very next to run rather than en
queueing it. | |
54 */ | |
55 void addNext(SkTRunnable<T>*); | |
56 | |
57 /** | |
58 * Block until all added SkRunnables have completed. Once called, calling a
dd() is undefined. | |
59 */ | |
60 void wait(); | |
61 | |
62 private: | |
63 struct LinkedRunnable { | |
64 SkTRunnable<T>* fRunnable; // Unowned. | |
65 SK_DECLARE_INTERNAL_LLIST_INTERFACE(LinkedRunnable); | |
66 }; | |
67 | |
68 enum State { | |
69 kRunning_State, // Normal case. We've been constructed and no one has
called wait(). | |
70 kWaiting_State, // wait has been called, but there still might be work
to do or being done. | |
71 kHalting_State, // There's no work to do and no thread is busy. All th
reads can shut down. | |
72 }; | |
73 | |
74 void addSomewhere(SkTRunnable<T>* r, | |
75 void (SkTInternalLList<LinkedRunnable>::*)(LinkedRunnable*
)); | |
76 | |
77 SkTInternalLList<LinkedRunnable> fQueue; | |
78 SkCondVar fReady; | |
79 SkTDArray<SkThread*> fThreads; | |
80 State fState; | |
81 int fBusyThreads; | |
82 | |
83 static void Loop(void*); // Static because we pass in this. | |
84 }; | |
85 | |
86 template <typename T> | |
87 SkTThreadPool<T>::SkTThreadPool(int count) : fState(kRunning_State), fBusyThread
s(0) { | |
88 if (count < 0) { | |
89 count = num_cores(); | |
90 } | |
91 // Create count threads, all running SkTThreadPool::Loop. | |
92 for (int i = 0; i < count; i++) { | |
93 SkThread* thread = SkNEW_ARGS(SkThread, (&SkTThreadPool::Loop, this)); | |
94 *fThreads.append() = thread; | |
95 thread->start(); | |
96 } | |
97 } | |
98 | |
99 template <typename T> | |
100 SkTThreadPool<T>::~SkTThreadPool() { | |
101 if (kRunning_State == fState) { | |
102 this->wait(); | |
103 } | |
104 } | |
105 | |
106 namespace SkThreadPoolPrivate { | |
107 | |
108 template <typename T> | |
109 struct ThreadLocal { | |
110 void run(SkTRunnable<T>* r) { r->run(data); } | |
111 T data; | |
112 }; | |
113 | |
114 template <> | |
115 struct ThreadLocal<void> { | |
116 void run(SkTRunnable<void>* r) { r->run(); } | |
117 }; | |
118 | |
119 } // namespace SkThreadPoolPrivate | |
120 | |
121 template <typename T> | |
122 void SkTThreadPool<T>::addSomewhere(SkTRunnable<T>* r, | |
123 void (SkTInternalLList<LinkedRunnable>::* f)
(LinkedRunnable*)) { | |
124 if (r == NULL) { | |
125 return; | |
126 } | |
127 | |
128 if (fThreads.isEmpty()) { | |
129 SkThreadPoolPrivate::ThreadLocal<T> threadLocal; | |
130 threadLocal.run(r); | |
131 return; | |
132 } | |
133 | |
134 LinkedRunnable* linkedRunnable = SkNEW(LinkedRunnable); | |
135 linkedRunnable->fRunnable = r; | |
136 fReady.lock(); | |
137 SkASSERT(fState != kHalting_State); // Shouldn't be able to add work when w
e're halting. | |
138 (fQueue.*f)(linkedRunnable); | |
139 fReady.signal(); | |
140 fReady.unlock(); | |
141 } | |
142 | |
143 template <typename T> | |
144 void SkTThreadPool<T>::add(SkTRunnable<T>* r) { | |
145 this->addSomewhere(r, &SkTInternalLList<LinkedRunnable>::addToTail); | |
146 } | |
147 | |
148 template <typename T> | |
149 void SkTThreadPool<T>::addNext(SkTRunnable<T>* r) { | |
150 this->addSomewhere(r, &SkTInternalLList<LinkedRunnable>::addToHead); | |
151 } | |
152 | |
153 | |
154 template <typename T> | |
155 void SkTThreadPool<T>::wait() { | |
156 fReady.lock(); | |
157 fState = kWaiting_State; | |
158 fReady.broadcast(); | |
159 fReady.unlock(); | |
160 | |
161 // Wait for all threads to stop. | |
162 for (int i = 0; i < fThreads.count(); i++) { | |
163 fThreads[i]->join(); | |
164 SkDELETE(fThreads[i]); | |
165 } | |
166 SkASSERT(fQueue.isEmpty()); | |
167 } | |
168 | |
169 template <typename T> | |
170 /*static*/ void SkTThreadPool<T>::Loop(void* arg) { | |
171 // The SkTThreadPool passes itself as arg to each thread as they're created. | |
172 SkTThreadPool<T>* pool = static_cast<SkTThreadPool<T>*>(arg); | |
173 SkThreadPoolPrivate::ThreadLocal<T> threadLocal; | |
174 | |
175 while (true) { | |
176 // We have to be holding the lock to read the queue and to call wait. | |
177 pool->fReady.lock(); | |
178 while(pool->fQueue.isEmpty()) { | |
179 // Does the client want to stop and are all the threads ready to sto
p? | |
180 // If so, we move into the halting state, and whack all the threads
so they notice. | |
181 if (kWaiting_State == pool->fState && pool->fBusyThreads == 0) { | |
182 pool->fState = kHalting_State; | |
183 pool->fReady.broadcast(); | |
184 } | |
185 // Any time we find ourselves in the halting state, it's quitting ti
me. | |
186 if (kHalting_State == pool->fState) { | |
187 pool->fReady.unlock(); | |
188 return; | |
189 } | |
190 // wait yields the lock while waiting, but will have it again when a
woken. | |
191 pool->fReady.wait(); | |
192 } | |
193 // We've got the lock back here, no matter if we ran wait or not. | |
194 | |
195 // The queue is not empty, so we have something to run. Claim it. | |
196 LinkedRunnable* r = pool->fQueue.head(); | |
197 | |
198 pool->fQueue.remove(r); | |
199 | |
200 // Having claimed our SkRunnable, we now give up the lock while we run i
t. | |
201 // Otherwise, we'd only ever do work on one thread at a time, which rath
er | |
202 // defeats the point of this code. | |
203 pool->fBusyThreads++; | |
204 pool->fReady.unlock(); | |
205 | |
206 // OK, now really do the work. | |
207 threadLocal.run(r->fRunnable); | |
208 SkDELETE(r); | |
209 | |
210 // Let everyone know we're not busy. | |
211 pool->fReady.lock(); | |
212 pool->fBusyThreads--; | |
213 pool->fReady.unlock(); | |
214 } | |
215 | |
216 SkASSERT(false); // Unreachable. The only exit happens when pool->fState is
kHalting_State. | |
217 } | |
218 | |
219 typedef SkTThreadPool<void> SkThreadPool; | |
220 | |
221 #endif | |
OLD | NEW |