OLD | NEW |
1 #include "SkTaskGroup.h" | 1 #include "SkTaskGroup.h" |
2 | 2 |
3 #include "SkCondVar.h" | 3 #include "SkCondVar.h" |
4 #include "SkTDArray.h" | 4 #include "SkTDArray.h" |
5 #include "SkThread.h" | 5 #include "SkThread.h" |
6 #include "SkThreadUtils.h" | 6 #include "SkThreadUtils.h" |
7 | 7 |
8 #if defined(SK_BUILD_FOR_WIN32) | 8 #if defined(SK_BUILD_FOR_WIN32) |
9 static inline int num_cores() { | 9 static inline int num_cores() { |
10 SYSTEM_INFO sysinfo; | 10 SYSTEM_INFO sysinfo; |
11 GetSystemInfo(&sysinfo); | 11 GetSystemInfo(&sysinfo); |
12 return sysinfo.dwNumberOfProcessors; | 12 return sysinfo.dwNumberOfProcessors; |
13 } | 13 } |
14 #else | 14 #else |
15 #include <unistd.h> | 15 #include <unistd.h> |
16 static inline int num_cores() { | 16 static inline int num_cores() { |
17 return (int) sysconf(_SC_NPROCESSORS_ONLN); | 17 return (int) sysconf(_SC_NPROCESSORS_ONLN); |
18 } | 18 } |
19 #endif | 19 #endif |
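An aside: num_cores() predates std::thread. A portable C++11 equivalent, if one were writing this today (a sketch, not part of this change), would be:

    #include <thread>

    static inline int num_cores_cpp11() {
        // hardware_concurrency() is allowed to return 0 when the count is unknown.
        unsigned n = std::thread::hardware_concurrency();
        return n > 0 ? (int)n : 1;
    }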
20 | 20 |
21 namespace { | 21 namespace { |
22 | 22 |
23 class ThreadPool : SkNoncopyable { | 23 class ThreadPool : SkNoncopyable { |
24 public: | 24 public: |
25 static void Add(SkRunnable* task, int32_t* pending) { | 25 static void Add(SkRunnable* task, int32_t* pending) { |
26 if (!gGlobal) { // If we have no threads, run synchronously. | 26 if (!gGlobal) { // If we have no threads, run synchronously. |
27 return task->run(); | 27 return task->run(); |
28 } | 28 } |
29 gGlobal->add(task, pending); | 29 gGlobal->add(&CallRunnable, task, pending); |
| 30 } |
| 31 |
| 32 static void Add(void (*fn)(void*), void* arg, int32_t* pending) { |
| 33 if (!gGlobal) { |
| 34 return fn(arg); |
| 35 } |
| 36 gGlobal->add(fn, arg, pending); |
30 } | 37 } |
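Caller-side view of the two Add() overloads above, going through the public SkTaskGroup API defined at the end of this file (the squaring task is an invented example):

    static void square_in_place(void* arg) {
        int* x = static_cast<int*>(arg);
        *x *= *x;  // runs on a pool thread, or inline when no Enabler is active
    }

    int v = 7;
    SkTaskGroup tg;
    tg.add(&square_in_place, &v);  // new function-pointer path: Add(fn, arg, &fPending)
    tg.wait();                     // after this, v == 49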
31 | 38 |
32 static void Wait(int32_t* pending) { | 39 static void Wait(int32_t* pending) { |
33 if (!gGlobal) { // If we have no threads, the work must already be done. | 40 if (!gGlobal) { // If we have no threads, the work must already be done. |
34 SkASSERT(*pending == 0); | 41 SkASSERT(*pending == 0); |
35 return; | 42 return; |
36 } | 43 } |
37 while (sk_acquire_load(pending) > 0) { // Pairs with sk_atomic_dec here or in Loop. | 44 while (sk_acquire_load(pending) > 0) { // Pairs with sk_atomic_dec here or in Loop. |
38 // Lend a hand until our SkTaskGroup of interest is done. | 45 // Lend a hand until our SkTaskGroup of interest is done. |
39 Work work; | 46 Work work; |
40 { | 47 { |
41 AutoLock lock(&gGlobal->fReady); | 48 AutoLock lock(&gGlobal->fReady); |
42 if (gGlobal->fWork.isEmpty()) { | 49 if (gGlobal->fWork.isEmpty()) { |
43 // Someone has picked up all the work (including ours). How nice of them! | 50 // Someone has picked up all the work (including ours). How nice of them! |
44 // (They may still be working on it, so we can't assert *pending == 0 here.) | 51 // (They may still be working on it, so we can't assert *pending == 0 here.) |
45 continue; | 52 continue; |
46 } | 53 } |
47 gGlobal->fWork.pop(&work); | 54 gGlobal->fWork.pop(&work); |
48 } | 55 } |
49 // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine. | 56 // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine. |
50 // We threads gotta stick together. We're always making forward progress. | 57 // We threads gotta stick together. We're always making forward progress. |
51 work.task->run(); | 58 work.fn(work.arg); |
52 sk_atomic_dec(work.pending); // Release pairs with the sk_acquire_load() just above. | 59 sk_atomic_dec(work.pending); // Release pairs with the sk_acquire_load() just above. |
53 } | 60 } |
54 } | 61 } |
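Worth noting why Wait() runs queued Work instead of just blocking: a task may itself create an SkTaskGroup and wait() on it from a pool thread, and if every pool thread simply slept in wait(), the pool would deadlock. A hypothetical nested use this loop keeps live:

    static void leaf(void*) { /* some small unit of work */ }

    static void parent(void*) {
        SkTaskGroup nested;
        nested.add(&leaf, NULL);
        nested.wait();  // on a pool thread, this pops and runs other queued Work: forward progress
    }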
55 | 62 |
56 private: | 63 private: |
57 struct AutoLock { | 64 struct AutoLock { |
58 AutoLock(SkCondVar* c) : fC(c) { fC->lock(); } | 65 AutoLock(SkCondVar* c) : fC(c) { fC->lock(); } |
59 ~AutoLock() { fC->unlock(); } | 66 ~AutoLock() { fC->unlock(); } |
60 private: | 67 private: |
61 SkCondVar* fC; | 68 SkCondVar* fC; |
62 }; | 69 }; |
63 | 70 |
| 71 static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); } |
| 72 |
64 struct Work { | 73 struct Work { |
65 SkRunnable* task; // A task to ->run(), | 74 void (*fn)(void*); // A function to call, |
66 int32_t* pending; // then sk_atomic_dec(pending) afterwards. | 75 void* arg; // its argument, |
| 76 int32_t* pending; // then sk_atomic_dec(pending) afterwards. |
67 }; | 77 }; |
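The CallRunnable adapter plus the reworked Work struct is classic type erasure: both virtual-dispatch tasks and plain functions collapse into one {function pointer, argument} representation. The pattern in isolation (a standalone sketch, not Skia code):

    struct Runnable {
        virtual void run() = 0;
        virtual ~Runnable() {}
    };

    static void call_runnable(void* arg) {
        static_cast<Runnable*>(arg)->run();  // recover the concrete type, then dispatch
    }

    struct Job {
        void (*fn)(void*);  // carries plain functions directly...
        void* arg;          // ...and Runnable objects via {&call_runnable, runnable}
    };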
68 | 78 |
69 explicit ThreadPool(int threads) : fDraining(false) { | 79 explicit ThreadPool(int threads) : fDraining(false) { |
70 if (threads == -1) { | 80 if (threads == -1) { |
71 threads = num_cores(); | 81 threads = num_cores(); |
72 } | 82 } |
73 for (int i = 0; i < threads; i++) { | 83 for (int i = 0; i < threads; i++) { |
74 fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this))); | 84 fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this))); |
75 fThreads.top()->start(); | 85 fThreads.top()->start(); |
76 } | 86 } |
77 } | 87 } |
78 | 88 |
79 ~ThreadPool() { | 89 ~ThreadPool() { |
80 SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by now. | 90 SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by now. |
81 { | 91 { |
82 AutoLock lock(&fReady); | 92 AutoLock lock(&fReady); |
83 fDraining = true; | 93 fDraining = true; |
84 fReady.broadcast(); | 94 fReady.broadcast(); |
85 } | 95 } |
86 for (int i = 0; i < fThreads.count(); i++) { | 96 for (int i = 0; i < fThreads.count(); i++) { |
87 fThreads[i]->join(); | 97 fThreads[i]->join(); |
88 } | 98 } |
89 SkASSERT(fWork.isEmpty()); // Can't hurt to double check. | 99 SkASSERT(fWork.isEmpty()); // Can't hurt to double check. |
90 fThreads.deleteAll(); | 100 fThreads.deleteAll(); |
91 } | 101 } |
92 | 102 |
93 void add(SkRunnable* task, int32_t* pending) { | 103 void add(void (*fn)(void*), void* arg, int32_t* pending) { |
94 Work work = { task, pending }; | 104 Work work = { fn, arg, pending }; |
95 sk_atomic_inc(pending); // No barrier needed. | 105 sk_atomic_inc(pending); // No barrier needed. |
96 { | 106 { |
97 AutoLock lock(&fReady); | 107 AutoLock lock(&fReady); |
98 fWork.push(work); | 108 fWork.push(work); |
99 fReady.signal(); | 109 fReady.signal(); |
100 } | 110 } |
101 } | 111 } |
102 | 112 |
103 static void Loop(void* arg) { | 113 static void Loop(void* arg) { |
104 ThreadPool* pool = (ThreadPool*)arg; | 114 ThreadPool* pool = (ThreadPool*)arg; |
105 Work work; | 115 Work work; |
106 while (true) { | 116 while (true) { |
107 { | 117 { |
108 AutoLock lock(&pool->fReady); | 118 AutoLock lock(&pool->fReady); |
109 while (pool->fWork.isEmpty()) { | 119 while (pool->fWork.isEmpty()) { |
110 if (pool->fDraining) { | 120 if (pool->fDraining) { |
111 return; | 121 return; |
112 } | 122 } |
113 pool->fReady.wait(); | 123 pool->fReady.wait(); |
114 } | 124 } |
115 pool->fWork.pop(&work); | 125 pool->fWork.pop(&work); |
116 } | 126 } |
117 work.task->run(); | 127 work.fn(work.arg); |
118 sk_atomic_dec(work.pending); // Release pairs with sk_acquire_load() in Wait(). | 128 sk_atomic_dec(work.pending); // Release pairs with sk_acquire_load() in Wait(). |
119 } | 129 } |
120 } | 130 } |
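Loop() is the textbook condition-variable consumer: take the lock, re-check the predicate after every wake, and run the task outside the lock. The same skeleton in portable C++11, using std::condition_variable where this file uses SkCondVar (an analogy, not a drop-in replacement):

    #include <condition_variable>
    #include <deque>
    #include <mutex>

    struct Job { void (*fn)(void*); void* arg; };

    static std::mutex gMutex;
    static std::condition_variable gReady;
    static std::deque<Job> gJobs;
    static bool gDraining = false;

    static void loop() {
        for (;;) {
            Job job;
            {
                std::unique_lock<std::mutex> lock(gMutex);
                while (gJobs.empty()) {      // re-check after every wakeup (spurious wakes happen)
                    if (gDraining) return;   // exit only when idle and draining
                    gReady.wait(lock);
                }
                job = gJobs.front();
                gJobs.pop_front();
            }
            job.fn(job.arg);                 // run outside the lock
        }
    }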
121 | 131 |
122 SkTDArray<Work> fWork; | 132 SkTDArray<Work> fWork; |
123 SkTDArray<SkThread*> fThreads; | 133 SkTDArray<SkThread*> fThreads; |
124 SkCondVar fReady; | 134 SkCondVar fReady; |
125 bool fDraining; | 135 bool fDraining; |
126 | 136 |
127 static ThreadPool* gGlobal; | 137 static ThreadPool* gGlobal; |
128 friend struct SkTaskGroup::Enabler; | 138 friend struct SkTaskGroup::Enabler; |
129 }; | 139 }; |
130 ThreadPool* ThreadPool::gGlobal = NULL; | 140 ThreadPool* ThreadPool::gGlobal = NULL; |
131 | 141 |
132 } // namespace | 142 } // namespace |
133 | 143 |
134 SkTaskGroup::Enabler::Enabler(int threads) { | 144 SkTaskGroup::Enabler::Enabler(int threads) { |
135 SkASSERT(ThreadPool::gGlobal == NULL); | 145 SkASSERT(ThreadPool::gGlobal == NULL); |
136 if (threads != 0) { | 146 if (threads != 0) { |
137 ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads)); | 147 ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads)); |
138 } | 148 } |
139 } | 149 } |
140 | 150 |
141 SkTaskGroup::Enabler::~Enabler() { | 151 SkTaskGroup::Enabler::~Enabler() { |
142 SkDELETE(ThreadPool::gGlobal); | 152 SkDELETE(ThreadPool::gGlobal); |
143 } | 153 } |
144 | 154 |
145 SkTaskGroup::SkTaskGroup() : fPending(0) {} | 155 SkTaskGroup::SkTaskGroup() : fPending(0) {} |
146 | 156 |
147 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); } | 157 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); } |
148 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); } | 158 void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); } |
| 159 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); } |
149 | 160 |
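End to end, a hypothetical harness for the API this file implements (the counting example is invented; sk_atomic_inc is assumed visible via SkThread.h, as used above):

    #include "SkTaskGroup.h"
    #include "SkThread.h"

    static void count_one(void* arg) {
        sk_atomic_inc(static_cast<int32_t*>(arg));  // thread-safe increment
    }

    int main() {
        SkTaskGroup::Enabler enabled(-1);  // -1: one thread per core; 0 would disable the pool
        int32_t hits = 0;
        SkTaskGroup tg;
        for (int i = 0; i < 100; i++) {
            tg.add(&count_one, &hits);
        }
        tg.wait();  // all 100 tasks have completed; hits == 100
        return 0;
    }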