/*
 * Copyright 2012 Google Inc.
 *
 * Use of this source code is governed by a BSD-style license that can be
 * found in the LICENSE file.
 */

#ifndef SkThreadPool_DEFINED
#define SkThreadPool_DEFINED

#include "SkCondVar.h"
#include "SkRunnable.h"
#include "SkTDArray.h"
#include "SkTInternalLList.h"
#include "SkThreadUtils.h"
#include "SkTypes.h"

#if defined(SK_BUILD_FOR_WIN32)
#  include <windows.h>   // SYSTEM_INFO and GetSystemInfo() for num_cores(); SkTypes.h may already pull this in.
#elif defined(SK_BUILD_FOR_UNIX) || defined(SK_BUILD_FOR_MAC) || defined(SK_BUILD_FOR_ANDROID)
#  include <unistd.h>    // sysconf() for num_cores().
#endif

// Returns the number of cores on this machine.
static inline int num_cores() {
#if defined(SK_BUILD_FOR_WIN32)
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    return sysinfo.dwNumberOfProcessors;
#elif defined(SK_BUILD_FOR_UNIX) || defined(SK_BUILD_FOR_MAC) || defined(SK_BUILD_FOR_ANDROID)
    return (int) sysconf(_SC_NPROCESSORS_ONLN);
#else
    return 1;
#endif
}

template <typename T>
class SkTThreadPool {
public:
    /**
     * Create a threadpool with count threads, or one thread per core if kThreadPerCore.
     */
    static const int kThreadPerCore = -1;
    explicit SkTThreadPool(int count);
    ~SkTThreadPool();

    /**
     * Queues up an SkRunnable to run when a thread is available, or synchronously if count is 0.
     * Does not take ownership. NULL is a safe no-op. If T is not void, the runnable will be
     * passed a reference to a T on the thread's local stack.
     */
    void add(SkTRunnable<T>*);

    /**
     * Same as add, but inserts the runnable at the front of the queue so it runs next, rather
     * than at the back.
     */
    void addNext(SkTRunnable<T>*);

    /**
     * Block until all added SkRunnables have completed. Once called, calling add() is undefined.
     */
    void wait();

private:
    struct LinkedRunnable {
        SkTRunnable<T>* fRunnable;  // Unowned.
        SK_DECLARE_INTERNAL_LLIST_INTERFACE(LinkedRunnable);
    };

    enum State {
        kRunning_State,  // Normal case. We've been constructed and no one has called wait().
        kWaiting_State,  // wait has been called, but there still might be work to do or being done.
        kHalting_State,  // There's no work to do and no thread is busy. All threads can shut down.
    };

    void addSomewhere(SkTRunnable<T>* r,
                      void (SkTInternalLList<LinkedRunnable>::*)(LinkedRunnable*));

    SkTInternalLList<LinkedRunnable> fQueue;
    SkCondVar fReady;
    SkTDArray<SkThread*> fThreads;
    State fState;
    int fBusyThreads;

    static void Loop(void*);  // Static because we pass in this.
};
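
// A minimal usage sketch (illustrative only, not part of this header). It assumes SkTRunnable<T>
// from SkRunnable.h declares "virtual void run(T&)" for non-void T:
//
//     struct CountHits : SkTRunnable<int> {
//         virtual void run(int& perThreadCounter) { perThreadCounter++; }
//     };
//
//     SkTThreadPool<int> pool(SkTThreadPool<int>::kThreadPerCore);  // One thread per core.
//     CountHits counter;
//     for (int i = 0; i < 100; i++) {
//         pool.add(&counter);  // The pool never takes ownership; counter must outlive wait().
//     }
//     pool.wait();  // Each worker thread tallied its runs in its own stack-local int.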

template <typename T>
SkTThreadPool<T>::SkTThreadPool(int count) : fState(kRunning_State), fBusyThreads(0) {
    if (count < 0) {
        count = num_cores();
    }
    // Create count threads, all running SkTThreadPool::Loop.
    for (int i = 0; i < count; i++) {
        SkThread* thread = SkNEW_ARGS(SkThread, (&SkTThreadPool::Loop, this));
        *fThreads.append() = thread;
        thread->start();
    }
}

template <typename T>
SkTThreadPool<T>::~SkTThreadPool() {
    if (kRunning_State == fState) {
        this->wait();
    }
}

namespace SkThreadPoolPrivate {

// Per-thread storage handed to each runnable: each worker thread (and the synchronous path in
// addSomewhere) keeps a ThreadLocal<T> on its stack and passes data by reference to every runnable
// it runs. The void specialization carries no data, matching SkTRunnable<void>::run().
template <typename T>
struct ThreadLocal {
    void run(SkTRunnable<T>* r) { r->run(data); }
    T data;
};

template <>
struct ThreadLocal<void> {
    void run(SkTRunnable<void>* r) { r->run(); }
};

}  // namespace SkThreadPoolPrivate

template <typename T>
void SkTThreadPool<T>::addSomewhere(SkTRunnable<T>* r,
                                    void (SkTInternalLList<LinkedRunnable>::* f)(LinkedRunnable*)) {
    if (r == NULL) {
        return;
    }

    if (fThreads.isEmpty()) {
        SkThreadPoolPrivate::ThreadLocal<T> threadLocal;
        threadLocal.run(r);
        return;
    }

    LinkedRunnable* linkedRunnable = SkNEW(LinkedRunnable);
    linkedRunnable->fRunnable = r;
    fReady.lock();
    SkASSERT(fState != kHalting_State);  // Shouldn't be able to add work when we're halting.
    (fQueue.*f)(linkedRunnable);
    fReady.signal();
    fReady.unlock();
}

template <typename T>
void SkTThreadPool<T>::add(SkTRunnable<T>* r) {
    this->addSomewhere(r, &SkTInternalLList<LinkedRunnable>::addToTail);
}

template <typename T>
void SkTThreadPool<T>::addNext(SkTRunnable<T>* r) {
    this->addSomewhere(r, &SkTInternalLList<LinkedRunnable>::addToHead);
}

template <typename T>
void SkTThreadPool<T>::wait() {
    fReady.lock();
    fState = kWaiting_State;
    fReady.broadcast();
    fReady.unlock();

    // Wait for all threads to stop.
    for (int i = 0; i < fThreads.count(); i++) {
        fThreads[i]->join();
        SkDELETE(fThreads[i]);
    }
    SkASSERT(fQueue.isEmpty());
}

template <typename T>
/*static*/ void SkTThreadPool<T>::Loop(void* arg) {
    // The SkTThreadPool passes itself as arg to each thread as they're created.
    SkTThreadPool<T>* pool = static_cast<SkTThreadPool<T>*>(arg);
    SkThreadPoolPrivate::ThreadLocal<T> threadLocal;

    while (true) {
        // We have to be holding the lock to read the queue and to call wait.
        pool->fReady.lock();
        while (pool->fQueue.isEmpty()) {
            // Does the client want to stop and are all the threads ready to stop?
            // If so, we move into the halting state, and whack all the threads so they notice.
            if (kWaiting_State == pool->fState && pool->fBusyThreads == 0) {
                pool->fState = kHalting_State;
                pool->fReady.broadcast();
            }
            // Any time we find ourselves in the halting state, it's quitting time.
            if (kHalting_State == pool->fState) {
                pool->fReady.unlock();
                return;
            }
            // wait yields the lock while waiting, but will have it again when awoken.
            pool->fReady.wait();
        }
        // We've got the lock back here, no matter if we ran wait or not.

        // The queue is not empty, so we have something to run. Claim it.
        LinkedRunnable* r = pool->fQueue.head();

        pool->fQueue.remove(r);

        // Having claimed our SkRunnable, we now give up the lock while we run it.
        // Otherwise, we'd only ever do work on one thread at a time, which rather
        // defeats the point of this code.
        pool->fBusyThreads++;
        pool->fReady.unlock();

        // OK, now really do the work.
        threadLocal.run(r->fRunnable);
        SkDELETE(r);

        // Let everyone know we're not busy.
        pool->fReady.lock();
        pool->fBusyThreads--;
        pool->fReady.unlock();
    }

    SkASSERT(false);  // Unreachable. The only exit happens when pool->fState is kHalting_State.
}

typedef SkTThreadPool<void> SkThreadPool;
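
// The void specialization is the common case. A hedged usage sketch, assuming SkRunnable
// (from SkRunnable.h) declares "virtual void run()" and SkDebugf comes from SkTypes.h:
//
//     struct HelloWork : SkRunnable {
//         virtual void run() { SkDebugf("hello from a worker thread\n"); }
//     };
//
//     SkThreadPool pool(2);  // Two workers; a count of 0 would run each task synchronously in add().
//     HelloWork hello;
//     pool.add(&hello);      // NULL would be a safe no-op; the pool does not own the runnable.
//     pool.wait();           // After wait(), further calls to add() are undefined.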

#endif