OLD | NEW |
| (Empty) |
1 #include "SkTaskGroup.h" | |
2 | |
3 #include "SkCondVar.h" | |
4 #include "SkTDArray.h" | |
5 #include "SkThread.h" | |
6 #include "SkThreadUtils.h" | |
7 | |
8 #if defined(SK_BUILD_FOR_WIN32) | |
9 static inline int num_cores() { | |
10 SYSTEM_INFO sysinfo; | |
11 GetSystemInfo(&sysinfo); | |
12 return sysinfo.dwNumberOfProcessors; | |
13 } | |
14 #else | |
15 #include <unistd.h> | |
16 static inline int num_cores() { | |
17 return (int) sysconf(_SC_NPROCESSORS_ONLN); | |
18 } | |
19 #endif | |
20 | |
21 namespace { | |
22 | |
23 class ThreadPool : SkNoncopyable { | |
24 public: | |
25 static void Add(SkRunnable* task, int32_t* pending) { | |
26 if (!gGlobal) { // If we have no threads, run synchronously. | |
27 return task->run(); | |
28 } | |
29 gGlobal->add(&CallRunnable, task, pending); | |
30 } | |
31 | |
32 static void Add(void (*fn)(void*), void* arg, int32_t* pending) { | |
33 if (!gGlobal) { | |
34 return fn(arg); | |
35 } | |
36 gGlobal->add(fn, arg, pending); | |
37 } | |
38 | |
39 static void Wait(int32_t* pending) { | |
40 if (!gGlobal) { // If we have no threads, the work must already be done
. | |
41 SkASSERT(*pending == 0); | |
42 return; | |
43 } | |
44 while (sk_acquire_load(pending) > 0) { // Pairs with sk_atomic_dec here
or in Loop. | |
45 // Lend a hand until our SkTaskGroup of interest is done. | |
46 Work work; | |
47 { | |
48 AutoLock lock(&gGlobal->fReady); | |
49 if (gGlobal->fWork.isEmpty()) { | |
50 // Someone has picked up all the work (including ours). How
nice of them! | |
51 // (They may still be working on it, so we can't assert *pen
ding == 0 here.) | |
52 continue; | |
53 } | |
54 gGlobal->fWork.pop(&work); | |
55 } | |
56 // This Work isn't necessarily part of our SkTaskGroup of interest,
but that's fine. | |
57 // We threads gotta stick together. We're always making forward pro
gress. | |
58 work.fn(work.arg); | |
59 sk_atomic_dec(work.pending); // Release pairs with the sk_acquire_l
oad() just above. | |
60 } | |
61 } | |
62 | |
63 private: | |
64 struct AutoLock { | |
65 AutoLock(SkCondVar* c) : fC(c) { fC->lock(); } | |
66 ~AutoLock() { fC->unlock(); } | |
67 private: | |
68 SkCondVar* fC; | |
69 }; | |
70 | |
71 static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run();
} | |
72 | |
73 struct Work { | |
74 void (*fn)(void*); // A function to call, | |
75 void* arg; // its argument, | |
76 int32_t* pending; // then sk_atomic_dec(pending) afterwards. | |
77 }; | |
78 | |
79 explicit ThreadPool(int threads) : fDraining(false) { | |
80 if (threads == -1) { | |
81 threads = num_cores(); | |
82 } | |
83 for (int i = 0; i < threads; i++) { | |
84 fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this))); | |
85 fThreads.top()->start(); | |
86 } | |
87 } | |
88 | |
89 ~ThreadPool() { | |
90 SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by n
ow. | |
91 { | |
92 AutoLock lock(&fReady); | |
93 fDraining = true; | |
94 fReady.broadcast(); | |
95 } | |
96 for (int i = 0; i < fThreads.count(); i++) { | |
97 fThreads[i]->join(); | |
98 } | |
99 SkASSERT(fWork.isEmpty()); // Can't hurt to double check. | |
100 fThreads.deleteAll(); | |
101 } | |
102 | |
103 void add(void (*fn)(void*), void* arg, int32_t* pending) { | |
104 Work work = { fn, arg, pending }; | |
105 sk_atomic_inc(pending); // No barrier needed. | |
106 { | |
107 AutoLock lock(&fReady); | |
108 fWork.push(work); | |
109 fReady.signal(); | |
110 } | |
111 } | |
112 | |
113 static void Loop(void* arg) { | |
114 ThreadPool* pool = (ThreadPool*)arg; | |
115 Work work; | |
116 while (true) { | |
117 { | |
118 AutoLock lock(&pool->fReady); | |
119 while (pool->fWork.isEmpty()) { | |
120 if (pool->fDraining) { | |
121 return; | |
122 } | |
123 pool->fReady.wait(); | |
124 } | |
125 pool->fWork.pop(&work); | |
126 } | |
127 work.fn(work.arg); | |
128 sk_atomic_dec(work.pending); // Release pairs with sk_acquire_load(
) in Wait(). | |
129 } | |
130 } | |
131 | |
132 SkTDArray<Work> fWork; | |
133 SkTDArray<SkThread*> fThreads; | |
134 SkCondVar fReady; | |
135 bool fDraining; | |
136 | |
137 static ThreadPool* gGlobal; | |
138 friend struct SkTaskGroup::Enabler; | |
139 }; | |
140 ThreadPool* ThreadPool::gGlobal = NULL; | |
141 | |
142 } // namespace | |
143 | |
144 SkTaskGroup::Enabler::Enabler(int threads) { | |
145 SkASSERT(ThreadPool::gGlobal == NULL); | |
146 if (threads != 0) { | |
147 ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads)); | |
148 } | |
149 } | |
150 | |
151 SkTaskGroup::Enabler::~Enabler() { | |
152 SkDELETE(ThreadPool::gGlobal); | |
153 } | |
154 | |
155 SkTaskGroup::SkTaskGroup() : fPending(0) {} | |
156 | |
157 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPe
nding); } | |
158 void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &
fPending); } | |
159 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending
); } | |
160 | |
OLD | NEW |