| OLD | NEW |
| 1 #include "SkTaskGroup.h" | 1 #include "SkTaskGroup.h" |
| 2 | 2 |
| 3 #include "SkCondVar.h" | 3 #include "SkCondVar.h" |
| 4 #include "SkTDArray.h" | 4 #include "SkTDArray.h" |
| 5 #include "SkThread.h" | 5 #include "SkThread.h" |
| 6 #include "SkThreadUtils.h" | 6 #include "SkThreadUtils.h" |
| 7 | 7 |
| 8 #if defined(SK_BUILD_FOR_WIN32) | 8 #if defined(SK_BUILD_FOR_WIN32) |
| 9 static inline int num_cores() { | 9 static inline int num_cores() { |
| 10 SYSTEM_INFO sysinfo; | 10 SYSTEM_INFO sysinfo; |
| 11 GetSystemInfo(&sysinfo); | 11 GetSystemInfo(&sysinfo); |
| 12 return sysinfo.dwNumberOfProcessors; | 12 return sysinfo.dwNumberOfProcessors; |
| 13 } | 13 } |
| 14 #else | 14 #else |
| 15 #include <unistd.h> | 15 #include <unistd.h> |
| 16 static inline int num_cores() { | 16 static inline int num_cores() { |
| 17 return (int) sysconf(_SC_NPROCESSORS_ONLN); | 17 return (int) sysconf(_SC_NPROCESSORS_ONLN); |
| 18 } | 18 } |
| 19 #endif | 19 #endif |
| 20 | 20 |
| 21 namespace { | 21 namespace { |
| 22 | 22 |
| 23 class ThreadPool : SkNoncopyable { | 23 class ThreadPool : SkNoncopyable { |
| 24 public: | 24 public: |
| 25 static void Add(SkRunnable* task, int32_t* pending) { | 25 static void Add(SkRunnable* task, int32_t* pending) { |
| 26 if (!gGlobal) { // If we have no threads, run synchronously. | 26 if (!gGlobal) { // If we have no threads, run synchronously. |
| 27 return task->run(); | 27 return task->run(); |
| 28 } | 28 } |
| 29 gGlobal->add(task, pending); | 29 gGlobal->add(&CallRunnable, task, pending); |
| 30 } |
| 31 |
| 32 static void Add(void (*fn)(void*), void* arg, int32_t* pending) { |
| 33 if (!gGlobal) { |
| 34 return fn(arg); |
| 35 } |
| 36 gGlobal->add(fn, arg, pending); |
| 30 } | 37 } |
| 31 | 38 |
| 32 static void Wait(int32_t* pending) { | 39 static void Wait(int32_t* pending) { |
| 33 if (!gGlobal) { // If we have no threads, the work must already be done
. | 40 if (!gGlobal) { // If we have no threads, the work must already be done
. |
| 34 SkASSERT(*pending == 0); | 41 SkASSERT(*pending == 0); |
| 35 return; | 42 return; |
| 36 } | 43 } |
| 37 while (sk_acquire_load(pending) > 0) { // Pairs with sk_atomic_dec here
or in Loop. | 44 while (sk_acquire_load(pending) > 0) { // Pairs with sk_atomic_dec here
or in Loop. |
| 38 // Lend a hand until our SkTaskGroup of interest is done. | 45 // Lend a hand until our SkTaskGroup of interest is done. |
| 39 Work work; | 46 Work work; |
| 40 { | 47 { |
| 41 AutoLock lock(&gGlobal->fReady); | 48 AutoLock lock(&gGlobal->fReady); |
| 42 if (gGlobal->fWork.isEmpty()) { | 49 if (gGlobal->fWork.isEmpty()) { |
| 43 // Someone has picked up all the work (including ours). How
nice of them! | 50 // Someone has picked up all the work (including ours). How
nice of them! |
| 44 // (They may still be working on it, so we can't assert *pen
ding == 0 here.) | 51 // (They may still be working on it, so we can't assert *pen
ding == 0 here.) |
| 45 continue; | 52 continue; |
| 46 } | 53 } |
| 47 gGlobal->fWork.pop(&work); | 54 gGlobal->fWork.pop(&work); |
| 48 } | 55 } |
| 49 // This Work isn't necessarily part of our SkTaskGroup of interest,
but that's fine. | 56 // This Work isn't necessarily part of our SkTaskGroup of interest,
but that's fine. |
| 50 // We threads gotta stick together. We're always making forward pro
gress. | 57 // We threads gotta stick together. We're always making forward pro
gress. |
| 51 work.task->run(); | 58 work.fn(work.arg); |
| 52 sk_atomic_dec(work.pending); // Release pairs with the sk_acquire_l
oad() just above. | 59 sk_atomic_dec(work.pending); // Release pairs with the sk_acquire_l
oad() just above. |
| 53 } | 60 } |
| 54 } | 61 } |
| 55 | 62 |
| 56 private: | 63 private: |
| 57 struct AutoLock { | 64 struct AutoLock { |
| 58 AutoLock(SkCondVar* c) : fC(c) { fC->lock(); } | 65 AutoLock(SkCondVar* c) : fC(c) { fC->lock(); } |
| 59 ~AutoLock() { fC->unlock(); } | 66 ~AutoLock() { fC->unlock(); } |
| 60 private: | 67 private: |
| 61 SkCondVar* fC; | 68 SkCondVar* fC; |
| 62 }; | 69 }; |
| 63 | 70 |
| 71 static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run();
} |
| 72 |
| 64 struct Work { | 73 struct Work { |
| 65 SkRunnable* task; // A task to ->run(), | 74 void (*fn)(void*); // A function to call, |
| 66 int32_t* pending; // then sk_atomic_dec(pending) afterwards. | 75 void* arg; // its argument, |
| 76 int32_t* pending; // then sk_atomic_dec(pending) afterwards. |
| 67 }; | 77 }; |
| 68 | 78 |
| 69 explicit ThreadPool(int threads) : fDraining(false) { | 79 explicit ThreadPool(int threads) : fDraining(false) { |
| 70 if (threads == -1) { | 80 if (threads == -1) { |
| 71 threads = num_cores(); | 81 threads = num_cores(); |
| 72 } | 82 } |
| 73 for (int i = 0; i < threads; i++) { | 83 for (int i = 0; i < threads; i++) { |
| 74 fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this))); | 84 fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this))); |
| 75 fThreads.top()->start(); | 85 fThreads.top()->start(); |
| 76 } | 86 } |
| 77 } | 87 } |
| 78 | 88 |
| 79 ~ThreadPool() { | 89 ~ThreadPool() { |
| 80 SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by n
ow. | 90 SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by n
ow. |
| 81 { | 91 { |
| 82 AutoLock lock(&fReady); | 92 AutoLock lock(&fReady); |
| 83 fDraining = true; | 93 fDraining = true; |
| 84 fReady.broadcast(); | 94 fReady.broadcast(); |
| 85 } | 95 } |
| 86 for (int i = 0; i < fThreads.count(); i++) { | 96 for (int i = 0; i < fThreads.count(); i++) { |
| 87 fThreads[i]->join(); | 97 fThreads[i]->join(); |
| 88 } | 98 } |
| 89 SkASSERT(fWork.isEmpty()); // Can't hurt to double check. | 99 SkASSERT(fWork.isEmpty()); // Can't hurt to double check. |
| 90 fThreads.deleteAll(); | 100 fThreads.deleteAll(); |
| 91 } | 101 } |
| 92 | 102 |
| 93 void add(SkRunnable* task, int32_t* pending) { | 103 void add(void (*fn)(void*), void* arg, int32_t* pending) { |
| 94 Work work = { task, pending }; | 104 Work work = { fn, arg, pending }; |
| 95 sk_atomic_inc(pending); // No barrier needed. | 105 sk_atomic_inc(pending); // No barrier needed. |
| 96 { | 106 { |
| 97 AutoLock lock(&fReady); | 107 AutoLock lock(&fReady); |
| 98 fWork.push(work); | 108 fWork.push(work); |
| 99 fReady.signal(); | 109 fReady.signal(); |
| 100 } | 110 } |
| 101 } | 111 } |
| 102 | 112 |
| 103 static void Loop(void* arg) { | 113 static void Loop(void* arg) { |
| 104 ThreadPool* pool = (ThreadPool*)arg; | 114 ThreadPool* pool = (ThreadPool*)arg; |
| 105 Work work; | 115 Work work; |
| 106 while (true) { | 116 while (true) { |
| 107 { | 117 { |
| 108 AutoLock lock(&pool->fReady); | 118 AutoLock lock(&pool->fReady); |
| 109 while (pool->fWork.isEmpty()) { | 119 while (pool->fWork.isEmpty()) { |
| 110 if (pool->fDraining) { | 120 if (pool->fDraining) { |
| 111 return; | 121 return; |
| 112 } | 122 } |
| 113 pool->fReady.wait(); | 123 pool->fReady.wait(); |
| 114 } | 124 } |
| 115 pool->fWork.pop(&work); | 125 pool->fWork.pop(&work); |
| 116 } | 126 } |
| 117 work.task->run(); | 127 work.fn(work.arg); |
| 118 sk_atomic_dec(work.pending); // Release pairs with sk_acquire_load(
) in Wait(). | 128 sk_atomic_dec(work.pending); // Release pairs with sk_acquire_load(
) in Wait(). |
| 119 } | 129 } |
| 120 } | 130 } |
| 121 | 131 |
| 122 SkTDArray<Work> fWork; | 132 SkTDArray<Work> fWork; |
| 123 SkTDArray<SkThread*> fThreads; | 133 SkTDArray<SkThread*> fThreads; |
| 124 SkCondVar fReady; | 134 SkCondVar fReady; |
| 125 bool fDraining; | 135 bool fDraining; |
| 126 | 136 |
| 127 static ThreadPool* gGlobal; | 137 static ThreadPool* gGlobal; |
| 128 friend struct SkTaskGroup::Enabler; | 138 friend struct SkTaskGroup::Enabler; |
| 129 }; | 139 }; |
| 130 ThreadPool* ThreadPool::gGlobal = NULL; | 140 ThreadPool* ThreadPool::gGlobal = NULL; |
| 131 | 141 |
| 132 } // namespace | 142 } // namespace |
| 133 | 143 |
| 134 SkTaskGroup::Enabler::Enabler(int threads) { | 144 SkTaskGroup::Enabler::Enabler(int threads) { |
| 135 SkASSERT(ThreadPool::gGlobal == NULL); | 145 SkASSERT(ThreadPool::gGlobal == NULL); |
| 136 if (threads != 0) { | 146 if (threads != 0) { |
| 137 ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads)); | 147 ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads)); |
| 138 } | 148 } |
| 139 } | 149 } |
| 140 | 150 |
| 141 SkTaskGroup::Enabler::~Enabler() { | 151 SkTaskGroup::Enabler::~Enabler() { |
| 142 SkDELETE(ThreadPool::gGlobal); | 152 SkDELETE(ThreadPool::gGlobal); |
| 143 } | 153 } |
| 144 | 154 |
| 145 SkTaskGroup::SkTaskGroup() : fPending(0) {} | 155 SkTaskGroup::SkTaskGroup() : fPending(0) {} |
| 146 | 156 |
| 147 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); } | 157 void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPe
nding); } |
| 148 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); } | 158 void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &
fPending); } |
| 159 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending
); } |
| 149 | 160 |
| OLD | NEW |