OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2012 Google Inc. | 2 * Copyright 2012 Google Inc. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license that can be | 4 * Use of this source code is governed by a BSD-style license that can be |
5 * found in the LICENSE file. | 5 * found in the LICENSE file. |
6 */ | 6 */ |
7 | 7 |
8 #include "SkRunnable.h" | 8 #include "SkRunnable.h" |
9 #include "SkThreadPool.h" | 9 #include "SkThreadPool.h" |
10 #include "SkThreadUtils.h" | 10 #include "SkThreadUtils.h" |
(...skipping 10 matching lines...) Expand all Loading... |
21 GetSystemInfo(&sysinfo); | 21 GetSystemInfo(&sysinfo); |
22 return sysinfo.dwNumberOfProcessors; | 22 return sysinfo.dwNumberOfProcessors; |
23 #elif defined(SK_BUILD_FOR_UNIX) || defined(SK_BUILD_FOR_MAC) || defined(SK_BUIL
D_FOR_ANDROID) | 23 #elif defined(SK_BUILD_FOR_UNIX) || defined(SK_BUILD_FOR_MAC) || defined(SK_BUIL
D_FOR_ANDROID) |
24 return sysconf(_SC_NPROCESSORS_ONLN); | 24 return sysconf(_SC_NPROCESSORS_ONLN); |
25 #else | 25 #else |
26 return 1; | 26 return 1; |
27 #endif | 27 #endif |
28 } | 28 } |
29 | 29 |
30 SkThreadPool::SkThreadPool(int count) | 30 SkThreadPool::SkThreadPool(int count) |
31 : fDone(false) { | 31 : fState(kRunning_State), fBusyThreads(0) { |
32 if (count < 0) count = num_cores(); | 32 if (count < 0) count = num_cores(); |
33 // Create count threads, all running SkThreadPool::Loop. | 33 // Create count threads, all running SkThreadPool::Loop. |
34 for (int i = 0; i < count; i++) { | 34 for (int i = 0; i < count; i++) { |
35 SkThread* thread = SkNEW_ARGS(SkThread, (&SkThreadPool::Loop, this)); | 35 SkThread* thread = SkNEW_ARGS(SkThread, (&SkThreadPool::Loop, this)); |
36 *fThreads.append() = thread; | 36 *fThreads.append() = thread; |
37 thread->start(); | 37 thread->start(); |
38 } | 38 } |
39 } | 39 } |
40 | 40 |
41 SkThreadPool::~SkThreadPool() { | 41 SkThreadPool::~SkThreadPool() { |
42 if (!fDone) { | 42 if (kRunning_State == fState) { |
43 this->wait(); | 43 this->wait(); |
44 } | 44 } |
45 } | 45 } |
46 | 46 |
47 void SkThreadPool::wait() { | 47 void SkThreadPool::wait() { |
48 fReady.lock(); | 48 fReady.lock(); |
49 fDone = true; | 49 fState = kWaiting_State; |
50 fReady.broadcast(); | 50 fReady.broadcast(); |
51 fReady.unlock(); | 51 fReady.unlock(); |
52 | 52 |
53 // Wait for all threads to stop. | 53 // Wait for all threads to stop. |
54 for (int i = 0; i < fThreads.count(); i++) { | 54 for (int i = 0; i < fThreads.count(); i++) { |
55 fThreads[i]->join(); | 55 fThreads[i]->join(); |
56 SkDELETE(fThreads[i]); | 56 SkDELETE(fThreads[i]); |
57 } | 57 } |
| 58 SkASSERT(fQueue.isEmpty()); |
58 } | 59 } |
59 | 60 |
60 /*static*/ void SkThreadPool::Loop(void* arg) { | 61 /*static*/ void SkThreadPool::Loop(void* arg) { |
61 // The SkThreadPool passes itself as arg to each thread as they're created. | 62 // The SkThreadPool passes itself as arg to each thread as they're created. |
62 SkThreadPool* pool = static_cast<SkThreadPool*>(arg); | 63 SkThreadPool* pool = static_cast<SkThreadPool*>(arg); |
63 | 64 |
64 while (true) { | 65 while (true) { |
65 // We have to be holding the lock to read the queue and to call wait. | 66 // We have to be holding the lock to read the queue and to call wait. |
66 pool->fReady.lock(); | 67 pool->fReady.lock(); |
67 while(pool->fQueue.isEmpty()) { | 68 while(pool->fQueue.isEmpty()) { |
68 // Is it time to die? | 69 // Does the client want to stop and are all the threads ready to sto
p? |
69 if (pool->fDone) { | 70 // If so, we move into the halting state, and whack all the threads
so they notice. |
| 71 if (kWaiting_State == pool->fState && pool->fBusyThreads == 0) { |
| 72 pool->fState = kHalting_State; |
| 73 pool->fReady.broadcast(); |
| 74 } |
| 75 // Any time we find ourselves in the halting state, it's quitting ti
me. |
| 76 if (kHalting_State == pool->fState) { |
70 pool->fReady.unlock(); | 77 pool->fReady.unlock(); |
71 return; | 78 return; |
72 } | 79 } |
73 // wait yields the lock while waiting, but will have it again when a
woken. | 80 // wait yields the lock while waiting, but will have it again when a
woken. |
74 pool->fReady.wait(); | 81 pool->fReady.wait(); |
75 } | 82 } |
76 // We've got the lock back here, no matter if we ran wait or not. | 83 // We've got the lock back here, no matter if we ran wait or not. |
77 | 84 |
78 // The queue is not empty, so we have something to run. Claim it. | 85 // The queue is not empty, so we have something to run. Claim it. |
79 LinkedRunnable* r = pool->fQueue.tail(); | 86 LinkedRunnable* r = pool->fQueue.tail(); |
80 | 87 |
81 pool->fQueue.remove(r); | 88 pool->fQueue.remove(r); |
82 | 89 |
83 // Having claimed our SkRunnable, we now give up the lock while we run i
t. | 90 // Having claimed our SkRunnable, we now give up the lock while we run i
t. |
84 // Otherwise, we'd only ever do work on one thread at a time, which rath
er | 91 // Otherwise, we'd only ever do work on one thread at a time, which rath
er |
85 // defeats the point of this code. | 92 // defeats the point of this code. |
| 93 pool->fBusyThreads++; |
86 pool->fReady.unlock(); | 94 pool->fReady.unlock(); |
87 | 95 |
88 // OK, now really do the work. | 96 // OK, now really do the work. |
89 r->fRunnable->run(); | 97 r->fRunnable->run(); |
90 SkDELETE(r); | 98 SkDELETE(r); |
| 99 |
| 100 // Let everyone know we're not busy. |
| 101 pool->fReady.lock(); |
| 102 pool->fBusyThreads--; |
| 103 pool->fReady.unlock(); |
91 } | 104 } |
92 | 105 |
93 SkASSERT(false); // Unreachable. The only exit happens when pool->fDone. | 106 SkASSERT(false); // Unreachable. The only exit happens when pool->fState is
kHalting_State. |
94 } | 107 } |
95 | 108 |
96 void SkThreadPool::add(SkRunnable* r) { | 109 void SkThreadPool::add(SkRunnable* r) { |
97 if (NULL == r) { | 110 if (NULL == r) { |
98 return; | 111 return; |
99 } | 112 } |
100 | 113 |
101 // If we don't have any threads, obligingly just run the thing now. | 114 // If we don't have any threads, obligingly just run the thing now. |
102 if (fThreads.isEmpty()) { | 115 if (fThreads.isEmpty()) { |
103 return r->run(); | 116 return r->run(); |
104 } | 117 } |
105 | 118 |
106 // We have some threads. Queue it up! | 119 // We have some threads. Queue it up! |
107 fReady.lock(); | 120 fReady.lock(); |
108 SkASSERT(!fDone); // We shouldn't be adding work to a pool that's shut down
. | 121 SkASSERT(fState != kHalting_State); // Shouldn't be able to add work when w
e're halting. |
109 LinkedRunnable* linkedRunnable = SkNEW(LinkedRunnable); | 122 LinkedRunnable* linkedRunnable = SkNEW(LinkedRunnable); |
110 linkedRunnable->fRunnable = r; | 123 linkedRunnable->fRunnable = r; |
111 fQueue.addToHead(linkedRunnable); | 124 fQueue.addToHead(linkedRunnable); |
112 fReady.signal(); | 125 fReady.signal(); |
113 fReady.unlock(); | 126 fReady.unlock(); |
114 } | 127 } |
OLD | NEW |