OLD | NEW |
| (Empty) |
1 #include "SkTaskGroup.h" | |
2 | |
3 #include "SkCondVar.h" | |
4 #include "SkTDArray.h" | |
5 #include "SkThread.h" | |
6 #include "SkThreadUtils.h" | |
7 | |
#if defined(SK_BUILD_FOR_WIN32)
// Returns the number of logical processors, per the OS.
static inline int num_cores() {
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    return (int) sysinfo.dwNumberOfProcessors;
}
#else
#include <unistd.h>
// Returns the number of logical processors currently online, per the OS.
static inline int num_cores() {
    // sysconf() returns -1 on failure (and some broken systems report 0).
    // This value sizes the ThreadPool, so clamp it: a failure must not
    // produce a pool with zero (or a negative number of) worker threads.
    const long cores = sysconf(_SC_NPROCESSORS_ONLN);
    return cores < 1 ? 1 : (int) cores;
}
#endif
20 | |
namespace {

// A process-global pool of worker threads shared by every SkTaskGroup.
// Work items are (task, pending-counter) pairs: all groups share one queue,
// and each SkTaskGroup waits on its own counter rather than on the queue.
class ThreadPool : SkNoncopyable {
public:
    // Queue task on the global pool, bumping *pending (the caller's counter).
    static void Add(SkRunnable* task, int32_t* pending) {
        SkASSERT(gGlobal);  // If this fails, see SkTaskGroup::Enabler.
        gGlobal->add(task, pending);
    }

    // Block until *pending drops to 0, helping run queued work in the meantime.
    static void Wait(int32_t* pending) {
        SkASSERT(gGlobal);  // If this fails, see SkTaskGroup::Enabler.
        while (sk_acquire_load(pending) > 0) {  // Pairs with sk_atomic_dec here or in Loop.
            // Lend a hand until our SkTaskGroup of interest is done.
            Work work;
            {
                AutoLock lock(&gGlobal->fReady);
                if (gGlobal->fWork.isEmpty()) {
                    // Someone has picked up all the work (including ours). How nice of them!
                    // (They may still be working on it, so we can't assert *pending == 0 here.)
                    // NOTE(review): this busy-spins while other threads finish the last
                    // tasks; presumably acceptable because that window is short.
                    continue;
                }
                gGlobal->fWork.pop(&work);
            }
            // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
            // We threads gotta stick together. We're always making forward progress.
            work.task->run();
            sk_atomic_dec(work.pending);  // Release pairs with the sk_acquire_load() just above.
        }
    }

private:
    // RAII guard for the mutex inside an SkCondVar.
    struct AutoLock {
        AutoLock(SkCondVar* c) : fC(c) { fC->lock(); }
        ~AutoLock() { fC->unlock(); }
    private:
        SkCondVar* fC;
    };

    // One queued unit of work.
    struct Work {
        SkRunnable* task;  // A task to ->run(),
        int32_t* pending;  // then sk_atomic_dec(pending) afterwards.
    };

    // Spin up the worker threads.  threads == 0 means "one per hardware core".
    explicit ThreadPool(int threads) : fDraining(false) {
        if (threads == 0) {
            threads = num_cores();
        }
        for (int i = 0; i < threads; i++) {
            fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this)));
            fThreads.top()->start();
        }
    }

    // Tell the workers to exit once the queue is empty, then join and free them.
    ~ThreadPool() {
        SkASSERT(fWork.isEmpty());  // All SkTaskGroups should be destroyed by now.
        {
            AutoLock lock(&fReady);
            fDraining = true;  // Loop() reads this under the same lock.
            fReady.broadcast();
        }
        for (int i = 0; i < fThreads.count(); i++) {
            fThreads[i]->join();
        }
        SkASSERT(fWork.isEmpty());  // Can't hurt to double check.
        fThreads.deleteAll();
    }

    // Queue one work item and wake a sleeping worker to run it.
    void add(SkRunnable* task, int32_t* pending) {
        Work work = { task, pending };
        sk_atomic_inc(pending);  // No barrier needed.
        {
            AutoLock lock(&fReady);
            fWork.push(work);
            fReady.signal();
        }
    }

    // Worker thread entry point: run queued work until fDraining is set.
    static void Loop(void* arg) {
        ThreadPool* pool = (ThreadPool*)arg;
        Work work;
        while (true) {
            {
                AutoLock lock(&pool->fReady);
                while (pool->fWork.isEmpty()) {
                    if (pool->fDraining) {
                        return;
                    }
                    pool->fReady.wait();
                }
                pool->fWork.pop(&work);
            }
            // Run outside the lock so other workers can dequeue concurrently.
            work.task->run();
            sk_atomic_dec(work.pending);  // Release pairs with sk_acquire_load() in Wait().
        }
    }

    SkTDArray<Work> fWork;          // Shared queue; guarded by fReady's mutex.
    SkTDArray<SkThread*> fThreads;  // Worker threads; owned (and deleted) by the pool.
    SkCondVar fReady;               // Mutex + condvar protecting fWork and fDraining.
    bool fDraining;                 // True once ~ThreadPool() wants workers to exit.

    static ThreadPool* gGlobal;  // The one global pool; managed by SkTaskGroup::Enabler.
    friend struct SkTaskGroup::Enabler;
};
ThreadPool* ThreadPool::gGlobal = NULL;

}  // namespace
128 | |
// Creates the process-global ThreadPool.  threads == 0 asks for one per core.
SkTaskGroup::Enabler::Enabler(int threads) {
    SkASSERT(ThreadPool::gGlobal == NULL);  // Only one Enabler may be live at a time.
    ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads));
}
133 | |
// Destroys the global ThreadPool: drains the queue and joins all workers.
SkTaskGroup::Enabler::~Enabler() {
    SkASSERT(ThreadPool::gGlobal != NULL);
    SkDELETE(ThreadPool::gGlobal);
}
138 | |
// A new group starts with no outstanding tasks.
SkTaskGroup::SkTaskGroup() : fPending(0) {}

// add() queues task on the global pool; wait() blocks (helping run other work)
// until every task added to *this* group has finished.
void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); }
void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); }
143 | |
OLD | NEW |