| OLD | NEW | 
|---|---|
| 1 /* | 1 /* | 
| 2  * Copyright 2014 Google Inc. | 2  * Copyright 2014 Google Inc. | 
| 3  * | 3  * | 
| 4  * Use of this source code is governed by a BSD-style license that can be | 4  * Use of this source code is governed by a BSD-style license that can be | 
| 5  * found in the LICENSE file. | 5  * found in the LICENSE file. | 
| 6  */ | 6  */ | 
| 7 | 7 | 
|  | 8 #include "SkRunnable.h" | 
|  | 9 #include "SkSemaphore.h" | 
|  | 10 #include "SkSpinlock.h" | 
|  | 11 #include "SkTDArray.h" | 
| 8 #include "SkTaskGroup.h" | 12 #include "SkTaskGroup.h" | 
| 9 |  | 
| 10 #include "SkCondVar.h" |  | 
| 11 #include "SkRunnable.h" |  | 
| 12 #include "SkTDArray.h" |  | 
| 13 #include "SkThread.h" |  | 
| 14 #include "SkThreadUtils.h" | 13 #include "SkThreadUtils.h" | 
| 15 | 14 | 
| 16 #if defined(SK_BUILD_FOR_WIN32) | 15 #if defined(SK_BUILD_FOR_WIN32) | 
| 17     static inline int num_cores() { | 16     static inline int num_cores() { | 
| 18         SYSTEM_INFO sysinfo; | 17         SYSTEM_INFO sysinfo; | 
| 19         GetSystemInfo(&sysinfo); | 18         GetSystemInfo(&sysinfo); | 
| 20         return sysinfo.dwNumberOfProcessors; | 19         return sysinfo.dwNumberOfProcessors; | 
| 21     } | 20     } | 
| 22 #else | 21 #else | 
| 23     #include <unistd.h> | 22     #include <unistd.h> | 
| (...skipping 32 matching lines...) | |
| 56     static void Wait(SkAtomic<int32_t>* pending) { | 55     static void Wait(SkAtomic<int32_t>* pending) { | 
| 57         if (!gGlobal) {  // If we have no threads, the work must already be done. | 56         if (!gGlobal) {  // If we have no threads, the work must already be done. | 
| 58             SkASSERT(pending->load(sk_memory_order_relaxed) == 0); | 57             SkASSERT(pending->load(sk_memory_order_relaxed) == 0); | 
| 59             return; | 58             return; | 
| 60         } | 59         } | 
| 61         // Acquire pairs with decrement release here or in Loop. | 60         // Acquire pairs with decrement release here or in Loop. | 
| 62         while (pending->load(sk_memory_order_acquire) > 0) { | 61         while (pending->load(sk_memory_order_acquire) > 0) { | 
| 63             // Lend a hand until our SkTaskGroup of interest is done. | 62             // Lend a hand until our SkTaskGroup of interest is done. | 
| 64             Work work; | 63             Work work; | 
| 65             { | 64             { | 
| 66                 AutoLock lock(&gGlobal->fReady); | 65                 // We're stealing work opportunistically, | 
|  | 66                 // so we never call fWorkAvailable.wait(), which could sleep us if there's no work. | 
|  | 67                 // This means fWorkAvailable is only an upper bound on fWork.count(). | 
|  | 68                 AutoLock lock(&gGlobal->fWorkLock); | 
| 67                 if (gGlobal->fWork.isEmpty()) { | 69                 if (gGlobal->fWork.isEmpty()) { | 
| 68                     // Someone has picked up all the work (including ours).  How nice of them! | 70                     // Someone has picked up all the work (including ours).  How nice of them! | 
| 69                     // (They may still be working on it, so we can't assert *pending == 0 here.) | 71                     // (They may still be working on it, so we can't assert *pending == 0 here.) | 
| 70                     continue; | 72                     continue; | 
| 71                 } | 73                 } | 
| 72                 gGlobal->fWork.pop(&work); | 74                 gGlobal->fWork.pop(&work); | 
| 73             } | 75             } | 
| 74             // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine. | 76             // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine. | 
| 75             // We threads gotta stick together.  We're always making forward progress. | 77             // We threads gotta stick together.  We're always making forward progress. | 
| 76             work.fn(work.arg); | 78             work.fn(work.arg); | 
| 77             work.pending->fetch_add(-1, sk_memory_order_release);  // Pairs with load above. | 79             work.pending->fetch_add(-1, sk_memory_order_release);  // Pairs with load above. | 
| 78         } | 80         } | 
| 79     } | 81     } | 
| 80 | 82 | 
| 81 private: | 83 private: | 
| 82     struct AutoLock { | 84     struct AutoLock { | 
| 83         AutoLock(SkCondVar* c) : fC(c) { fC->lock(); } | 85         AutoLock(SkSpinlock* lock) : fLock(lock) { fLock->acquire(); } | 
| 84         ~AutoLock() { fC->unlock(); } | 86         ~AutoLock() { fLock->release(); } | 
| 85     private: | 87     private: | 
| 86         SkCondVar* fC; | 88         SkSpinlock* fLock; | 
| 87     }; | 89     }; | 
| 88 | 90 | 
| 89     static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); } | 91     static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); } | 
| 90 | 92 | 
| 91     struct Work { | 93     struct Work { | 
| 92         void (*fn)(void*);            // A function to call, | 94         void (*fn)(void*);            // A function to call, | 
| 93         void* arg;                    // its argument, | 95         void* arg;                    // its argument, | 
| 94         SkAtomic<int32_t>* pending;   // then decrement pending afterwards. | 96         SkAtomic<int32_t>* pending;   // then decrement pending afterwards. | 
| 95     }; | 97     }; | 
| 96 | 98 | 
| 97     explicit ThreadPool(int threads) : fDraining(false) { | 99     explicit ThreadPool(int threads) { | 
| 98         if (threads == -1) { | 100         if (threads == -1) { | 
| 99             threads = num_cores(); | 101             threads = num_cores(); | 
| 100         } | 102         } | 
| 101         for (int i = 0; i < threads; i++) { | 103         for (int i = 0; i < threads; i++) { | 
| 102             fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this))); | 104             fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this))); | 
| 103             fThreads.top()->start(); | 105             fThreads.top()->start(); | 
| 104         } | 106         } | 
| 105     } | 107     } | 
| 106 | 108 | 
| 107     ~ThreadPool() { | 109     ~ThreadPool() { | 
| 108         SkASSERT(fWork.isEmpty());  // All SkTaskGroups should be destroyed by now. | 110         SkASSERT(fWork.isEmpty());  // All SkTaskGroups should be destroyed by now. | 
| 109         { | 111 | 
| 110             AutoLock lock(&fReady); | 112         // Send a poison pill to each thread. | 
| 111             fDraining = true; | 113         SkAtomic<int32_t> dummy(0); | 
| 112             fReady.broadcast(); | 114         for (int i = 0; i < fThreads.count(); i++) { | 
|  | 115             this->add(NULL, NULL, &dummy); | 
| 113         } | 116         } | 
|  | 117         // Wait for them all to swallow the pill and die. | 
| 114         for (int i = 0; i < fThreads.count(); i++) { | 118         for (int i = 0; i < fThreads.count(); i++) { | 
| 115             fThreads[i]->join(); | 119             fThreads[i]->join(); | 
| 116         } | 120         } | 
| 117         SkASSERT(fWork.isEmpty());  // Can't hurt to double check. | 121         SkASSERT(fWork.isEmpty());  // Can't hurt to double check. | 
| 118         fThreads.deleteAll(); | 122         fThreads.deleteAll(); | 
| 119     } | 123     } | 
| 120 | 124 | 
| 121     void add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) { | 125     void add(void (*fn)(void*), void* arg, SkAtomic<int32_t>* pending) { | 
| 122         Work work = { fn, arg, pending }; | 126         Work work = { fn, arg, pending }; | 
| 123         pending->fetch_add(+1, sk_memory_order_relaxed);  // No barrier needed. | 127         pending->fetch_add(+1, sk_memory_order_relaxed);  // No barrier needed. | 
| 124         { | 128         { | 
| 125             AutoLock lock(&fReady); | 129             AutoLock lock(&fWorkLock); | 
| 126             fWork.push(work); | 130             fWork.push(work); | 
| 127             fReady.signal(); |  | 
| 128         } | 131         } | 
|  | 132         fWorkAvailable.signal(1); | 
| 129     } | 133     } | 
| 130 | 134 | 
| 131     void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic<int32_t>* pending) { | 135     void batch(void (*fn)(void*), void* arg, int N, size_t stride, SkAtomic<int32_t>* pending) { | 
| 133         { | 137         { | 
| 134             AutoLock lock(&fReady); | 138             AutoLock lock(&fWorkLock); | 
| 135             Work* batch = fWork.append(N); | 139             Work* batch = fWork.append(N); | 
| 136             for (int i = 0; i < N; i++) { | 140             for (int i = 0; i < N; i++) { | 
| 137                 Work work = { fn, (char*)arg + i*stride, pending }; | 141                 Work work = { fn, (char*)arg + i*stride, pending }; | 
| 138                 batch[i] = work; | 142                 batch[i] = work; | 
| 139             } | 143             } | 
| 140             fReady.broadcast(); |  | 
| 141         } | 144         } | 
|  | 145         fWorkAvailable.signal(N); | 
| 142     } | 146     } | 
| 143 | 147 | 
| 144     static void Loop(void* arg) { | 148     static void Loop(void* arg) { | 
| 145         ThreadPool* pool = (ThreadPool*)arg; | 149         ThreadPool* pool = (ThreadPool*)arg; | 
| 146         Work work; | 150         Work work; | 
| 147         while (true) { | 151         while (true) { | 
|  | 152             // Sleep until there's work available, and claim one unit of Work as we wake. | 
| 148             { | 154             { | 
| 149                 AutoLock lock(&pool->fReady); | 155                 AutoLock lock(&pool->fWorkLock); | 
| 150                 while (pool->fWork.isEmpty()) { | 156                 if (pool->fWork.isEmpty()) { | 
| 151                     if (pool->fDraining) { | 157                     // Someone in Wait() stole our work (fWorkAvailable is an upper bound). | 
| 152                         return; | 158                     // Well, that's fine, back to sleep for us. | 
| 153                     } | 159                     continue; | 
| 154                     pool->fReady.wait(); |  | 
| 155                 } | 160                 } | 
| 156                 pool->fWork.pop(&work); | 161                 pool->fWork.pop(&work); | 
| 157             } | 162             } | 
|  | 163             if (!work.fn) { | 
|  | 164                 return;  // Poison pill.  Time... to die. | 
|  | 165             } | 
| 158             work.fn(work.arg); | 166             work.fn(work.arg); | 
| 159             work.pending->fetch_add(-1, sk_memory_order_release);  // Pairs with load in Wait(). | 167             work.pending->fetch_add(-1, sk_memory_order_release);  // Pairs with load in Wait(). | 
| 160         } | 168         } | 
| 161     } | 169     } | 
| 162 | 170 | 
| 163     SkTDArray<Work>      fWork; | 171     // fWorkLock must be held when reading or modifying fWork. | 
|  | 172     SkSpinlock      fWorkLock; | 
|  | 173     SkTDArray<Work> fWork; | 
|  | 174 | 
|  | 175     // A thread-safe upper bound for fWork.count(). | 
|  | 176     // | 
|  | 177     // We'd have it be an exact count but for the loop in Wait(): | 
|  | 178     // we never want that to block, so it can't call fWorkAvailable.wait(), | 
|  | 179     // and that's the only way to decrement fWorkAvailable. | 
|  | 180     // So fWorkAvailable may overcount the actual work available. | 
|  | 181     // We make do, but this means some worker threads may wake spuriously. | 
|  | 182     SkSemaphore fWorkAvailable; | 
|  | 183 | 
|  | 184     // These are only changed in a single-threaded context. | 
| 164     SkTDArray<SkThread*> fThreads; | 185     SkTDArray<SkThread*> fThreads; | 
| 165     SkCondVar            fReady; | 186     static ThreadPool* gGlobal; | 
| 166     bool                 fDraining; |  | 
| 167 | 187 | 
| 168     static ThreadPool* gGlobal; |  | 
| 169     friend struct SkTaskGroup::Enabler; | 188     friend struct SkTaskGroup::Enabler; | 
| 170 }; | 189 }; | 
| 171 ThreadPool* ThreadPool::gGlobal = NULL; | 190 ThreadPool* ThreadPool::gGlobal = NULL; | 
| 172 | 191 | 
| 173 }  // namespace | 192 }  // namespace | 
| 174 | 193 | 
| 175 SkTaskGroup::Enabler::Enabler(int threads) { | 194 SkTaskGroup::Enabler::Enabler(int threads) { | 
| 176     SkASSERT(ThreadPool::gGlobal == NULL); | 195     SkASSERT(ThreadPool::gGlobal == NULL); | 
| 177     if (threads != 0 && SkCondVar::Supported()) { | 196     if (threads != 0) { | 
| 178         ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads)); | 197         ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads)); | 
| 179     } | 198     } | 
| 180 } | 199 } | 
| 181 | 200 | 
| 182 SkTaskGroup::Enabler::~Enabler() { | 201 SkTaskGroup::Enabler::~Enabler() { | 
| 183     SkDELETE(ThreadPool::gGlobal); | 202     SkDELETE(ThreadPool::gGlobal); | 
| 184 } | 203 } | 
| 185 | 204 | 
| 186 SkTaskGroup::SkTaskGroup() : fPending(0) {} | 205 SkTaskGroup::SkTaskGroup() : fPending(0) {} | 
| 187 | 206 | 
| 188 void SkTaskGroup::wait()                            { ThreadPool::Wait(&fPending); } | 207 void SkTaskGroup::wait()                            { ThreadPool::Wait(&fPending); } | 
| 189 void SkTaskGroup::add(SkRunnable* task)             { ThreadPool::Add(task, &fPending); } | 208 void SkTaskGroup::add(SkRunnable* task)             { ThreadPool::Add(task, &fPending); } | 
| 190 void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); } | 209 void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); } | 
| 191 void SkTaskGroup::batch (void (*fn)(void*), void* args, int N, size_t stride) { | 210 void SkTaskGroup::batch (void (*fn)(void*), void* args, int N, size_t stride) { | 
| 192     ThreadPool::Batch(fn, args, N, stride, &fPending); | 211     ThreadPool::Batch(fn, args, N, stride, &fPending); | 
| 193 } | 212 } | 
| 194 | 213 | 
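
The core pattern landing in this diff: a spinlock guards the work list, a semaphore counts an upper bound on available work (an upper bound only, because `Wait()` steals items without ever decrementing it), and shutdown uses poison pills in place of the old `fDraining` flag under `SkCondVar`. Below is a minimal self-contained sketch of that same pattern using C++20 standard primitives (`std::counting_semaphore`, `std::mutex`, `std::thread`) as stand-ins for `SkSemaphore`, `SkSpinlock`, and `SkThread`; `MiniPool` and every other name in it are hypothetical, for illustration only, not Skia API:

```cpp
// Sketch only: std::mutex stands in for SkSpinlock, std::counting_semaphore
// for SkSemaphore, std::thread for SkThread.  Hypothetical names throughout.
#include <atomic>
#include <cstdint>
#include <deque>
#include <mutex>
#include <semaphore>
#include <thread>
#include <vector>

class MiniPool {
public:
    explicit MiniPool(int threads) {
        for (int i = 0; i < threads; i++) {
            fThreads.emplace_back([this] { this->loop(); });
        }
    }

    ~MiniPool() {
        // Poison pill: one Work with a null fn per thread tells it to exit.
        // (Runs only after all wait() callers are done, so they never eat a pill.)
        std::atomic<int32_t> dummy{0};
        for (size_t i = 0; i < fThreads.size(); i++) {
            this->add(nullptr, nullptr, &dummy);
        }
        for (std::thread& t : fThreads) { t.join(); }
    }

    void add(void (*fn)(void*), void* arg, std::atomic<int32_t>* pending) {
        pending->fetch_add(+1, std::memory_order_relaxed);  // No barrier needed.
        {
            std::lock_guard<std::mutex> lock(fWorkLock);
            fWork.push_back(Work{fn, arg, pending});
        }
        fWorkAvailable.release(1);  // Like fWorkAvailable.signal(1) in the CL.
    }

    // Like ThreadPool::Wait(): lend a hand until *pending drops to zero.
    void wait(std::atomic<int32_t>* pending) {
        // Acquire pairs with the release decrements below and in loop().
        while (pending->load(std::memory_order_acquire) > 0) {
            Work work;
            {
                // Steal opportunistically; never fWorkAvailable.acquire() here,
                // so the semaphore stays an upper bound on fWork.size().
                std::lock_guard<std::mutex> lock(fWorkLock);
                if (fWork.empty()) { continue; }  // Someone took it all.
                work = fWork.front();
                fWork.pop_front();
            }
            work.fn(work.arg);
            work.pending->fetch_sub(1, std::memory_order_release);
        }
    }

private:
    struct Work {
        void (*fn)(void*);
        void* arg;
        std::atomic<int32_t>* pending;
    };

    void loop() {
        while (true) {
            fWorkAvailable.acquire();  // Sleep until work *may* be available.
            Work work;
            {
                std::lock_guard<std::mutex> lock(fWorkLock);
                if (fWork.empty()) { continue; }  // A waiter stole it; back to sleep.
                work = fWork.front();
                fWork.pop_front();
            }
            if (!work.fn) { return; }  // Poison pill.
            work.fn(work.arg);
            work.pending->fetch_sub(1, std::memory_order_release);
        }
    }

    std::mutex                fWorkLock;          // Guards fWork.
    std::deque<Work>          fWork;
    std::counting_semaphore<> fWorkAvailable{0};  // Upper bound on fWork.size().
    std::vector<std::thread>  fThreads;
};
```

One consequence worth tracing through: when a waiter steals an item, the matching semaphore unit is never consumed, so some worker will eventually wake, find the queue empty, and go back to sleep. That spurious wake is exactly the cost the new comments around `fWorkAvailable` acknowledge.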
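A matching `batch()` for the sketch, mirroring `ThreadPool::batch()` in the diff: queue all N strided items under a single lock acquisition, then release N semaphore units at once, which is what `fWorkAvailable.signal(N)` buys over the old `fReady.broadcast()`. Again a hypothetical member of `MiniPool` above, not Skia API:

```cpp
    // Hypothetical MiniPool::batch(), shaped like ThreadPool::batch().
    void batch(void (*fn)(void*), void* args, int N, size_t stride,
               std::atomic<int32_t>* pending) {
        pending->fetch_add(N, std::memory_order_relaxed);  // No barrier needed.
        {
            std::lock_guard<std::mutex> lock(fWorkLock);
            for (int i = 0; i < N; i++) {
                // Each item's argument is offset by i*stride, as in the CL.
                fWork.push_back(Work{fn, (char*)args + i * stride, pending});
            }
        }
        fWorkAvailable.release(N);  // One semaphore unit per queued item.
    }
```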