Chromium Code Reviews

Unified Diff: src/core/SkTaskGroup.cpp

Issue 1193493003: Modernize atomics in SkTaskGroup's threadpool. (Closed) Base URL: https://skia.googlesource.com/skia.git@master
Patch Set: Created 5 years, 6 months ago
+/*
+ * Copyright 2014 Google Inc.
+ *
+ * Use of this source code is governed by a BSD-style license that can be
+ * found in the LICENSE file.
+ */
+
 #include "SkTaskGroup.h"
 
 #include "SkCondVar.h"
 #include "SkRunnable.h"
 #include "SkTDArray.h"
 #include "SkThread.h"
 #include "SkThreadUtils.h"
 
 #if defined(SK_BUILD_FOR_WIN32)
     static inline int num_cores() {
         SYSTEM_INFO sysinfo;
         GetSystemInfo(&sysinfo);
         return sysinfo.dwNumberOfProcessors;
     }
 #else
     #include <unistd.h>
     static inline int num_cores() {
         return (int) sysconf(_SC_NPROCESSORS_ONLN);
     }
 #endif
 
 namespace {
 
 class ThreadPool : SkNoncopyable {
 public:
-    static void Add(SkRunnable* task, int32_t* pending) {
+    static void Add(SkRunnable* task, SkAtomic<int32_t>* pending) {
         if (!gGlobal) {  // If we have no threads, run synchronously.
             return task->run();
         }
         gGlobal->add(&CallRunnable, task, pending);
     }
 
-    static void Add(void (*fn)(void*), void* arg, int32_t* pending) {
+    static void Add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) {
         if (!gGlobal) {
             return fn(arg);
         }
         gGlobal->add(fn, arg, pending);
     }
 
-    static void Batch(void (*fn)(void*), void* args, int N, size_t stride, int32_t* pending) {
+    static void Batch(void (*fn)(void*), void* args, int N, size_t stride,
+                      SkAtomic<int32_t>* pending) {
         if (!gGlobal) {
             for (int i = 0; i < N; i++) { fn((char*)args + i*stride); }
             return;
         }
         gGlobal->batch(fn, args, N, stride, pending);
     }
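
Note: SkAtomic<int32_t> is Skia's portable atomic wrapper, declared in SkAtomics.h rather than in this CL. As a rough mental model only — a hedged sketch assuming it forwards straight to std::atomic, not the real Skia implementation — the subset this file relies on looks something like:

    #include <atomic>
    #include <stdint.h>

    // Hypothetical stand-in names mirroring std::memory_order.
    typedef std::memory_order sk_memory_order;
    const sk_memory_order sk_memory_order_relaxed = std::memory_order_relaxed;
    const sk_memory_order sk_memory_order_acquire = std::memory_order_acquire;
    const sk_memory_order sk_memory_order_release = std::memory_order_release;

    template <typename T>
    class SkAtomic {
    public:
        SkAtomic() {}
        explicit SkAtomic(const T& val) : fVal(val) {}

        T load(sk_memory_order mo) const               { return fVal.load(mo); }
        void store(const T& val, sk_memory_order mo)   { fVal.store(val, mo); }
        T fetch_add(const T& val, sk_memory_order mo)  { return fVal.fetch_add(val, mo); }

    private:
        std::atomic<T> fVal;  // Backing store; the real class may differ per platform.
    };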
 
-    static void Wait(int32_t* pending) {
+    static void Wait(SkAtomic<int32_t>* pending) {
         if (!gGlobal) {  // If we have no threads, the work must already be done.
-            SkASSERT(*pending == 0);
+            SkASSERT(pending->load(sk_memory_order_relaxed) == 0);
             return;
         }
-        while (sk_acquire_load(pending) > 0) {  // Pairs with sk_atomic_dec here or in Loop.
+        // Acquire pairs with decrement release here or in Loop.
+        while (pending->load(sk_memory_order_acquire) > 0) {
             // Lend a hand until our SkTaskGroup of interest is done.
             Work work;
             {
                 AutoLock lock(&gGlobal->fReady);
                 if (gGlobal->fWork.isEmpty()) {
                     // Someone has picked up all the work (including ours).  How nice of them!
                     // (They may still be working on it, so we can't assert *pending == 0 here.)
                     continue;
                 }
                 gGlobal->fWork.pop(&work);
             }
             // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
             // We threads gotta stick together.  We're always making forward progress.
             work.fn(work.arg);
-            sk_atomic_dec(work.pending);  // Release pairs with the sk_acquire_load() just above.
+            work.pending->fetch_add(-1, sk_memory_order_release);  // Pairs with load above.
         }
     }
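
The ordering contract in Wait() is the standard pending-counter handshake: each finished task publishes its side effects with a release decrement, and the waiter's acquire load of the same counter guarantees that once it observes zero, it also observes everything those tasks wrote. A minimal illustration of the same pairing with plain std::atomic (hypothetical example, not Skia code):

    #include <atomic>
    #include <stdint.h>

    std::atomic<int32_t> pending(1);  // One outstanding task.
    int result = 0;                   // Written by the worker, read by the waiter.

    void worker() {
        result = 42;                                       // (A) the task's side effect.
        pending.fetch_add(-1, std::memory_order_release);  // (B) publish: A happens-before B.
    }

    void waiter() {
        // The acquire load pairs with the release decrement: once we read 0,
        // we are guaranteed to also see (A), so reading result is safe.
        while (pending.load(std::memory_order_acquire) > 0) { /* spin, or help like Wait() */ }
        // result == 42 here.
    }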
 
 private:
     struct AutoLock {
         AutoLock(SkCondVar* c) : fC(c) { fC->lock(); }
         ~AutoLock() { fC->unlock(); }
     private:
         SkCondVar* fC;
     };
 
     static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); }
 
     struct Work {
         void (*fn)(void*);  // A function to call,
         void* arg;          // its argument,
-        int32_t* pending;   // then sk_atomic_dec(pending) afterwards.
+        SkAtomic<int32_t>* pending;  // then decrement pending afterwards.
     };
 
     explicit ThreadPool(int threads) : fDraining(false) {
         if (threads == -1) {
             threads = num_cores();
         }
         for (int i = 0; i < threads; i++) {
             fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this)));
             fThreads.top()->start();
         }
     }
 
     ~ThreadPool() {
         SkASSERT(fWork.isEmpty());  // All SkTaskGroups should be destroyed by now.
         {
             AutoLock lock(&fReady);
             fDraining = true;
             fReady.broadcast();
         }
         for (int i = 0; i < fThreads.count(); i++) {
             fThreads[i]->join();
         }
         SkASSERT(fWork.isEmpty());  // Can't hurt to double check.
         fThreads.deleteAll();
     }
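
The destructor's shutdown sequence matters: fDraining must be flipped under the same lock the workers wait on, or a worker could test the flag, then block in wait() and miss the broadcast forever. The same handshake sketched with standard primitives (hypothetical names, not Skia code):

    #include <condition_variable>
    #include <mutex>
    #include <thread>
    #include <vector>

    std::mutex m;
    std::condition_variable cv;
    bool draining = false;
    std::vector<std::thread> threads;

    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(m);
            draining = true;   // Set under the lock workers wait on.
            cv.notify_all();   // Like fReady.broadcast().
        }
        for (std::thread& t : threads) {
            t.join();          // Like fThreads[i]->join().
        }
    }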
 
-    void add(void (*fn)(void*), void* arg, int32_t* pending) {
+    void add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) {
         Work work = { fn, arg, pending };
-        sk_atomic_inc(pending);  // No barrier needed.
+        pending->fetch_add(+1, sk_memory_order_relaxed);  // No barrier needed.
         {
             AutoLock lock(&fReady);
             fWork.push(work);
             fReady.signal();
         }
     }
 
-    void batch(void (*fn)(void*), void* arg, int N, size_t stride, int32_t* pending) {
-        sk_atomic_add(pending, N);  // No barrier needed.
+    void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic<int32_t>* pending) {
+        pending->fetch_add(+N, sk_memory_order_relaxed);  // No barrier needed.
         {
             AutoLock lock(&fReady);
             Work* batch = fWork.append(N);
             for (int i = 0; i < N; i++) {
                 Work work = { fn, (char*)arg + i*stride, pending };
                 batch[i] = work;
             }
             fReady.broadcast();
         }
     }
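
Why can the increments in add() and batch() stay relaxed while the decrements need release? The counter only hands ownership back to the waiter on the way down; on the way up, the SkCondVar lock that publishes the Work already orders the increment before any worker can pop (and later decrement for) that Work. A sketch of that reasoning with standard primitives (hypothetical names, not Skia code):

    #include <atomic>
    #include <stdint.h>
    #include <mutex>
    #include <vector>

    struct Work { void (*fn)(void*); void* arg; };

    std::atomic<int32_t> pending(0);
    std::mutex queueMutex;
    std::vector<Work> queue;

    void enqueue(const Work& work) {
        pending.fetch_add(1, std::memory_order_relaxed);  // (1) Bump the counter; no barrier.
        std::lock_guard<std::mutex> lock(queueMutex);     // (2) Take the queue lock.
        queue.push_back(work);                            // (3) Publish the work item.
        // Unlocking (2) is a release; a worker must acquire the same mutex to pop
        // this Work, so it cannot observe the Work without also observing (1).
    }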
 
     static void Loop(void* arg) {
         ThreadPool* pool = (ThreadPool*)arg;
         Work work;
         while (true) {
             {
                 AutoLock lock(&pool->fReady);
                 while (pool->fWork.isEmpty()) {
                     if (pool->fDraining) {
                         return;
                     }
                     pool->fReady.wait();
                 }
                 pool->fWork.pop(&work);
             }
             work.fn(work.arg);
-            sk_atomic_dec(work.pending);  // Release pairs with sk_acquire_load() in Wait().
+            work.pending->fetch_add(-1, sk_memory_order_release);  // Pairs with load in Wait().
         }
     }
 
     SkTDArray<Work> fWork;
     SkTDArray<SkThread*> fThreads;
     SkCondVar fReady;
     bool fDraining;
 
     static ThreadPool* gGlobal;
     friend struct SkTaskGroup::Enabler;
(...skipping 15 matching lines...)
 
 SkTaskGroup::SkTaskGroup() : fPending(0) {}
 
 void SkTaskGroup::wait()                            { ThreadPool::Wait(&fPending); }
 void SkTaskGroup::add(SkRunnable* task)             { ThreadPool::Add(task, &fPending); }
 void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); }
 void SkTaskGroup::batch(void (*fn)(void*), void* args, int N, size_t stride) {
     ThreadPool::Batch(fn, args, N, stride, &fPending);
 }
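
For context, the public SkTaskGroup API is untouched by this CL; only the type of fPending changes (see src/core/SkTaskGroup.h in this issue). A typical call site looks roughly like this — a hedged example assuming an SkTaskGroup::Enabler (the friend struct above) has been constructed somewhere to create the global pool:

    #include "SkTaskGroup.h"

    static void square(void* arg) {
        int* x = static_cast<int*>(arg);
        *x *= *x;
    }

    void example() {
        int values[8] = { 1, 2, 3, 4, 5, 6, 7, 8 };

        SkTaskGroup tg;
        tg.batch(square, values, 8, sizeof(int));  // One task per element, shared counter.
        tg.wait();  // Blocks (helping run queued work) until all 8 tasks finish.
    }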