OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "cc/base/worker_pool.h" | 5 #include "cc/base/worker_pool.h" |
6 | 6 |
7 #if defined(OS_ANDROID) | 7 #if defined(OS_ANDROID) |
8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) | 8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) |
9 #include <sys/resource.h> | 9 #include <sys/resource.h> |
10 #endif | 10 #endif |
11 | 11 |
12 #include <algorithm> | |
13 | |
14 #include "base/bind.h" | 12 #include "base/bind.h" |
15 #include "base/debug/trace_event.h" | 13 #include "base/debug/trace_event.h" |
| 14 #include "base/hash_tables.h" |
16 #include "base/stringprintf.h" | 15 #include "base/stringprintf.h" |
17 #include "base/synchronization/condition_variable.h" | 16 #include "base/synchronization/condition_variable.h" |
18 #include "base/threading/simple_thread.h" | 17 #include "base/threading/simple_thread.h" |
| 18 #include "cc/base/scoped_ptr_deque.h" |
| 19 |
| 20 #if defined(COMPILER_GCC) |
| 21 namespace BASE_HASH_NAMESPACE { |
| 22 template <> struct hash<cc::internal::WorkerPoolTask*> { |
| 23 size_t operator()(cc::internal::WorkerPoolTask* ptr) const { |
| 24 return hash<size_t>()(reinterpret_cast<size_t>(ptr)); |
| 25 } |
| 26 }; |
| 27 } // namespace BASE_HASH_NAMESPACE |
| 28 #endif // COMPILER |
19 | 29 |
20 namespace cc { | 30 namespace cc { |
21 | 31 |
22 namespace { | |
23 | |
24 class WorkerPoolTaskImpl : public internal::WorkerPoolTask { | |
25 public: | |
26 WorkerPoolTaskImpl(const WorkerPool::Callback& task, | |
27 const base::Closure& reply) | |
28 : internal::WorkerPoolTask(reply), | |
29 task_(task) {} | |
30 | |
31 virtual void RunOnThread(unsigned thread_index) OVERRIDE { | |
32 task_.Run(); | |
33 } | |
34 | |
35 private: | |
36 WorkerPool::Callback task_; | |
37 }; | |
38 | |
39 } // namespace | |
40 | |
41 namespace internal { | 32 namespace internal { |
42 | 33 |
43 WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) { | 34 WorkerPoolTask::WorkerPoolTask() : did_schedule_(false), |
| 35 did_run_(false), |
| 36 did_complete_(false) { |
44 } | 37 } |
45 | 38 |
46 WorkerPoolTask::~WorkerPoolTask() { | 39 WorkerPoolTask::~WorkerPoolTask() { |
| 40 DCHECK_EQ(did_schedule_, did_complete_); |
| 41 DCHECK(!did_run_ || did_schedule_); |
| 42 DCHECK(!did_run_ || did_complete_); |
| 43 } |
| 44 |
| 45 void WorkerPoolTask::DidSchedule() { |
| 46 DCHECK(!did_complete_); |
| 47 did_schedule_ = true; |
| 48 } |
| 49 |
| 50 void WorkerPoolTask::WillRun() { |
| 51 DCHECK(did_schedule_); |
| 52 DCHECK(!did_complete_); |
| 53 DCHECK(!did_run_); |
| 54 } |
| 55 |
| 56 void WorkerPoolTask::DidRun() { |
| 57 did_run_ = true; |
47 } | 58 } |
48 | 59 |
49 void WorkerPoolTask::DidComplete() { | 60 void WorkerPoolTask::DidComplete() { |
50 reply_.Run(); | 61 DCHECK(did_schedule_); |
| 62 DCHECK(!did_complete_); |
| 63 did_complete_ = true; |
| 64 } |
| 65 |
| 66 bool WorkerPoolTask::HasFinished() const { |
| 67 return did_run_; |
| 68 } |
| 69 |
| 70 WorkerPoolTaskDependency::Iterator::Iterator( |
| 71 const WorkerPoolTaskDependency* root) |
| 72 : current_(root), |
| 73 root_(root) { |
| 74 ++(*this); |
| 75 } |
| 76 |
| 77 WorkerPoolTaskDependency::Iterator::~Iterator() { |
| 78 } |
| 79 |
| 80 WorkerPoolTaskDependency::TaskIterator::TaskIterator( |
| 81 const WorkerPoolTaskDependency* parent) |
| 82 : current_(parent->first_child_.get()) { |
| 83 } |
| 84 |
| 85 WorkerPoolTaskDependency::TaskIterator::~TaskIterator() { |
| 86 } |
| 87 |
| 88 WorkerPoolTaskDependency::WorkerPoolTaskDependency() |
| 89 : parent_(NULL), |
| 90 last_child_(NULL) { |
| 91 } |
| 92 |
| 93 WorkerPoolTaskDependency::~WorkerPoolTaskDependency() { |
| 94 } |
| 95 |
| 96 // static |
| 97 void WorkerPoolTaskDependency::Create( |
| 98 WorkerPoolTask* task, |
| 99 WorkerPoolTaskDependency* parent, |
| 100 WorkerPoolTaskDependency* dependencies) { |
| 101 scoped_ptr<WorkerPoolTaskDependency> dependency( |
| 102 new WorkerPoolTaskDependency); |
| 103 |
| 104 if (dependencies) { |
| 105 DCHECK(!dependencies->task()); |
| 106 dependency->Swap(dependencies); |
| 107 } |
| 108 dependency->task_ = task; |
| 109 dependency->parent_ = parent; |
| 110 |
| 111 if (parent->last_child_) { |
| 112 parent->last_child_->next_sibling_ = dependency.Pass(); |
| 113 // |parent_| should only be set for |last_child_| |
| 114 parent->last_child_->parent_ = NULL; |
| 115 parent->last_child_ = parent->last_child_->next_sibling_.get(); |
| 116 } else { |
| 117 parent->first_child_ = dependency.Pass(); |
| 118 parent->last_child_ = parent->first_child_.get(); |
| 119 } |
| 120 } |
| 121 |
| 122 void WorkerPoolTaskDependency::Swap(WorkerPoolTaskDependency* other) { |
| 123 DCHECK(other); |
| 124 |
| 125 first_child_.swap(other->first_child_); |
| 126 next_sibling_.swap(other->next_sibling_); |
| 127 task_.swap(other->task_); |
| 128 |
| 129 // |last_child_| has a pointer to the parent. Make sure this pointer |
| 130 // is properly swapped. |
| 131 WorkerPoolTaskDependency* other_last_child_ = other->last_child_; |
| 132 if (other_last_child_) |
| 133 other_last_child_->parent_ = this; |
| 134 if (last_child_) |
| 135 last_child_->parent_ = other; |
| 136 |
| 137 // Swap |last_child_| pointers. |
| 138 other->last_child_ = last_child_; |
| 139 last_child_ = other_last_child_; |
51 } | 140 } |
52 | 141 |
53 } // namespace internal | 142 } // namespace internal |
54 | 143 |
55 // Internal to the worker pool. Any data or logic that needs to be | 144 // Internal to the worker pool. Any data or logic that needs to be |
56 // shared between threads lives in this class. All members are guarded | 145 // shared between threads lives in this class. All members are guarded |
57 // by |lock_|. | 146 // by |lock_|. |
58 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { | 147 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
59 public: | 148 public: |
60 Inner(WorkerPool* worker_pool, | 149 Inner(WorkerPool* worker_pool, |
61 size_t num_threads, | 150 size_t num_threads, |
62 const std::string& thread_name_prefix); | 151 const std::string& thread_name_prefix); |
63 virtual ~Inner(); | 152 virtual ~Inner(); |
64 | 153 |
65 void Shutdown(); | 154 void Shutdown(); |
66 | 155 |
67 void PostTask(scoped_ptr<internal::WorkerPoolTask> task); | 156 // Schedule running of tasks in |task_graph|. All tasks previously |
| 157 // scheduled but not present in |task_graph| will be canceled unless |
| 158 // already running. Canceled tasks are moved to |completed_tasks_| |
| 159 // without being run. The result is that once scheduled, a task is |
| 160 // guaranteed to end up in the |completed_tasks_| queue even if they |
| 161 // later get canceled by another call to ScheduleTasks(). |
| 162 void ScheduleTasks(internal::WorkerPoolTaskGraph* task_graph); |
68 | 163 |
69 // Appends all completed tasks to worker pool's completed tasks queue | 164 // Collect all completed tasks in |completed_tasks|. Returns true if idle. |
70 // and returns true if idle. | 165 bool CollectCompletedTasks(TaskDeque* completed_tasks); |
71 bool CollectCompletedTasks(); | |
72 | 166 |
73 private: | 167 private: |
74 // Appends all completed tasks to |completed_tasks|. Lock must | 168 // Find a task that is ready to run by traversing the dependency graph. |
75 // already be acquired before calling this function. | 169 // Lock must be acquired before calling this function. |
76 bool AppendCompletedTasksWithLockAcquired( | 170 internal::WorkerPoolTask* FindTaskThatIsReadyToRunWithLockAcquired(); |
77 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); | 171 |
| 172 // Collect all completed tasks by swapping the contents of |
| 173 // |completed_tasks| and |completed_tasks_|. Lock must be acquired |
| 174 // before calling this function. Returns true if idle. |
| 175 bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks); |
78 | 176 |
79 // Schedule an OnIdleOnOriginThread callback if not already pending. | 177 // Schedule an OnIdleOnOriginThread callback if not already pending. |
80 // Lock must already be acquired before calling this function. | 178 // Lock must already be acquired before calling this function. |
81 void ScheduleOnIdleWithLockAcquired(); | 179 void ScheduleOnIdleWithLockAcquired(); |
82 void OnIdleOnOriginThread(); | 180 void OnIdleOnOriginThread(); |
83 | 181 |
84 // Overridden from base::DelegateSimpleThread: | 182 // Overridden from base::DelegateSimpleThread: |
85 virtual void Run() OVERRIDE; | 183 virtual void Run() OVERRIDE; |
86 | 184 |
87 // Pointer to worker pool. Can only be used on origin thread. | 185 // Pointer to worker pool. Can only be used on origin thread. |
(...skipping 15 matching lines...) Expand all Loading... |
103 base::WeakPtrFactory<Inner> weak_ptr_factory_; | 201 base::WeakPtrFactory<Inner> weak_ptr_factory_; |
104 | 202 |
105 const base::Closure on_idle_callback_; | 203 const base::Closure on_idle_callback_; |
106 // Set when a OnIdleOnOriginThread() callback is pending. | 204 // Set when a OnIdleOnOriginThread() callback is pending. |
107 bool on_idle_pending_; | 205 bool on_idle_pending_; |
108 | 206 |
109 // Provides each running thread loop with a unique index. First thread | 207 // Provides each running thread loop with a unique index. First thread |
110 // loop index is 0. | 208 // loop index is 0. |
111 unsigned next_thread_index_; | 209 unsigned next_thread_index_; |
112 | 210 |
113 // Number of tasks currently running. | |
114 unsigned running_task_count_; | |
115 | |
116 // Set during shutdown. Tells workers to exit when no more tasks | 211 // Set during shutdown. Tells workers to exit when no more tasks |
117 // are pending. | 212 // are pending. |
118 bool shutdown_; | 213 bool shutdown_; |
119 | 214 |
120 typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; | 215 // The dependency graph. Describes task dependencies and task priority. |
121 TaskDeque pending_tasks_; | 216 internal::WorkerPoolTaskGraph task_graph_; |
| 217 |
| 218 // This set contains all pending tasks. |
| 219 typedef base::hash_set<internal::WorkerPoolTask*> TaskSet; |
| 220 TaskSet pending_tasks_; |
| 221 |
| 222 // This set contains all currently running tasks. |
| 223 TaskSet running_tasks_; |
| 224 |
| 225 // Completed tasks not yet collected by origin thread. |
122 TaskDeque completed_tasks_; | 226 TaskDeque completed_tasks_; |
123 | 227 |
124 ScopedPtrDeque<base::DelegateSimpleThread> workers_; | 228 ScopedPtrDeque<base::DelegateSimpleThread> workers_; |
125 | 229 |
126 DISALLOW_COPY_AND_ASSIGN(Inner); | 230 DISALLOW_COPY_AND_ASSIGN(Inner); |
127 }; | 231 }; |
128 | 232 |
129 WorkerPool::Inner::Inner(WorkerPool* worker_pool, | 233 WorkerPool::Inner::Inner(WorkerPool* worker_pool, |
130 size_t num_threads, | 234 size_t num_threads, |
131 const std::string& thread_name_prefix) | 235 const std::string& thread_name_prefix) |
132 : worker_pool_on_origin_thread_(worker_pool), | 236 : worker_pool_on_origin_thread_(worker_pool), |
133 lock_(), | 237 lock_(), |
134 has_pending_tasks_cv_(&lock_), | 238 has_pending_tasks_cv_(&lock_), |
135 origin_loop_(base::MessageLoopProxy::current()), | 239 origin_loop_(base::MessageLoopProxy::current()), |
136 weak_ptr_factory_(this), | 240 weak_ptr_factory_(this), |
137 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, | 241 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, |
138 weak_ptr_factory_.GetWeakPtr())), | 242 weak_ptr_factory_.GetWeakPtr())), |
139 on_idle_pending_(false), | 243 on_idle_pending_(false), |
140 next_thread_index_(0), | 244 next_thread_index_(0), |
141 running_task_count_(0), | |
142 shutdown_(false) { | 245 shutdown_(false) { |
143 base::AutoLock lock(lock_); | 246 base::AutoLock lock(lock_); |
144 | 247 |
145 while (workers_.size() < num_threads) { | 248 while (workers_.size() < num_threads) { |
146 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( | 249 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( |
147 new base::DelegateSimpleThread( | 250 new base::DelegateSimpleThread( |
148 this, | 251 this, |
149 thread_name_prefix + | 252 thread_name_prefix + |
150 base::StringPrintf( | 253 base::StringPrintf( |
151 "Worker%u", | 254 "Worker%u", |
152 static_cast<unsigned>(workers_.size() + 1)).c_str())); | 255 static_cast<unsigned>(workers_.size() + 1)).c_str())); |
153 worker->Start(); | 256 worker->Start(); |
154 workers_.push_back(worker.Pass()); | 257 workers_.push_back(worker.Pass()); |
155 } | 258 } |
156 } | 259 } |
157 | 260 |
158 WorkerPool::Inner::~Inner() { | 261 WorkerPool::Inner::~Inner() { |
159 base::AutoLock lock(lock_); | 262 base::AutoLock lock(lock_); |
160 | 263 |
161 DCHECK(shutdown_); | 264 DCHECK(shutdown_); |
162 | 265 |
163 // Cancel all pending callbacks. | |
164 weak_ptr_factory_.InvalidateWeakPtrs(); | |
165 | |
166 DCHECK_EQ(0u, pending_tasks_.size()); | 266 DCHECK_EQ(0u, pending_tasks_.size()); |
| 267 DCHECK_EQ(0u, running_tasks_.size()); |
167 DCHECK_EQ(0u, completed_tasks_.size()); | 268 DCHECK_EQ(0u, completed_tasks_.size()); |
168 DCHECK_EQ(0u, running_task_count_); | |
169 } | 269 } |
170 | 270 |
171 void WorkerPool::Inner::Shutdown() { | 271 void WorkerPool::Inner::Shutdown() { |
172 { | 272 { |
173 base::AutoLock lock(lock_); | 273 base::AutoLock lock(lock_); |
174 | 274 |
175 DCHECK(!shutdown_); | 275 DCHECK(!shutdown_); |
176 shutdown_ = true; | 276 shutdown_ = true; |
177 | 277 |
178 // Wake up a worker so it knows it should exit. This will cause all workers | 278 // Wake up a worker so it knows it should exit. This will cause all workers |
179 // to exit as each will wake up another worker before exiting. | 279 // to exit as each will wake up another worker before exiting. |
180 has_pending_tasks_cv_.Signal(); | 280 has_pending_tasks_cv_.Signal(); |
181 } | 281 } |
182 | 282 |
183 while (workers_.size()) { | 283 while (workers_.size()) { |
184 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); | 284 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); |
185 worker->Join(); | 285 worker->Join(); |
186 } | 286 } |
| 287 |
| 288 // Cancel any pending OnIdle callback. |
| 289 weak_ptr_factory_.InvalidateWeakPtrs(); |
187 } | 290 } |
188 | 291 |
189 void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { | 292 void WorkerPool::Inner::ScheduleTasks( |
| 293 internal::WorkerPoolTaskGraph* task_graph) { |
| 294 // Move all dependencies to |new_graph|. |
| 295 internal::WorkerPoolTaskGraph new_graph; |
| 296 new_graph.Swap(task_graph); |
| 297 |
| 298 TaskSet tasks; |
| 299 // Traverse task graph to build new pending task set. |
| 300 for (internal::WorkerPoolTaskGraph::Iterator it(&new_graph); it; ++it) { |
| 301 internal::WorkerPoolTask* task = it->task(); |
| 302 task->DidSchedule(); |
| 303 tasks.insert(task); |
| 304 } |
| 305 |
| 306 { |
| 307 base::AutoLock lock(lock_); |
| 308 |
| 309 // Move tasks not present in |new_graph| to |completed_tasks_|. |
| 310 for (TaskSet::iterator it = pending_tasks_.begin(); |
| 311 it != pending_tasks_.end(); ++it) { |
| 312 internal::WorkerPoolTask* task = *it; |
| 313 |
| 314 // Task has completed if not present in |new_graph|. |
| 315 if (tasks.find(task) == tasks.end()) |
| 316 completed_tasks_.push_back(task); |
| 317 } |
| 318 |
| 319 // Remove all running tasks from new pending task set. |
| 320 for (TaskSet::iterator it = running_tasks_.begin(); |
| 321 it != running_tasks_.end(); ++it) { |
| 322 internal::WorkerPoolTask* task = *it; |
| 323 |
| 324 if (tasks.find(task) != tasks.end()) |
| 325 tasks.erase(task); |
| 326 } |
| 327 |
| 328 // And remove all completed tasks from new pending task set. |
| 329 for (TaskDeque::iterator it = completed_tasks_.begin(); |
| 330 it != completed_tasks_.end(); ++it) { |
| 331 internal::WorkerPoolTask* task = *it; |
| 332 |
| 333 if (tasks.find(task) != tasks.end()) |
| 334 tasks.erase(task); |
| 335 } |
| 336 |
| 337 // Swap task graphs. |
| 338 // Note: old tasks are intentionally destroyed after releasing |lock_|. |
| 339 task_graph_.Swap(&new_graph); |
| 340 |
| 341 // Swap pending task sets. |
| 342 pending_tasks_.swap(tasks); |
| 343 |
| 344 // There is more work available, so wake up worker thread. |
| 345 has_pending_tasks_cv_.Signal(); |
| 346 } |
| 347 } |
| 348 |
| 349 bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) { |
190 base::AutoLock lock(lock_); | 350 base::AutoLock lock(lock_); |
191 | 351 |
192 pending_tasks_.push_back(task.Pass()); | 352 return CollectCompletedTasksWithLockAcquired(completed_tasks); |
193 | |
194 // There is more work available, so wake up worker thread. | |
195 has_pending_tasks_cv_.Signal(); | |
196 } | 353 } |
197 | 354 |
198 bool WorkerPool::Inner::CollectCompletedTasks() { | 355 internal::WorkerPoolTask* |
199 base::AutoLock lock(lock_); | 356 WorkerPool::Inner::FindTaskThatIsReadyToRunWithLockAcquired() { |
| 357 lock_.AssertAcquired(); |
200 | 358 |
201 return AppendCompletedTasksWithLockAcquired( | 359 for (internal::WorkerPoolTaskGraph::Iterator it(&task_graph_); it; ++it) { |
202 &worker_pool_on_origin_thread_->completed_tasks_); | 360 internal::WorkerPoolTask* task = it->task(); |
| 361 |
| 362 // Skip task if not present in |pending_tasks_|. These tasks are |
| 363 // either already running or have finished. |
| 364 // Note: Skip traversal of sub-tree if this function becomes too slow. |
| 365 if (pending_tasks_.find(task) == pending_tasks_.end()) |
| 366 continue; |
| 367 |
| 368 // Task is ready to run if all direct children have finished running. |
| 369 bool all_dependencies_are_satisfied = true; |
| 370 |
| 371 for (internal::WorkerPoolTaskGraph::TaskIterator task_it(*it); |
| 372 task_it && all_dependencies_are_satisfied; |
| 373 ++task_it) { |
| 374 all_dependencies_are_satisfied &= task_it->HasFinished(); |
| 375 } |
| 376 |
| 377 if (all_dependencies_are_satisfied) |
| 378 return task; |
| 379 } |
| 380 |
| 381 return NULL; |
203 } | 382 } |
204 | 383 |
205 bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired( | 384 bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired( |
206 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) { | 385 TaskDeque* completed_tasks) { |
207 lock_.AssertAcquired(); | 386 lock_.AssertAcquired(); |
208 | 387 |
209 while (completed_tasks_.size()) | 388 DCHECK_EQ(0u, completed_tasks->size()); |
210 completed_tasks->push_back(completed_tasks_.take_front().Pass()); | 389 completed_tasks->swap(completed_tasks_); |
211 | 390 |
212 return !running_task_count_ && pending_tasks_.empty(); | 391 return running_tasks_.empty() && pending_tasks_.empty(); |
213 } | 392 } |
214 | 393 |
215 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { | 394 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
216 lock_.AssertAcquired(); | 395 lock_.AssertAcquired(); |
217 | 396 |
218 if (on_idle_pending_) | 397 if (on_idle_pending_) |
219 return; | 398 return; |
220 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); | 399 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); |
221 on_idle_pending_ = true; | 400 on_idle_pending_ = true; |
222 } | 401 } |
223 | 402 |
224 void WorkerPool::Inner::OnIdleOnOriginThread() { | 403 void WorkerPool::Inner::OnIdleOnOriginThread() { |
| 404 TaskDeque completed_tasks; |
| 405 |
225 { | 406 { |
226 base::AutoLock lock(lock_); | 407 base::AutoLock lock(lock_); |
227 | 408 |
228 DCHECK(on_idle_pending_); | 409 DCHECK(on_idle_pending_); |
229 on_idle_pending_ = false; | 410 on_idle_pending_ = false; |
230 | 411 |
231 // Early out if no longer idle. | 412 // Early out if no longer idle. |
232 if (running_task_count_ || !pending_tasks_.empty()) | 413 if (!running_tasks_.empty() || !pending_tasks_.empty()) |
233 return; | 414 return; |
234 | 415 |
235 AppendCompletedTasksWithLockAcquired( | 416 CollectCompletedTasksWithLockAcquired(&completed_tasks); |
236 &worker_pool_on_origin_thread_->completed_tasks_); | |
237 } | 417 } |
238 | 418 |
239 worker_pool_on_origin_thread_->OnIdle(); | 419 worker_pool_on_origin_thread_->OnIdle(&completed_tasks); |
240 } | 420 } |
241 | 421 |
242 void WorkerPool::Inner::Run() { | 422 void WorkerPool::Inner::Run() { |
243 #if defined(OS_ANDROID) | 423 #if defined(OS_ANDROID) |
244 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) | 424 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) |
245 int nice_value = 10; // Idle priority. | 425 int nice_value = 10; // Idle priority. |
246 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value); | 426 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value); |
247 #endif | 427 #endif |
248 | 428 |
249 base::AutoLock lock(lock_); | 429 base::AutoLock lock(lock_); |
250 | 430 |
251 // Get a unique thread index. | 431 // Get a unique thread index. |
252 int thread_index = next_thread_index_++; | 432 int thread_index = next_thread_index_++; |
253 | 433 |
254 while (true) { | 434 while (true) { |
255 if (pending_tasks_.empty()) { | 435 scoped_refptr<internal::WorkerPoolTask> task( |
256 // Exit when shutdown is set and no more tasks are pending. | 436 FindTaskThatIsReadyToRunWithLockAcquired()); |
257 if (shutdown_) | 437 if (!task) { |
258 break; | 438 if (pending_tasks_.empty()) { |
| 439 // Exit when shutdown is set and no more tasks are pending. |
| 440 if (shutdown_) |
| 441 break; |
259 | 442 |
260 // Schedule an idle callback if requested and not pending. | 443 // Schedule an idle callback if no tasks are running. |
261 if (!running_task_count_) | 444 if (running_tasks_.empty()) |
262 ScheduleOnIdleWithLockAcquired(); | 445 ScheduleOnIdleWithLockAcquired(); |
| 446 } |
263 | 447 |
264 // Wait for new pending tasks. | 448 // Wait for more tasks. |
265 has_pending_tasks_cv_.Wait(); | 449 has_pending_tasks_cv_.Wait(); |
266 continue; | 450 continue; |
267 } | 451 } |
268 | 452 |
269 // Get next task. | 453 // First remove task from |pending_tasks_|. |
270 scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); | 454 DCHECK(pending_tasks_.find(task) != pending_tasks_.end()); |
| 455 pending_tasks_.erase(task); |
271 | 456 |
272 // Increment |running_task_count_| before starting to run task. | 457 // Insert task in |running_tasks_| before starting to run it. |
273 running_task_count_++; | 458 DCHECK(running_tasks_.find(task) == running_tasks_.end()); |
| 459 running_tasks_.insert(task); |
274 | 460 |
275 // There may be more work available, so wake up another | 461 // There may be more work available, so wake up another worker thread. |
276 // worker thread. | |
277 has_pending_tasks_cv_.Signal(); | 462 has_pending_tasks_cv_.Signal(); |
278 | 463 |
| 464 // Call WillRun() before releasing |lock_| and running task. |
| 465 task->WillRun(); |
| 466 |
279 { | 467 { |
280 base::AutoUnlock unlock(lock_); | 468 base::AutoUnlock unlock(lock_); |
281 | 469 |
282 task->RunOnThread(thread_index); | 470 task->RunOnThread(thread_index); |
283 } | 471 } |
284 | 472 |
285 completed_tasks_.push_back(task.Pass()); | 473 // This will mark task as finished. |
| 474 task->DidRun(); |
286 | 475 |
287 // Decrement |running_task_count_| now that we are done running task. | 476 // Remove task from |running_tasks_| now that we are done running it. |
288 running_task_count_--; | 477 DCHECK(running_tasks_.find(task) != running_tasks_.end()); |
| 478 running_tasks_.erase(task); |
| 479 |
| 480 // Finally add task to |completed_tasks_|. |
| 481 completed_tasks_.push_back(task); |
289 } | 482 } |
290 | 483 |
291 // We noticed we should exit. Wake up the next worker so it knows it should | 484 // We noticed we should exit. Wake up the next worker so it knows it should |
292 // exit as well (because the Shutdown() code only signals once). | 485 // exit as well (because the Shutdown() code only signals once). |
293 has_pending_tasks_cv_.Signal(); | 486 has_pending_tasks_cv_.Signal(); |
294 } | 487 } |
295 | 488 |
296 WorkerPool::WorkerPool(size_t num_threads, | 489 WorkerPool::WorkerPool(size_t num_threads, |
297 base::TimeDelta check_for_completed_tasks_delay, | 490 base::TimeDelta check_for_completed_tasks_delay, |
298 const std::string& thread_name_prefix) | 491 const std::string& thread_name_prefix) |
299 : client_(NULL), | 492 : client_(NULL), |
300 origin_loop_(base::MessageLoopProxy::current()), | 493 origin_loop_(base::MessageLoopProxy::current()), |
301 weak_ptr_factory_(this), | |
302 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), | 494 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), |
303 check_for_completed_tasks_pending_(false), | 495 check_for_completed_tasks_pending_(false), |
304 inner_(make_scoped_ptr(new Inner(this, | 496 inner_(make_scoped_ptr(new Inner(this, |
305 num_threads, | 497 num_threads, |
306 thread_name_prefix))) { | 498 thread_name_prefix))) { |
307 } | 499 } |
308 | 500 |
309 WorkerPool::~WorkerPool() { | 501 WorkerPool::~WorkerPool() { |
310 // Cancel all pending callbacks. | |
311 weak_ptr_factory_.InvalidateWeakPtrs(); | |
312 | |
313 DCHECK_EQ(0u, completed_tasks_.size()); | |
314 } | 502 } |
315 | 503 |
316 void WorkerPool::Shutdown() { | 504 void WorkerPool::Shutdown() { |
317 inner_->Shutdown(); | 505 inner_->Shutdown(); |
318 inner_->CollectCompletedTasks(); | 506 |
319 DispatchCompletionCallbacks(); | 507 TaskDeque completed_tasks; |
| 508 inner_->CollectCompletedTasks(&completed_tasks); |
| 509 DispatchCompletionCallbacks(&completed_tasks); |
320 } | 510 } |
321 | 511 |
322 void WorkerPool::PostTaskAndReply( | 512 void WorkerPool::OnIdle(TaskDeque* completed_tasks) { |
323 const Callback& task, const base::Closure& reply) { | |
324 PostTask(make_scoped_ptr(new WorkerPoolTaskImpl( | |
325 task, | |
326 reply)).PassAs<internal::WorkerPoolTask>()); | |
327 } | |
328 | |
329 void WorkerPool::OnIdle() { | |
330 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); | 513 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); |
331 | 514 |
332 DispatchCompletionCallbacks(); | 515 DispatchCompletionCallbacks(completed_tasks); |
| 516 |
| 517 // Cancel any pending check for completed tasks. |
| 518 check_for_completed_tasks_callback_.Cancel(); |
| 519 check_for_completed_tasks_pending_ = false; |
333 } | 520 } |
334 | 521 |
335 void WorkerPool::ScheduleCheckForCompletedTasks() { | 522 void WorkerPool::ScheduleCheckForCompletedTasks() { |
336 if (check_for_completed_tasks_pending_) | 523 if (check_for_completed_tasks_pending_) |
337 return; | 524 return; |
| 525 check_for_completed_tasks_callback_.Reset( |
| 526 base::Bind(&WorkerPool::CheckForCompletedTasks, |
| 527 base::Unretained(this))); |
338 origin_loop_->PostDelayedTask( | 528 origin_loop_->PostDelayedTask( |
339 FROM_HERE, | 529 FROM_HERE, |
340 base::Bind(&WorkerPool::CheckForCompletedTasks, | 530 check_for_completed_tasks_callback_.callback(), |
341 weak_ptr_factory_.GetWeakPtr()), | |
342 check_for_completed_tasks_delay_); | 531 check_for_completed_tasks_delay_); |
343 check_for_completed_tasks_pending_ = true; | 532 check_for_completed_tasks_pending_ = true; |
344 } | 533 } |
345 | 534 |
346 void WorkerPool::CheckForCompletedTasks() { | 535 void WorkerPool::CheckForCompletedTasks() { |
347 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); | 536 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); |
348 DCHECK(check_for_completed_tasks_pending_); | 537 DCHECK(check_for_completed_tasks_pending_); |
349 check_for_completed_tasks_pending_ = false; | 538 check_for_completed_tasks_pending_ = false; |
350 | 539 |
| 540 TaskDeque completed_tasks; |
| 541 |
351 // Schedule another check for completed tasks if not idle. | 542 // Schedule another check for completed tasks if not idle. |
352 if (!inner_->CollectCompletedTasks()) | 543 if (!inner_->CollectCompletedTasks(&completed_tasks)) |
353 ScheduleCheckForCompletedTasks(); | 544 ScheduleCheckForCompletedTasks(); |
354 | 545 |
355 DispatchCompletionCallbacks(); | 546 DispatchCompletionCallbacks(&completed_tasks); |
356 } | 547 } |
357 | 548 |
358 void WorkerPool::DispatchCompletionCallbacks() { | 549 void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) { |
359 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); | 550 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); |
360 | 551 |
361 if (completed_tasks_.empty()) | 552 // Early out when |completed_tasks| is empty to prevent unnecessary |
| 553 // call to DidFinishDispatchingWorkerPoolCompletionCallbacks(). |
| 554 if (completed_tasks->empty()) |
362 return; | 555 return; |
363 | 556 |
364 while (completed_tasks_.size()) { | 557 while (!completed_tasks->empty()) { |
365 scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); | 558 scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front(); |
| 559 completed_tasks->pop_front(); |
366 task->DidComplete(); | 560 task->DidComplete(); |
| 561 task->DispatchCompletionCallback(); |
367 } | 562 } |
368 | 563 |
369 DCHECK(client_); | 564 DCHECK(client_); |
370 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); | 565 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); |
371 } | 566 } |
372 | 567 |
373 void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { | 568 void WorkerPool::ScheduleTasks(internal::WorkerPoolTaskGraph* task_graph) { |
374 // Schedule check for completed tasks if not pending. | 569 TRACE_EVENT0("cc", "WorkerPool::ScheduleTasks"); |
375 ScheduleCheckForCompletedTasks(); | |
376 | 570 |
377 inner_->PostTask(task.Pass()); | 571 // Schedule check for completed tasks if graph is non-empty. |
| 572 if (internal::WorkerPoolTaskDependency::Iterator(task_graph)) |
| 573 ScheduleCheckForCompletedTasks(); |
| 574 |
| 575 inner_->ScheduleTasks(task_graph); |
378 } | 576 } |
379 | 577 |
380 } // namespace cc | 578 } // namespace cc |
OLD | NEW |