| OLD | NEW |
| (Empty) |
| 1 #include "SkTaskGroup.h" | |
| 2 | |
| 3 #include "SkCondVar.h" | |
| 4 #include "SkTDArray.h" | |
| 5 #include "SkThread.h" | |
| 6 #include "SkThreadUtils.h" | |
| 7 | |
| 8 #if defined(SK_BUILD_FOR_WIN32) | |
| 9 static inline int num_cores() { | |
| 10 SYSTEM_INFO sysinfo; | |
| 11 GetSystemInfo(&sysinfo); | |
| 12 return sysinfo.dwNumberOfProcessors; | |
| 13 } | |
| 14 #else | |
| 15 #include <unistd.h> | |
| 16 static inline int num_cores() { | |
| 17 return (int) sysconf(_SC_NPROCESSORS_ONLN); | |
| 18 } | |
| 19 #endif | |
| 20 | |
| 21 namespace { | |
| 22 | |
| 23 class ThreadPool : SkNoncopyable { | |
| 24 public: | |
| 25 static void Add(SkRunnable* task, int32_t* pending) { | |
| 26 SkASSERT(gGlobal); // If this fails, see SkTaskGroup::Enabler. | |
| 27 gGlobal->add(task, pending); | |
| 28 } | |
| 29 | |
| 30 static void Wait(int32_t* pending) { | |
| 31 SkASSERT(gGlobal); // If this fails, see SkTaskGroup::Enabler. | |
| 32 while (sk_acquire_load(pending) > 0) { // Pairs with sk_atomic_dec here
or in Loop. | |
| 33 // Lend a hand until our SkTaskGroup of interest is done. | |
| 34 Work work; | |
| 35 { | |
| 36 AutoLock lock(&gGlobal->fReady); | |
| 37 if (gGlobal->fWork.isEmpty()) { | |
| 38 // Someone has picked up all the work (including ours). How
nice of them! | |
| 39 // (They may still be working on it, so we can't assert *pen
ding == 0 here.) | |
| 40 continue; | |
| 41 } | |
| 42 gGlobal->fWork.pop(&work); | |
| 43 } | |
| 44 // This Work isn't necessarily part of our SkTaskGroup of interest,
but that's fine. | |
| 45 // We threads gotta stick together. We're always making forward pro
gress. | |
| 46 work.task->run(); | |
| 47 sk_atomic_dec(work.pending); // Release pairs with the sk_acquire_l
oad() just above. | |
| 48 } | |
| 49 } | |
| 50 | |
| 51 private: | |
| 52 struct AutoLock { | |
| 53 AutoLock(SkCondVar* c) : fC(c) { fC->lock(); } | |
| 54 ~AutoLock() { fC->unlock(); } | |
| 55 private: | |
| 56 SkCondVar* fC; | |
| 57 }; | |
| 58 | |
| 59 struct Work { | |
| 60 SkRunnable* task; // A task to ->run(), | |
| 61 int32_t* pending; // then sk_atomic_dec(pending) afterwards. | |
| 62 }; | |
| 63 | |
| 64 explicit ThreadPool(int threads) : fDraining(false) { | |
| 65 if (threads == 0) { | |
| 66 threads = num_cores(); | |
| 67 } | |
| 68 for (int i = 0; i < threads; i++) { | |
| 69 fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this))); | |
| 70 fThreads.top()->start(); | |
| 71 } | |
| 72 } | |
| 73 | |
| 74 ~ThreadPool() { | |
| 75 SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by n
ow. | |
| 76 { | |
| 77 AutoLock lock(&fReady); | |
| 78 fDraining = true; | |
| 79 fReady.broadcast(); | |
| 80 } | |
| 81 for (int i = 0; i < fThreads.count(); i++) { | |
| 82 fThreads[i]->join(); | |
| 83 } | |
| 84 SkASSERT(fWork.isEmpty()); // Can't hurt to double check. | |
| 85 fThreads.deleteAll(); | |
| 86 } | |
| 87 | |
| 88 void add(SkRunnable* task, int32_t* pending) { | |
| 89 Work work = { task, pending }; | |
| 90 sk_atomic_inc(pending); // No barrier needed. | |
| 91 { | |
| 92 AutoLock lock(&fReady); | |
| 93 fWork.push(work); | |
| 94 fReady.signal(); | |
| 95 } | |
| 96 } | |
| 97 | |
| 98 static void Loop(void* arg) { | |
| 99 ThreadPool* pool = (ThreadPool*)arg; | |
| 100 Work work; | |
| 101 while (true) { | |
| 102 { | |
| 103 AutoLock lock(&pool->fReady); | |
| 104 while (pool->fWork.isEmpty()) { | |
| 105 if (pool->fDraining) { | |
| 106 return; | |
| 107 } | |
| 108 pool->fReady.wait(); | |
| 109 } | |
| 110 pool->fWork.pop(&work); | |
| 111 } | |
| 112 work.task->run(); | |
| 113 sk_atomic_dec(work.pending); // Release pairs with sk_acquire_load(
) in Wait(). | |
| 114 } | |
| 115 } | |
| 116 | |
| 117 SkTDArray<Work> fWork; | |
| 118 SkTDArray<SkThread*> fThreads; | |
| 119 SkCondVar fReady; | |
| 120 bool fDraining; | |
| 121 | |
| 122 static ThreadPool* gGlobal; | |
| 123 friend struct SkTaskGroup::Enabler; | |
| 124 }; | |
| 125 ThreadPool* ThreadPool::gGlobal = NULL; | |
| 126 | |
| 127 } // namespace | |
| 128 | |
| 129 SkTaskGroup::Enabler::Enabler(int threads) { | |
| 130 SkASSERT(ThreadPool::gGlobal == NULL); | |
| 131 ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads)); | |
| 132 } | |
| 133 | |
| 134 SkTaskGroup::Enabler::~Enabler() { | |
| 135 SkASSERT(ThreadPool::gGlobal != NULL); | |
| 136 SkDELETE(ThreadPool::gGlobal); | |
| 137 } | |
| 138 | |
| 139 SkTaskGroup::SkTaskGroup() : fPending(0) {} | |
| 140 | |
| 141 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); } | |
| 142 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); } | |
| 143 | |
| OLD | NEW |