OLD | NEW |
| 1 /* |
| 2 * Copyright 2014 Google Inc. |
| 3 * |
| 4 * Use of this source code is governed by a BSD-style license that can be |
| 5 * found in the LICENSE file. |
| 6 */ |
| 7 |
1 #include "SkTaskGroup.h" | 8 #include "SkTaskGroup.h" |
2 | 9 |
3 #include "SkCondVar.h" | 10 #include "SkCondVar.h" |
4 #include "SkRunnable.h" | 11 #include "SkRunnable.h" |
5 #include "SkTDArray.h" | 12 #include "SkTDArray.h" |
6 #include "SkThread.h" | 13 #include "SkThread.h" |
7 #include "SkThreadUtils.h" | 14 #include "SkThreadUtils.h" |
8 | 15 |
#if defined(SK_BUILD_FOR_WIN32)
static inline int num_cores() {
    // Ask the Win32 scheduler how many logical processors this system has.
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    return sysinfo.dwNumberOfProcessors;
}
#else
#include <unistd.h>
static inline int num_cores() {
    // _SC_NPROCESSORS_ONLN reports the processors currently online.
    const long online = sysconf(_SC_NPROCESSORS_ONLN);
    return (int) online;
}
#endif
21 | 28 |
22 namespace { | 29 namespace { |
23 | 30 |
24 class ThreadPool : SkNoncopyable { | 31 class ThreadPool : SkNoncopyable { |
25 public: | 32 public: |
26 static void Add(SkRunnable* task, int32_t* pending) { | 33 static void Add(SkRunnable* task, SkAtomic<int32_t>* pending) { |
27 if (!gGlobal) { // If we have no threads, run synchronously. | 34 if (!gGlobal) { // If we have no threads, run synchronously. |
28 return task->run(); | 35 return task->run(); |
29 } | 36 } |
30 gGlobal->add(&CallRunnable, task, pending); | 37 gGlobal->add(&CallRunnable, task, pending); |
31 } | 38 } |
32 | 39 |
33 static void Add(void (*fn)(void*), void* arg, int32_t* pending) { | 40 static void Add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) { |
34 if (!gGlobal) { | 41 if (!gGlobal) { |
35 return fn(arg); | 42 return fn(arg); |
36 } | 43 } |
37 gGlobal->add(fn, arg, pending); | 44 gGlobal->add(fn, arg, pending); |
38 } | 45 } |
39 | 46 |
40 static void Batch(void (*fn)(void*), void* args, int N, size_t stride, int32
_t* pending) { | 47 static void Batch(void (*fn)(void*), void* args, int N, size_t stride, |
| 48 SkAtomic<int32_t>* pending) { |
41 if (!gGlobal) { | 49 if (!gGlobal) { |
42 for (int i = 0; i < N; i++) { fn((char*)args + i*stride); } | 50 for (int i = 0; i < N; i++) { fn((char*)args + i*stride); } |
43 return; | 51 return; |
44 } | 52 } |
45 gGlobal->batch(fn, args, N, stride, pending); | 53 gGlobal->batch(fn, args, N, stride, pending); |
46 } | 54 } |
47 | 55 |
    // Block until *pending reaches zero, i.e. until every task counted against
    // this SkTaskGroup has finished.  Rather than sleep, the waiting thread
    // helps drain the shared work queue.
    static void Wait(SkAtomic<int32_t>* pending) {
        if (!gGlobal) {  // If we have no threads, the work must already be done.
            SkASSERT(pending->load(sk_memory_order_relaxed) == 0);
            return;
        }
        // Acquire pairs with decrement release here or in Loop.
        while (pending->load(sk_memory_order_acquire) > 0) {
            // Lend a hand until our SkTaskGroup of interest is done.
            Work work;
            {
                AutoLock lock(&gGlobal->fReady);
                if (gGlobal->fWork.isEmpty()) {
                    // Someone has picked up all the work (including ours). How nice of them!
                    // (They may still be working on it, so we can't assert *pending == 0 here.)
                    // 'continue' leaves this scope, so AutoLock unlocks before we re-poll.
                    continue;
                }
                gGlobal->fWork.pop(&work);
            }
            // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
            // We threads gotta stick together. We're always making forward progress.
            work.fn(work.arg);
            work.pending->fetch_add(-1, sk_memory_order_release);  // Pairs with load above.
        }
    }
71 | 80 |
72 private: | 81 private: |
73 struct AutoLock { | 82 struct AutoLock { |
74 AutoLock(SkCondVar* c) : fC(c) { fC->lock(); } | 83 AutoLock(SkCondVar* c) : fC(c) { fC->lock(); } |
75 ~AutoLock() { fC->unlock(); } | 84 ~AutoLock() { fC->unlock(); } |
76 private: | 85 private: |
77 SkCondVar* fC; | 86 SkCondVar* fC; |
78 }; | 87 }; |
79 | 88 |
80 static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run();
} | 89 static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run();
} |
81 | 90 |
    // One queued unit of work.  Field order matters: Work is aggregate-initialized
    // as { fn, arg, pending } by add()/batch().
    struct Work {
        void (*fn)(void*);           // A function to call,
        void* arg;                   // its argument,
        SkAtomic<int32_t>* pending;  // then decrement pending afterwards.
    };
