OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "cc/base/worker_pool.h" | 5 #include "cc/base/worker_pool.h" |
6 | 6 |
7 #if defined(OS_ANDROID) | 7 #include <algorithm> |
8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) | |
9 #include <sys/resource.h> | |
10 #endif | |
11 | |
12 #include <map> | |
13 | 8 |
14 #include "base/bind.h" | 9 #include "base/bind.h" |
15 #include "base/debug/trace_event.h" | 10 #include "base/debug/trace_event.h" |
16 #include "base/hash_tables.h" | |
17 #include "base/stringprintf.h" | 11 #include "base/stringprintf.h" |
| 12 #include "base/synchronization/condition_variable.h" |
18 #include "base/threading/simple_thread.h" | 13 #include "base/threading/simple_thread.h" |
19 #include "base/threading/thread_restrictions.h" | 14 #include "base/threading/thread_restrictions.h" |
20 #include "cc/base/scoped_ptr_deque.h" | |
21 #include "cc/base/scoped_ptr_hash_map.h" | |
22 | |
23 #if defined(COMPILER_GCC) | |
24 namespace BASE_HASH_NAMESPACE { | |
25 template <> struct hash<cc::internal::WorkerPoolTask*> { | |
26 size_t operator()(cc::internal::WorkerPoolTask* ptr) const { | |
27 return hash<size_t>()(reinterpret_cast<size_t>(ptr)); | |
28 } | |
29 }; | |
30 } // namespace BASE_HASH_NAMESPACE | |
31 #endif // COMPILER_GCC | |
32 | 15 |
33 namespace cc { | 16 namespace cc { |
34 | 17 |
| 18 namespace { |
| 19 |
| 20 class WorkerPoolTaskImpl : public internal::WorkerPoolTask { |
| 21 public: |
| 22 WorkerPoolTaskImpl(const WorkerPool::Callback& task, |
| 23 const base::Closure& reply) |
| 24 : internal::WorkerPoolTask(reply), |
| 25 task_(task) {} |
| 26 |
| 27 virtual void RunOnThread(unsigned thread_index) OVERRIDE { |
| 28 task_.Run(); |
| 29 } |
| 30 |
| 31 private: |
| 32 WorkerPool::Callback task_; |
| 33 }; |
| 34 |
| 35 } // namespace |
| 36 |
35 namespace internal { | 37 namespace internal { |
36 | 38 |
37 WorkerPoolTask::WorkerPoolTask() | 39 WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) { |
38 : did_schedule_(false), | |
39 did_run_(false), | |
40 did_complete_(false) { | |
41 } | |
42 | |
43 WorkerPoolTask::WorkerPoolTask(TaskVector* dependencies) | |
44 : did_schedule_(false), | |
45 did_run_(false), | |
46 did_complete_(false) { | |
47 dependencies_.swap(*dependencies); | |
48 } | 40 } |
49 | 41 |
50 WorkerPoolTask::~WorkerPoolTask() { | 42 WorkerPoolTask::~WorkerPoolTask() { |
51 DCHECK_EQ(did_schedule_, did_complete_); | |
52 DCHECK(!did_run_ || did_schedule_); | |
53 DCHECK(!did_run_ || did_complete_); | |
54 } | |
55 | |
56 void WorkerPoolTask::DidSchedule() { | |
57 DCHECK(!did_complete_); | |
58 did_schedule_ = true; | |
59 } | |
60 | |
61 void WorkerPoolTask::WillRun() { | |
62 DCHECK(did_schedule_); | |
63 DCHECK(!did_complete_); | |
64 DCHECK(!did_run_); | |
65 } | |
66 | |
67 void WorkerPoolTask::DidRun() { | |
68 did_run_ = true; | |
69 } | 43 } |
70 | 44 |
71 void WorkerPoolTask::DidComplete() { | 45 void WorkerPoolTask::DidComplete() { |
72 DCHECK(did_schedule_); | 46 reply_.Run(); |
73 DCHECK(!did_complete_); | |
74 did_complete_ = true; | |
75 } | |
76 | |
77 bool WorkerPoolTask::IsReadyToRun() const { | |
78 // TODO(reveman): Use counter to improve performance. | |
79 for (TaskVector::const_reverse_iterator it = dependencies_.rbegin(); | |
80 it != dependencies_.rend(); ++it) { | |
81 WorkerPoolTask* dependency = *it; | |
82 if (!dependency->HasFinishedRunning()) | |
83 return false; | |
84 } | |
85 return true; | |
86 } | |
87 | |
88 bool WorkerPoolTask::HasFinishedRunning() const { | |
89 return did_run_; | |
90 } | |
91 | |
92 bool WorkerPoolTask::HasCompleted() const { | |
93 return did_complete_; | |
94 } | 47 } |
95 | 48 |
96 } // namespace internal | 49 } // namespace internal |
97 | 50 |
98 // Internal to the worker pool. Any data or logic that needs to be | 51 // Internal to the worker pool. Any data or logic that needs to be |
99 // shared between threads lives in this class. All members are guarded | 52 // shared between threads lives in this class. All members are guarded |
100 // by |lock_|. | 53 // by |lock_|. |
101 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { | 54 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
102 public: | 55 public: |
103 Inner(WorkerPool* worker_pool, | 56 Inner(WorkerPool* worker_pool, |
104 size_t num_threads, | 57 size_t num_threads, |
105 const std::string& thread_name_prefix); | 58 const std::string& thread_name_prefix); |
106 virtual ~Inner(); | 59 virtual ~Inner(); |
107 | 60 |
108 void Shutdown(); | 61 void Shutdown(); |
109 | 62 |
110 // Schedule running of |root| task and all its dependencies. Tasks | 63 void PostTask(scoped_ptr<internal::WorkerPoolTask> task); |
111 // previously scheduled but no longer needed to run |root| will be | |
112 // canceled unless already running. Canceled tasks are moved to | |
113 // |completed_tasks_| without being run. The result is that once | |
114 // scheduled, a task is guaranteed to end up in the |completed_tasks_| | |
115 // queue even if they later get canceled by another call to | |
116 // ScheduleTasks(). | |
117 void ScheduleTasks(internal::WorkerPoolTask* root); | |
118 | 64 |
119 // Collect all completed tasks in |completed_tasks|. Returns true if idle. | 65 // Appends all completed tasks to worker pool's completed tasks queue |
120 bool CollectCompletedTasks(TaskDeque* completed_tasks); | 66 // and returns true if idle. |
| 67 bool CollectCompletedTasks(); |
121 | 68 |
122 private: | 69 private: |
123 class ScheduledTask { | 70 // Appends all completed tasks to |completed_tasks|. Lock must |
124 public: | 71 // already be acquired before calling this function. |
125 ScheduledTask(internal::WorkerPoolTask* dependent, unsigned priority) | 72 bool AppendCompletedTasksWithLockAcquired( |
126 : priority_(priority) { | 73 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); |
127 if (dependent) | |
128 dependents_.push_back(dependent); | |
129 } | |
130 | |
131 internal::WorkerPoolTask::TaskVector& dependents() { return dependents_; } | |
132 unsigned priority() const { return priority_; } | |
133 | |
134 private: | |
135 internal::WorkerPoolTask::TaskVector dependents_; | |
136 unsigned priority_; | |
137 }; | |
138 typedef internal::WorkerPoolTask* ScheduledTaskMapKey; | |
139 typedef ScopedPtrHashMap<ScheduledTaskMapKey, ScheduledTask> | |
140 ScheduledTaskMap; | |
141 | |
142 // This builds a ScheduledTaskMap from a root task. | |
143 static unsigned BuildScheduledTaskMapRecursive( | |
144 internal::WorkerPoolTask* task, | |
145 internal::WorkerPoolTask* dependent, | |
146 unsigned priority, | |
147 ScheduledTaskMap* scheduled_tasks); | |
148 static void BuildScheduledTaskMap( | |
149 internal::WorkerPoolTask* root, ScheduledTaskMap* scheduled_tasks); | |
150 | |
151 // Collect all completed tasks by swapping the contents of | |
152 // |completed_tasks| and |completed_tasks_|. Lock must be acquired | |
153 // before calling this function. Returns true if idle. | |
154 bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks); | |
155 | 74 |
156 // Schedule an OnIdleOnOriginThread callback if not already pending. | 75 // Schedule an OnIdleOnOriginThread callback if not already pending. |
157 // Lock must already be acquired before calling this function. | 76 // Lock must already be acquired before calling this function. |
158 void ScheduleOnIdleWithLockAcquired(); | 77 void ScheduleOnIdleWithLockAcquired(); |
159 void OnIdleOnOriginThread(); | 78 void OnIdleOnOriginThread(); |
160 | 79 |
161 // Overridden from base::DelegateSimpleThread: | 80 // Overridden from base::DelegateSimpleThread: |
162 virtual void Run() OVERRIDE; | 81 virtual void Run() OVERRIDE; |
163 | 82 |
164 // Pointer to worker pool. Can only be used on origin thread. | 83 // Pointer to worker pool. Can only be used on origin thread. |
165 // Not guarded by |lock_|. | 84 // Not guarded by |lock_|. |
166 WorkerPool* worker_pool_on_origin_thread_; | 85 WorkerPool* worker_pool_on_origin_thread_; |
167 | 86 |
168 // This lock protects all members of this class except | 87 // This lock protects all members of this class except |
169 // |worker_pool_on_origin_thread_|. Do not read or modify anything | 88 // |worker_pool_on_origin_thread_|. Do not read or modify anything |
170 // without holding this lock. Do not block while holding this lock. | 89 // without holding this lock. Do not block while holding this lock. |
171 mutable base::Lock lock_; | 90 mutable base::Lock lock_; |
172 | 91 |
173 // Condition variable that is waited on by worker threads until new | 92 // Condition variable that is waited on by worker threads until new |
174 // tasks are ready to run or shutdown starts. | 93 // tasks are posted or shutdown starts. |
175 base::ConditionVariable has_ready_to_run_tasks_cv_; | 94 base::ConditionVariable has_pending_tasks_cv_; |
176 | 95 |
177 // Target message loop used for posting callbacks. | 96 // Target message loop used for posting callbacks. |
178 scoped_refptr<base::MessageLoopProxy> origin_loop_; | 97 scoped_refptr<base::MessageLoopProxy> origin_loop_; |
179 | 98 |
180 base::WeakPtrFactory<Inner> weak_ptr_factory_; | 99 base::WeakPtrFactory<Inner> weak_ptr_factory_; |
181 | 100 |
182 const base::Closure on_idle_callback_; | 101 const base::Closure on_idle_callback_; |
183 // Set when a OnIdleOnOriginThread() callback is pending. | 102 // Set when a OnIdleOnOriginThread() callback is pending. |
184 bool on_idle_pending_; | 103 bool on_idle_pending_; |
185 | 104 |
186 // Provides each running thread loop with a unique index. First thread | 105 // Provides each running thread loop with a unique index. First thread |
187 // loop index is 0. | 106 // loop index is 0. |
188 unsigned next_thread_index_; | 107 unsigned next_thread_index_; |
189 | 108 |
| 109 // Number of tasks currently running. |
| 110 unsigned running_task_count_; |
| 111 |
190 // Set during shutdown. Tells workers to exit when no more tasks | 112 // Set during shutdown. Tells workers to exit when no more tasks |
191 // are pending. | 113 // are pending. |
192 bool shutdown_; | 114 bool shutdown_; |
193 | 115 |
194 // The root task that is a dependent of all other tasks. | 116 typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; |
195 scoped_refptr<internal::WorkerPoolTask> root_; | 117 TaskDeque pending_tasks_; |
196 | |
197 // This set contains all pending tasks. | |
198 ScheduledTaskMap pending_tasks_; | |
199 | |
200 // Ordered set of tasks that are ready to run. | |
201 // TODO(reveman): priority_queue might be more efficient. | |
202 typedef std::map<unsigned, internal::WorkerPoolTask*> TaskMap; | |
203 TaskMap ready_to_run_tasks_; | |
204 | |
205 // This set contains all currently running tasks. | |
206 ScheduledTaskMap running_tasks_; | |
207 | |
208 // Completed tasks not yet collected by origin thread. | |
209 TaskDeque completed_tasks_; | 118 TaskDeque completed_tasks_; |
210 | 119 |
211 ScopedPtrDeque<base::DelegateSimpleThread> workers_; | 120 ScopedPtrDeque<base::DelegateSimpleThread> workers_; |
212 | 121 |
213 DISALLOW_COPY_AND_ASSIGN(Inner); | 122 DISALLOW_COPY_AND_ASSIGN(Inner); |
214 }; | 123 }; |
215 | 124 |
216 WorkerPool::Inner::Inner(WorkerPool* worker_pool, | 125 WorkerPool::Inner::Inner(WorkerPool* worker_pool, |
217 size_t num_threads, | 126 size_t num_threads, |
218 const std::string& thread_name_prefix) | 127 const std::string& thread_name_prefix) |
219 : worker_pool_on_origin_thread_(worker_pool), | 128 : worker_pool_on_origin_thread_(worker_pool), |
220 lock_(), | 129 lock_(), |
221 has_ready_to_run_tasks_cv_(&lock_), | 130 has_pending_tasks_cv_(&lock_), |
222 origin_loop_(base::MessageLoopProxy::current()), | 131 origin_loop_(base::MessageLoopProxy::current()), |
223 weak_ptr_factory_(this), | 132 weak_ptr_factory_(this), |
224 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, | 133 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, |
225 weak_ptr_factory_.GetWeakPtr())), | 134 weak_ptr_factory_.GetWeakPtr())), |
226 on_idle_pending_(false), | 135 on_idle_pending_(false), |
227 next_thread_index_(0), | 136 next_thread_index_(0), |
| 137 running_task_count_(0), |
228 shutdown_(false) { | 138 shutdown_(false) { |
229 base::AutoLock lock(lock_); | 139 base::AutoLock lock(lock_); |
230 | 140 |
231 while (workers_.size() < num_threads) { | 141 while (workers_.size() < num_threads) { |
232 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( | 142 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( |
233 new base::DelegateSimpleThread( | 143 new base::DelegateSimpleThread( |
234 this, | 144 this, |
235 thread_name_prefix + | 145 thread_name_prefix + |
236 base::StringPrintf( | 146 base::StringPrintf( |
237 "Worker%u", | 147 "Worker%u", |
238 static_cast<unsigned>(workers_.size() + 1)).c_str())); | 148 static_cast<unsigned>(workers_.size() + 1)).c_str())); |
239 worker->Start(); | 149 worker->Start(); |
240 workers_.push_back(worker.Pass()); | 150 workers_.push_back(worker.Pass()); |
241 } | 151 } |
242 } | 152 } |
243 | 153 |
244 WorkerPool::Inner::~Inner() { | 154 WorkerPool::Inner::~Inner() { |
245 base::AutoLock lock(lock_); | 155 base::AutoLock lock(lock_); |
246 | 156 |
247 DCHECK(shutdown_); | 157 DCHECK(shutdown_); |
248 | 158 |
| 159 // Cancel all pending callbacks. |
| 160 weak_ptr_factory_.InvalidateWeakPtrs(); |
| 161 |
249 DCHECK_EQ(0u, pending_tasks_.size()); | 162 DCHECK_EQ(0u, pending_tasks_.size()); |
250 DCHECK_EQ(0u, ready_to_run_tasks_.size()); | |
251 DCHECK_EQ(0u, running_tasks_.size()); | |
252 DCHECK_EQ(0u, completed_tasks_.size()); | 163 DCHECK_EQ(0u, completed_tasks_.size()); |
| 164 DCHECK_EQ(0u, running_task_count_); |
253 } | 165 } |
254 | 166 |
255 void WorkerPool::Inner::Shutdown() { | 167 void WorkerPool::Inner::Shutdown() { |
256 { | 168 { |
257 base::AutoLock lock(lock_); | 169 base::AutoLock lock(lock_); |
258 | 170 |
259 DCHECK(!shutdown_); | 171 DCHECK(!shutdown_); |
260 shutdown_ = true; | 172 shutdown_ = true; |
261 | 173 |
262 // Wake up a worker so it knows it should exit. This will cause all workers | 174 // Wake up a worker so it knows it should exit. This will cause all workers |
263 // to exit as each will wake up another worker before exiting. | 175 // to exit as each will wake up another worker before exiting. |
264 has_ready_to_run_tasks_cv_.Signal(); | 176 has_pending_tasks_cv_.Signal(); |
265 } | 177 } |
266 | 178 |
267 while (workers_.size()) { | 179 while (workers_.size()) { |
268 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); | 180 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); |
269 // http://crbug.com/240453 - Join() is considered IO and will block this | 181 // http://crbug.com/240453 - Join() is considered IO and will block this |
270 // thread. See also http://crbug.com/239423 for further ideas. | 182 // thread. See also http://crbug.com/239423 for further ideas. |
271 base::ThreadRestrictions::ScopedAllowIO allow_io; | 183 base::ThreadRestrictions::ScopedAllowIO allow_io; |
272 worker->Join(); | 184 worker->Join(); |
273 } | 185 } |
274 | |
275 // Cancel any pending OnIdle callback. | |
276 weak_ptr_factory_.InvalidateWeakPtrs(); | |
277 } | 186 } |
278 | 187 |
279 void WorkerPool::Inner::ScheduleTasks(internal::WorkerPoolTask* root) { | 188 void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
280 // It is OK to call ScheduleTasks() after shutdown if |root| is NULL. | 189 base::AutoLock lock(lock_); |
281 DCHECK(!root || !shutdown_); | |
282 | 190 |
283 scoped_refptr<internal::WorkerPoolTask> new_root(root); | 191 pending_tasks_.push_back(task.Pass()); |
284 | 192 |
285 ScheduledTaskMap new_pending_tasks; | 193 // There is more work available, so wake up worker thread. |
286 ScheduledTaskMap new_running_tasks; | 194 has_pending_tasks_cv_.Signal(); |
287 TaskMap new_ready_to_run_tasks; | |
288 | |
289 // Build scheduled task map before acquiring |lock_|. | |
290 if (root) | |
291 BuildScheduledTaskMap(root, &new_pending_tasks); | |
292 | |
293 { | |
294 base::AutoLock lock(lock_); | |
295 | |
296 // First remove all completed tasks from |new_pending_tasks|. | |
297 for (TaskDeque::iterator it = completed_tasks_.begin(); | |
298 it != completed_tasks_.end(); ++it) { | |
299 internal::WorkerPoolTask* task = *it; | |
300 new_pending_tasks.take_and_erase(task); | |
301 } | |
302 | |
303 // Move tasks not present in |new_pending_tasks| to |completed_tasks_|. | |
304 for (ScheduledTaskMap::iterator it = pending_tasks_.begin(); | |
305 it != pending_tasks_.end(); ++it) { | |
306 internal::WorkerPoolTask* task = it->first; | |
307 | |
308 // Task has completed if not present in |new_pending_tasks|. | |
309 if (!new_pending_tasks.contains(task)) | |
310 completed_tasks_.push_back(task); | |
311 } | |
312 | |
313 // Build new running task set. | |
314 for (ScheduledTaskMap::iterator it = running_tasks_.begin(); | |
315 it != running_tasks_.end(); ++it) { | |
316 internal::WorkerPoolTask* task = it->first; | |
317 // Transfer scheduled task value from |new_pending_tasks| to | |
318 // |new_running_tasks| if currently running. Value must be set to | |
319 // NULL if |new_pending_tasks| doesn't contain task. This does | |
320 // the right thing in both cases. | |
321 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task)); | |
322 } | |
323 | |
324 // Build new "ready to run" tasks queue. | |
325 for (ScheduledTaskMap::iterator it = new_pending_tasks.begin(); | |
326 it != new_pending_tasks.end(); ++it) { | |
327 internal::WorkerPoolTask* task = it->first; | |
328 | |
329 // Completed tasks should not exist in |new_pending_tasks|. | |
330 DCHECK(!task->HasFinishedRunning()); | |
331 | |
332 // Call DidSchedule() to indicate that this task has been scheduled. | |
333 // Note: This is only for debugging purposes. | |
334 task->DidSchedule(); | |
335 | |
336 DCHECK_EQ(0u, new_ready_to_run_tasks.count(it->second->priority())); | |
337 if (task->IsReadyToRun()) | |
338 new_ready_to_run_tasks[it->second->priority()] = task; | |
339 } | |
340 | |
341 // Swap root task and task sets. | |
342 // Note: old tasks are intentionally destroyed after releasing |lock_|. | |
343 root_.swap(new_root); | |
344 pending_tasks_.swap(new_pending_tasks); | |
345 running_tasks_.swap(new_running_tasks); | |
346 ready_to_run_tasks_.swap(new_ready_to_run_tasks); | |
347 | |
348 // If there is more work available, wake up worker thread. | |
349 if (!ready_to_run_tasks_.empty()) | |
350 has_ready_to_run_tasks_cv_.Signal(); | |
351 } | |
352 } | 195 } |
353 | 196 |
354 bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) { | 197 bool WorkerPool::Inner::CollectCompletedTasks() { |
355 base::AutoLock lock(lock_); | 198 base::AutoLock lock(lock_); |
356 | 199 |
357 return CollectCompletedTasksWithLockAcquired(completed_tasks); | 200 return AppendCompletedTasksWithLockAcquired( |
| 201 &worker_pool_on_origin_thread_->completed_tasks_); |
358 } | 202 } |
359 | 203 |
360 bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired( | 204 bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired( |
361 TaskDeque* completed_tasks) { | 205 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) { |
362 lock_.AssertAcquired(); | 206 lock_.AssertAcquired(); |
363 | 207 |
364 DCHECK_EQ(0u, completed_tasks->size()); | 208 while (completed_tasks_.size()) |
365 completed_tasks->swap(completed_tasks_); | 209 completed_tasks->push_back(completed_tasks_.take_front().Pass()); |
366 | 210 |
367 return running_tasks_.empty() && pending_tasks_.empty(); | 211 return !running_task_count_ && pending_tasks_.empty(); |
368 } | 212 } |
369 | 213 |
370 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { | 214 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
371 lock_.AssertAcquired(); | 215 lock_.AssertAcquired(); |
372 | 216 |
373 if (on_idle_pending_) | 217 if (on_idle_pending_) |
374 return; | 218 return; |
375 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); | 219 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); |
376 on_idle_pending_ = true; | 220 on_idle_pending_ = true; |
377 } | 221 } |
378 | 222 |
379 void WorkerPool::Inner::OnIdleOnOriginThread() { | 223 void WorkerPool::Inner::OnIdleOnOriginThread() { |
380 TaskDeque completed_tasks; | |
381 | |
382 { | 224 { |
383 base::AutoLock lock(lock_); | 225 base::AutoLock lock(lock_); |
384 | 226 |
385 DCHECK(on_idle_pending_); | 227 DCHECK(on_idle_pending_); |
386 on_idle_pending_ = false; | 228 on_idle_pending_ = false; |
387 | 229 |
388 // Early out if no longer idle. | 230 // Early out if no longer idle. |
389 if (!running_tasks_.empty() || !pending_tasks_.empty()) | 231 if (running_task_count_ || !pending_tasks_.empty()) |
390 return; | 232 return; |
391 | 233 |
392 CollectCompletedTasksWithLockAcquired(&completed_tasks); | 234 AppendCompletedTasksWithLockAcquired( |
| 235 &worker_pool_on_origin_thread_->completed_tasks_); |
393 } | 236 } |
394 | 237 |
395 worker_pool_on_origin_thread_->OnIdle(&completed_tasks); | 238 worker_pool_on_origin_thread_->OnIdle(); |
396 } | 239 } |
397 | 240 |
398 void WorkerPool::Inner::Run() { | 241 void WorkerPool::Inner::Run() { |
399 #if defined(OS_ANDROID) | 242 #if defined(OS_ANDROID) |
400 base::PlatformThread::SetThreadPriority( | 243 base::PlatformThread::SetThreadPriority( |
401 base::PlatformThread::CurrentHandle(), | 244 base::PlatformThread::CurrentHandle(), |
402 base::kThreadPriority_Background); | 245 base::kThreadPriority_Background); |
403 #endif | 246 #endif |
404 | 247 |
405 base::AutoLock lock(lock_); | 248 base::AutoLock lock(lock_); |
406 | 249 |
407 // Get a unique thread index. | 250 // Get a unique thread index. |
408 int thread_index = next_thread_index_++; | 251 int thread_index = next_thread_index_++; |
409 | 252 |
410 while (true) { | 253 while (true) { |
411 if (ready_to_run_tasks_.empty()) { | 254 if (pending_tasks_.empty()) { |
412 if (pending_tasks_.empty()) { | 255 // Exit when shutdown is set and no more tasks are pending. |
413 // Exit when shutdown is set and no more tasks are pending. | 256 if (shutdown_) |
414 if (shutdown_) | 257 break; |
415 break; | |
416 | 258 |
417 // Schedule an idle callback if no tasks are running. | 259 // Schedule an idle callback if no tasks are running. |
418 if (running_tasks_.empty()) | 260 if (!running_task_count_) |
419 ScheduleOnIdleWithLockAcquired(); | 261 ScheduleOnIdleWithLockAcquired(); |
420 } | |
421 | 262 |
422 // Wait for more tasks. | 263 // Wait for new pending tasks. |
423 has_ready_to_run_tasks_cv_.Wait(); | 264 has_pending_tasks_cv_.Wait(); |
424 continue; | 265 continue; |
425 } | 266 } |
426 | 267 |
427 // Take top priority task from |ready_to_run_tasks_|. | 268 // Get next task. |
428 scoped_refptr<internal::WorkerPoolTask> task( | 269 scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); |
429 ready_to_run_tasks_.begin()->second); | |
430 ready_to_run_tasks_.erase(ready_to_run_tasks_.begin()); | |
431 | 270 |
432 // Move task from |pending_tasks_| to |running_tasks_|. | 271 // Increment |running_task_count_| before starting to run task. |
433 DCHECK(pending_tasks_.contains(task)); | 272 running_task_count_++; |
434 DCHECK(!running_tasks_.contains(task)); | |
435 running_tasks_.set(task, pending_tasks_.take_and_erase(task)); | |
436 | 273 |
437 // There may be more work available, so wake up another worker thread. | 274 // There may be more work available, so wake up another |
438 has_ready_to_run_tasks_cv_.Signal(); | 275 // worker thread. |
439 | 276 has_pending_tasks_cv_.Signal(); |
440 // Call WillRun() before releasing |lock_| and running task. | |
441 task->WillRun(); | |
442 | 277 |
443 { | 278 { |
444 base::AutoUnlock unlock(lock_); | 279 base::AutoUnlock unlock(lock_); |
445 | 280 |
446 task->RunOnThread(thread_index); | 281 task->RunOnThread(thread_index); |
447 } | 282 } |
448 | 283 |
449 // This will mark task as finished running. | 284 completed_tasks_.push_back(task.Pass()); |
450 task->DidRun(); | |
451 | 285 |
452 // Now iterate over all dependents to check if they are ready to run. | 286 // Decrement |running_task_count_| now that we are done running task. |
453 scoped_ptr<ScheduledTask> scheduled_task = running_tasks_.take_and_erase( | 287 running_task_count_--; |
454 task); | |
455 if (scheduled_task) { | |
456 typedef internal::WorkerPoolTask::TaskVector TaskVector; | |
457 for (TaskVector::iterator it = scheduled_task->dependents().begin(); | |
458 it != scheduled_task->dependents().end(); ++it) { | |
459 internal::WorkerPoolTask* dependent = *it; | |
460 if (!dependent->IsReadyToRun()) | |
461 continue; | |
462 | |
463 // Task is ready. Add it to |ready_to_run_tasks_|. | |
464 DCHECK(pending_tasks_.contains(dependent)); | |
465 unsigned priority = pending_tasks_.get(dependent)->priority(); | |
466 DCHECK(!ready_to_run_tasks_.count(priority) || | |
467 ready_to_run_tasks_[priority] == dependent); | |
468 ready_to_run_tasks_[priority] = dependent; | |
469 } | |
470 } | |
471 | |
472 // Finally add task to |completed_tasks_|. | |
473 completed_tasks_.push_back(task); | |
474 } | 288 } |
475 | 289 |
476 // We noticed we should exit. Wake up the next worker so it knows it should | 290 // We noticed we should exit. Wake up the next worker so it knows it should |
477 // exit as well (because the Shutdown() code only signals once). | 291 // exit as well (because the Shutdown() code only signals once). |
478 has_ready_to_run_tasks_cv_.Signal(); | 292 has_pending_tasks_cv_.Signal(); |
479 } | |
480 | |
481 // BuildScheduledTaskMap() takes a task tree as input and constructs | |
482 // a unique set of tasks with edges between dependencies pointing in | |
483 // the direction of the dependents. Each task is given a unique priority | |
484 // which is currently the same as the DFS traversal order. | |
485 // | |
486 // Input: Output: | |
487 // | |
488 // root task4 Task | Priority (lower is better) | |
489 // / \ / \ -------+--------------------------- | |
490 // task1 task2 task3 task2 root | 4 | |
491 // | | | | task1 | 2 | |
492 // task3 | task1 | task2 | 3 | |
493 // | | \ / task3 | 1 | |
494 // task4 task4 root task4 | 0 | |
495 // | |
496 // The output can be used to efficiently maintain a queue of | |
497 // "ready to run" tasks. | |
498 | |
499 // static | |
500 unsigned WorkerPool::Inner::BuildScheduledTaskMapRecursive( | |
501 internal::WorkerPoolTask* task, | |
502 internal::WorkerPoolTask* dependent, | |
503 unsigned priority, | |
504 ScheduledTaskMap* scheduled_tasks) { | |
505 // Skip sub-tree if task has already completed. | |
506 if (task->HasCompleted()) | |
507 return priority; | |
508 | |
509 ScheduledTaskMap::iterator scheduled_it = scheduled_tasks->find(task); | |
510 if (scheduled_it != scheduled_tasks->end()) { | |
511 DCHECK(dependent); | |
512 scheduled_it->second->dependents().push_back(dependent); | |
513 return priority; | |
514 } | |
515 | |
516 typedef internal::WorkerPoolTask::TaskVector TaskVector; | |
517 for (TaskVector::iterator it = task->dependencies().begin(); | |
518 it != task->dependencies().end(); ++it) { | |
519 internal::WorkerPoolTask* dependency = *it; | |
520 priority = BuildScheduledTaskMapRecursive( | |
521 dependency, task, priority, scheduled_tasks); | |
522 } | |
523 | |
524 scheduled_tasks->set(task, | |
525 make_scoped_ptr(new ScheduledTask(dependent, | |
526 priority))); | |
527 | |
528 return priority + 1; | |
529 } | |
530 | |
531 // static | |
532 void WorkerPool::Inner::BuildScheduledTaskMap( | |
533 internal::WorkerPoolTask* root, | |
534 ScheduledTaskMap* scheduled_tasks) { | |
535 const unsigned kBasePriority = 0u; | |
536 DCHECK(root); | |
537 BuildScheduledTaskMapRecursive(root, NULL, kBasePriority, scheduled_tasks); | |
538 } | 293 } |
539 | 294 |
540 WorkerPool::WorkerPool(size_t num_threads, | 295 WorkerPool::WorkerPool(size_t num_threads, |
541 base::TimeDelta check_for_completed_tasks_delay, | 296 base::TimeDelta check_for_completed_tasks_delay, |
542 const std::string& thread_name_prefix) | 297 const std::string& thread_name_prefix) |
543 : client_(NULL), | 298 : client_(NULL), |
544 origin_loop_(base::MessageLoopProxy::current()), | 299 origin_loop_(base::MessageLoopProxy::current()), |
| 300 weak_ptr_factory_(this), |
545 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), | 301 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), |
546 check_for_completed_tasks_pending_(false), | 302 check_for_completed_tasks_pending_(false), |
547 inner_(make_scoped_ptr(new Inner(this, | 303 inner_(make_scoped_ptr(new Inner(this, |
548 num_threads, | 304 num_threads, |
549 thread_name_prefix))) { | 305 thread_name_prefix))) { |
550 } | 306 } |
551 | 307 |
552 WorkerPool::~WorkerPool() { | 308 WorkerPool::~WorkerPool() { |
| 309 // Cancel all pending callbacks. |
| 310 weak_ptr_factory_.InvalidateWeakPtrs(); |
| 311 |
| 312 DCHECK_EQ(0u, completed_tasks_.size()); |
553 } | 313 } |
554 | 314 |
555 void WorkerPool::Shutdown() { | 315 void WorkerPool::Shutdown() { |
556 inner_->Shutdown(); | 316 inner_->Shutdown(); |
557 | 317 inner_->CollectCompletedTasks(); |
558 TaskDeque completed_tasks; | 318 DispatchCompletionCallbacks(); |
559 inner_->CollectCompletedTasks(&completed_tasks); | |
560 DispatchCompletionCallbacks(&completed_tasks); | |
561 } | 319 } |
562 | 320 |
563 void WorkerPool::OnIdle(TaskDeque* completed_tasks) { | 321 void WorkerPool::PostTaskAndReply( |
| 322 const Callback& task, const base::Closure& reply) { |
| 323 PostTask(make_scoped_ptr(new WorkerPoolTaskImpl( |
| 324 task, |
| 325 reply)).PassAs<internal::WorkerPoolTask>()); |
| 326 } |
| 327 |
| 328 void WorkerPool::OnIdle() { |
564 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); | 329 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); |
565 | 330 |
566 DispatchCompletionCallbacks(completed_tasks); | 331 DispatchCompletionCallbacks(); |
567 | |
568 // Cancel any pending check for completed tasks. | |
569 check_for_completed_tasks_callback_.Cancel(); | |
570 check_for_completed_tasks_pending_ = false; | |
571 } | 332 } |
572 | 333 |
573 void WorkerPool::ScheduleCheckForCompletedTasks() { | 334 void WorkerPool::ScheduleCheckForCompletedTasks() { |
574 if (check_for_completed_tasks_pending_) | 335 if (check_for_completed_tasks_pending_) |
575 return; | 336 return; |
576 check_for_completed_tasks_callback_.Reset( | |
577 base::Bind(&WorkerPool::CheckForCompletedTasks, | |
578 base::Unretained(this))); | |
579 origin_loop_->PostDelayedTask( | 337 origin_loop_->PostDelayedTask( |
580 FROM_HERE, | 338 FROM_HERE, |
581 check_for_completed_tasks_callback_.callback(), | 339 base::Bind(&WorkerPool::CheckForCompletedTasks, |
| 340 weak_ptr_factory_.GetWeakPtr()), |
582 check_for_completed_tasks_delay_); | 341 check_for_completed_tasks_delay_); |
583 check_for_completed_tasks_pending_ = true; | 342 check_for_completed_tasks_pending_ = true; |
584 } | 343 } |
585 | 344 |
586 void WorkerPool::CheckForCompletedTasks() { | 345 void WorkerPool::CheckForCompletedTasks() { |
587 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); | 346 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); |
588 check_for_completed_tasks_callback_.Cancel(); | 347 DCHECK(check_for_completed_tasks_pending_); |
589 check_for_completed_tasks_pending_ = false; | 348 check_for_completed_tasks_pending_ = false; |
590 | 349 |
591 TaskDeque completed_tasks; | |
592 | |
593 // Schedule another check for completed tasks if not idle. | 350 // Schedule another check for completed tasks if not idle. |
594 if (!inner_->CollectCompletedTasks(&completed_tasks)) | 351 if (!inner_->CollectCompletedTasks()) |
595 ScheduleCheckForCompletedTasks(); | 352 ScheduleCheckForCompletedTasks(); |
596 | 353 |
597 DispatchCompletionCallbacks(&completed_tasks); | 354 DispatchCompletionCallbacks(); |
598 } | 355 } |
599 | 356 |
600 void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) { | 357 void WorkerPool::DispatchCompletionCallbacks() { |
601 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); | 358 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); |
602 | 359 |
603 // Early out when |completed_tasks| is empty to prevent unnecessary | 360 if (completed_tasks_.empty()) |
604 // call to DidFinishDispatchingWorkerPoolCompletionCallbacks(). | |
605 if (completed_tasks->empty()) | |
606 return; | 361 return; |
607 | 362 |
608 while (!completed_tasks->empty()) { | 363 while (completed_tasks_.size()) { |
609 scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front(); | 364 scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); |
610 completed_tasks->pop_front(); | |
611 task->DidComplete(); | 365 task->DidComplete(); |
612 task->DispatchCompletionCallback(); | |
613 } | 366 } |
614 | 367 |
615 DCHECK(client_); | 368 DCHECK(client_); |
616 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); | 369 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); |
617 } | 370 } |
618 | 371 |
619 void WorkerPool::ScheduleTasks(internal::WorkerPoolTask* root) { | 372 void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
620 TRACE_EVENT0("cc", "WorkerPool::ScheduleTasks"); | 373 // Schedule check for completed tasks if not pending. |
| 374 ScheduleCheckForCompletedTasks(); |
621 | 375 |
622 // Schedule check for completed tasks. | 376 inner_->PostTask(task.Pass()); |
623 if (root) | |
624 ScheduleCheckForCompletedTasks(); | |
625 | |
626 inner_->ScheduleTasks(root); | |
627 } | 377 } |
628 | 378 |
629 } // namespace cc | 379 } // namespace cc |
OLD | NEW |