87 | 96 |
88 explicit ThreadPool(int threads) : fDraining(false) { | 97 explicit ThreadPool(int threads) : fDraining(false) { |
89 if (threads == -1) { | 98 if (threads == -1) { |
90 threads = num_cores(); | 99 threads = num_cores(); |
91 } | 100 } |
92 for (int i = 0; i < threads; i++) { | 101 for (int i = 0; i < threads; i++) { |
93 fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this))); | 102 fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this))); |
94 fThreads.top()->start(); | 103 fThreads.top()->start(); |
95 } | 104 } |
96 } | 105 } |
97 | 106 |
    // Shut down: raise fDraining under the lock, wake every worker so it can
    // observe the flag, join them all, then free the thread objects.
    ~ThreadPool() {
        SkASSERT(fWork.isEmpty());  // All SkTaskGroups should be destroyed by now.
        {
            AutoLock lock(&fReady);
            fDraining = true;
            fReady.broadcast();
        }
        for (int i = 0; i < fThreads.count(); i++) {
            fThreads[i]->join();
        }
        SkASSERT(fWork.isEmpty());  // Can't hurt to double check.
        fThreads.deleteAll();
    }
111 | 120 |
112 void add(void (*fn)(void*), void* arg, int32_t* pending) { | 121 void add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) { |
113 Work work = { fn, arg, pending }; | 122 Work work = { fn, arg, pending }; |
114 sk_atomic_inc(pending); // No barrier needed. | 123 pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed. |
115 { | 124 { |
116 AutoLock lock(&fReady); | 125 AutoLock lock(&fReady); |
117 fWork.push(work); | 126 fWork.push(work); |
118 fReady.signal(); | 127 fReady.signal(); |
119 } | 128 } |
120 } | 129 } |
121 | 130 |
122 void batch(void (*fn)(void*), void* arg, int N, size_t stride, int32_t* pend
ing) { | 131 void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic<int3
2_t>* pending) { |
123 sk_atomic_add(pending, N); // No barrier needed. | 132 pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed. |
124 { | 133 { |
125 AutoLock lock(&fReady); | 134 AutoLock lock(&fReady); |
126 Work* batch = fWork.append(N); | 135 Work* batch = fWork.append(N); |
127 for (int i = 0; i < N; i++) { | 136 for (int i = 0; i < N; i++) { |
128 Work work = { fn, (char*)arg + i*stride, pending }; | 137 Work work = { fn, (char*)arg + i*stride, pending }; |
129 batch[i] = work; | 138 batch[i] = work; |
130 } | 139 } |
131 fReady.broadcast(); | 140 fReady.broadcast(); |
132 } | 141 } |
133 } | 142 } |
134 | 143 |
    // Worker-thread entry point: pop and run Work items until the pool drains.
    static void Loop(void* arg) {
        ThreadPool* pool = (ThreadPool*)arg;
        Work work;
        while (true) {
            {
                AutoLock lock(&pool->fReady);
                // Standard condvar idiom: re-check the predicate after each wakeup.
                while (pool->fWork.isEmpty()) {
                    if (pool->fDraining) {
                        return;  // Queue empty and pool shutting down: exit the thread.
                    }
                    pool->fReady.wait();
                }
                pool->fWork.pop(&work);
            }
            // Run outside the lock so other workers can keep pulling work.
            work.fn(work.arg);
            work.pending->fetch_add(-1, sk_memory_order_release);  // Pairs with load in Wait().
        }
    }
153 | 162 |
154 SkTDArray<Work> fWork; | 163 SkTDArray<Work> fWork; |
155 SkTDArray<SkThread*> fThreads; | 164 SkTDArray<SkThread*> fThreads; |
156 SkCondVar fReady; | 165 SkCondVar fReady; |
157 bool fDraining; | 166 bool fDraining; |
158 | 167 |
159 static ThreadPool* gGlobal; | 168 static ThreadPool* gGlobal; |
160 friend struct SkTaskGroup::Enabler; | 169 friend struct SkTaskGroup::Enabler; |
(...skipping 15 matching lines...) Expand all Loading... |
176 | 185 |
// A fresh group starts with no outstanding work.
SkTaskGroup::SkTaskGroup() : fPending(0) {}
178 | 187 |
179 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending
); } | 188 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending
); } |
180 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPe
nding); } | 189 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPe
nding); } |
181 void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &
fPending); } | 190 void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &
fPending); } |
182 void SkTaskGroup::batch (void (*fn)(void*), void* args, int N, size_t stride) { | 191 void SkTaskGroup::batch (void (*fn)(void*), void* args, int N, size_t stride) { |
183 ThreadPool::Batch(fn, args, N, stride, &fPending); | 192 ThreadPool::Batch(fn, args, N, stride, &fPending); |
184 } | 193 } |
185 | 194 |
OLD | NEW |