Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(393)

Side by Side Diff: cc/base/worker_pool.cc

Issue 14689004: Re-land: cc: Cancel and re-prioritize worker pool tasks. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Added support for dependencies and tests. Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "cc/base/worker_pool.h" 5 #include "cc/base/worker_pool.h"
6 6
7 #if defined(OS_ANDROID) 7 #if defined(OS_ANDROID)
8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) 8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549)
9 #include <sys/resource.h> 9 #include <sys/resource.h>
10 #endif 10 #endif
11 11
12 #include <algorithm>
13
14 #include "base/bind.h" 12 #include "base/bind.h"
15 #include "base/debug/trace_event.h" 13 #include "base/debug/trace_event.h"
14 #include "base/hash_tables.h"
16 #include "base/stringprintf.h" 15 #include "base/stringprintf.h"
17 #include "base/synchronization/condition_variable.h" 16 #include "base/synchronization/condition_variable.h"
18 #include "base/threading/simple_thread.h" 17 #include "base/threading/simple_thread.h"
18 #include "cc/base/scoped_ptr_deque.h"
19
20 #if defined(COMPILER_GCC)
21 namespace BASE_HASH_NAMESPACE {
22 template <> struct hash<cc::internal::WorkerPoolTask*> {
23 size_t operator()(cc::internal::WorkerPoolTask* ptr) const {
24 return hash<size_t>()(reinterpret_cast<size_t>(ptr));
25 }
26 };
27 } // namespace BASE_HASH_NAMESPACE
28 #endif // COMPILER
19 29
20 namespace cc { 30 namespace cc {
21 31
22 namespace {
23
24 class WorkerPoolTaskImpl : public internal::WorkerPoolTask {
25 public:
26 WorkerPoolTaskImpl(const WorkerPool::Callback& task,
27 const base::Closure& reply)
28 : internal::WorkerPoolTask(reply),
29 task_(task) {}
30
31 virtual void RunOnThread(unsigned thread_index) OVERRIDE {
32 task_.Run();
33 }
34
35 private:
36 WorkerPool::Callback task_;
37 };
38
39 } // namespace
40
41 namespace internal { 32 namespace internal {
42 33
43 WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) { 34 WorkerPoolTask::WorkerPoolTask() : did_schedule_(false),
35 did_run_(false),
36 did_complete_(false) {
44 } 37 }
45 38
46 WorkerPoolTask::~WorkerPoolTask() { 39 WorkerPoolTask::~WorkerPoolTask() {
40 DCHECK_EQ(did_schedule_, did_complete_);
41 DCHECK(!did_run_ || did_schedule_);
42 DCHECK(!did_run_ || did_complete_);
43 }
44
45 void WorkerPoolTask::DidSchedule() {
46 DCHECK(!did_complete_);
47 did_schedule_ = true;
48 }
49
50 void WorkerPoolTask::WillRun() {
51 DCHECK(did_schedule_);
52 DCHECK(!did_complete_);
53 DCHECK(!did_run_);
54 }
55
56 void WorkerPoolTask::DidRun() {
57 did_run_ = true;
47 } 58 }
48 59
49 void WorkerPoolTask::DidComplete() { 60 void WorkerPoolTask::DidComplete() {
50 reply_.Run(); 61 DCHECK(did_schedule_);
62 DCHECK(!did_complete_);
63 did_complete_ = true;
64 }
65
66 bool WorkerPoolTask::HasFinished() const {
67 return did_run_;
68 }
69
70 WorkerPoolTaskDependency::Iterator::Iterator(
71 const WorkerPoolTaskDependency* root)
72 : current_(root),
73 root_(root) {
74 ++(*this);
75 }
76
77 WorkerPoolTaskDependency::Iterator::~Iterator() {
78 }
79
80 WorkerPoolTaskDependency::TaskIterator::TaskIterator(
81 const WorkerPoolTaskDependency* parent)
82 : current_(parent->first_child_.get()) {
83 }
84
85 WorkerPoolTaskDependency::TaskIterator::~TaskIterator() {
86 }
87
88 WorkerPoolTaskDependency::WorkerPoolTaskDependency()
89 : parent_(NULL),
90 last_child_(NULL) {
91 }
92
93 WorkerPoolTaskDependency::~WorkerPoolTaskDependency() {
94 }
95
96 // static
97 void WorkerPoolTaskDependency::Create(
98 WorkerPoolTask* task,
99 WorkerPoolTaskDependency* parent,
100 WorkerPoolTaskDependency* dependencies) {
101 scoped_ptr<WorkerPoolTaskDependency> dependency(
102 new WorkerPoolTaskDependency);
103
104 if (dependencies) {
105 DCHECK(!dependencies->task());
106 dependency->Swap(dependencies);
107 }
108 dependency->task_ = task;
109 dependency->parent_ = parent;
110
111 if (parent->last_child_) {
112 parent->last_child_->next_sibling_ = dependency.Pass();
113 // |parent_| should only be set for |last_child_|
114 parent->last_child_->parent_ = NULL;
115 parent->last_child_ = parent->last_child_->next_sibling_.get();
116 } else {
117 parent->first_child_ = dependency.Pass();
118 parent->last_child_ = parent->first_child_.get();
119 }
120 }
121
122 void WorkerPoolTaskDependency::Swap(WorkerPoolTaskDependency* other) {
123 DCHECK(other);
124
125 first_child_.swap(other->first_child_);
126 next_sibling_.swap(other->next_sibling_);
127 task_.swap(other->task_);
128
129 // |last_child_| has a pointer to the parent. Make sure this pointer
130 // is properly swapped.
131 WorkerPoolTaskDependency* other_last_child_ = other->last_child_;
132 if (other_last_child_)
133 other_last_child_->parent_ = this;
134 if (last_child_)
135 last_child_->parent_ = other;
136
137 // Swap |last_child_| pointers.
138 other->last_child_ = last_child_;
139 last_child_ = other_last_child_;
51 } 140 }
52 141
53 } // namespace internal 142 } // namespace internal
54 143
55 // Internal to the worker pool. Any data or logic that needs to be 144 // Internal to the worker pool. Any data or logic that needs to be
56 // shared between threads lives in this class. All members are guarded 145 // shared between threads lives in this class. All members are guarded
57 // by |lock_|. 146 // by |lock_|.
58 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { 147 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
59 public: 148 public:
60 Inner(WorkerPool* worker_pool, 149 Inner(WorkerPool* worker_pool,
61 size_t num_threads, 150 size_t num_threads,
62 const std::string& thread_name_prefix); 151 const std::string& thread_name_prefix);
63 virtual ~Inner(); 152 virtual ~Inner();
64 153
65 void Shutdown(); 154 void Shutdown();
66 155
67 void PostTask(scoped_ptr<internal::WorkerPoolTask> task); 156 // Schedule running of tasks in |task_graph|. All tasks previously
157 // scheduled but not present in |task_graph| will be canceled unless
158 // already running. Canceled tasks are moved to |completed_tasks_|
159 // without being run. The result is that once scheduled, a task is
160 // guaranteed to end up in the |completed_tasks_| queue even if they
161 // later get canceled by another call to ScheduleTasks().
162 void ScheduleTasks(internal::WorkerPoolTaskGraph* task_graph);
68 163
69 // Appends all completed tasks to worker pool's completed tasks queue 164 // Collect all completed tasks in |completed_tasks|. Returns true if idle.
70 // and returns true if idle. 165 bool CollectCompletedTasks(TaskDeque* completed_tasks);
71 bool CollectCompletedTasks();
72 166
73 private: 167 private:
74 // Appends all completed tasks to |completed_tasks|. Lock must 168 // Find a task that is ready to run by traversing the dependency graph.
75 // already be acquired before calling this function. 169 // Lock must be acquired before calling this function.
76 bool AppendCompletedTasksWithLockAcquired( 170 internal::WorkerPoolTask* FindTaskThatIsReadyToRunWithLockAcquired();
77 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); 171
172 // Collect all completed tasks by swapping the contents of
173 // |completed_tasks| and |completed_tasks_|. Lock must be acquired
174 // before calling this function. Returns true if idle.
175 bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks);
78 176
79 // Schedule an OnIdleOnOriginThread callback if not already pending. 177 // Schedule an OnIdleOnOriginThread callback if not already pending.
80 // Lock must already be acquired before calling this function. 178 // Lock must already be acquired before calling this function.
81 void ScheduleOnIdleWithLockAcquired(); 179 void ScheduleOnIdleWithLockAcquired();
82 void OnIdleOnOriginThread(); 180 void OnIdleOnOriginThread();
83 181
84 // Overridden from base::DelegateSimpleThread: 182 // Overridden from base::DelegateSimpleThread:
85 virtual void Run() OVERRIDE; 183 virtual void Run() OVERRIDE;
86 184
87 // Pointer to worker pool. Can only be used on origin thread. 185 // Pointer to worker pool. Can only be used on origin thread.
(...skipping 15 matching lines...) Expand all
103 base::WeakPtrFactory<Inner> weak_ptr_factory_; 201 base::WeakPtrFactory<Inner> weak_ptr_factory_;
104 202
105 const base::Closure on_idle_callback_; 203 const base::Closure on_idle_callback_;
106 // Set when a OnIdleOnOriginThread() callback is pending. 204 // Set when a OnIdleOnOriginThread() callback is pending.
107 bool on_idle_pending_; 205 bool on_idle_pending_;
108 206
109 // Provides each running thread loop with a unique index. First thread 207 // Provides each running thread loop with a unique index. First thread
110 // loop index is 0. 208 // loop index is 0.
111 unsigned next_thread_index_; 209 unsigned next_thread_index_;
112 210
113 // Number of tasks currently running.
114 unsigned running_task_count_;
115
116 // Set during shutdown. Tells workers to exit when no more tasks 211 // Set during shutdown. Tells workers to exit when no more tasks
117 // are pending. 212 // are pending.
118 bool shutdown_; 213 bool shutdown_;
119 214
120 typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; 215 // The dependency graph. Describes task dependencies and task priority.
121 TaskDeque pending_tasks_; 216 internal::WorkerPoolTaskGraph task_graph_;
217
218 // This set contains all pending tasks.
219 typedef base::hash_set<internal::WorkerPoolTask*> TaskSet;
220 TaskSet pending_tasks_;
221
222 // This set contains all currently running tasks.
223 TaskSet running_tasks_;
224
225 // Completed tasks not yet collected by origin thread.
122 TaskDeque completed_tasks_; 226 TaskDeque completed_tasks_;
123 227
124 ScopedPtrDeque<base::DelegateSimpleThread> workers_; 228 ScopedPtrDeque<base::DelegateSimpleThread> workers_;
125 229
126 DISALLOW_COPY_AND_ASSIGN(Inner); 230 DISALLOW_COPY_AND_ASSIGN(Inner);
127 }; 231 };
128 232
129 WorkerPool::Inner::Inner(WorkerPool* worker_pool, 233 WorkerPool::Inner::Inner(WorkerPool* worker_pool,
130 size_t num_threads, 234 size_t num_threads,
131 const std::string& thread_name_prefix) 235 const std::string& thread_name_prefix)
132 : worker_pool_on_origin_thread_(worker_pool), 236 : worker_pool_on_origin_thread_(worker_pool),
133 lock_(), 237 lock_(),
134 has_pending_tasks_cv_(&lock_), 238 has_pending_tasks_cv_(&lock_),
135 origin_loop_(base::MessageLoopProxy::current()), 239 origin_loop_(base::MessageLoopProxy::current()),
136 weak_ptr_factory_(this), 240 weak_ptr_factory_(this),
137 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, 241 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread,
138 weak_ptr_factory_.GetWeakPtr())), 242 weak_ptr_factory_.GetWeakPtr())),
139 on_idle_pending_(false), 243 on_idle_pending_(false),
140 next_thread_index_(0), 244 next_thread_index_(0),
141 running_task_count_(0),
142 shutdown_(false) { 245 shutdown_(false) {
143 base::AutoLock lock(lock_); 246 base::AutoLock lock(lock_);
144 247
145 while (workers_.size() < num_threads) { 248 while (workers_.size() < num_threads) {
146 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( 249 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
147 new base::DelegateSimpleThread( 250 new base::DelegateSimpleThread(
148 this, 251 this,
149 thread_name_prefix + 252 thread_name_prefix +
150 base::StringPrintf( 253 base::StringPrintf(
151 "Worker%u", 254 "Worker%u",
152 static_cast<unsigned>(workers_.size() + 1)).c_str())); 255 static_cast<unsigned>(workers_.size() + 1)).c_str()));
153 worker->Start(); 256 worker->Start();
154 workers_.push_back(worker.Pass()); 257 workers_.push_back(worker.Pass());
155 } 258 }
156 } 259 }
157 260
158 WorkerPool::Inner::~Inner() { 261 WorkerPool::Inner::~Inner() {
159 base::AutoLock lock(lock_); 262 base::AutoLock lock(lock_);
160 263
161 DCHECK(shutdown_); 264 DCHECK(shutdown_);
162 265
163 // Cancel all pending callbacks.
164 weak_ptr_factory_.InvalidateWeakPtrs();
165
166 DCHECK_EQ(0u, pending_tasks_.size()); 266 DCHECK_EQ(0u, pending_tasks_.size());
267 DCHECK_EQ(0u, running_tasks_.size());
167 DCHECK_EQ(0u, completed_tasks_.size()); 268 DCHECK_EQ(0u, completed_tasks_.size());
168 DCHECK_EQ(0u, running_task_count_);
169 } 269 }
170 270
171 void WorkerPool::Inner::Shutdown() { 271 void WorkerPool::Inner::Shutdown() {
172 { 272 {
173 base::AutoLock lock(lock_); 273 base::AutoLock lock(lock_);
174 274
175 DCHECK(!shutdown_); 275 DCHECK(!shutdown_);
176 shutdown_ = true; 276 shutdown_ = true;
177 277
178 // Wake up a worker so it knows it should exit. This will cause all workers 278 // Wake up a worker so it knows it should exit. This will cause all workers
179 // to exit as each will wake up another worker before exiting. 279 // to exit as each will wake up another worker before exiting.
180 has_pending_tasks_cv_.Signal(); 280 has_pending_tasks_cv_.Signal();
181 } 281 }
182 282
183 while (workers_.size()) { 283 while (workers_.size()) {
184 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); 284 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
185 worker->Join(); 285 worker->Join();
186 } 286 }
287
288 // Cancel any pending OnIdle callback.
289 weak_ptr_factory_.InvalidateWeakPtrs();
187 } 290 }
188 291
189 void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { 292 void WorkerPool::Inner::ScheduleTasks(
293 internal::WorkerPoolTaskGraph* task_graph) {
294 // Move all dependencies to |new_graph|.
295 internal::WorkerPoolTaskGraph new_graph;
296 new_graph.Swap(task_graph);
297
298 TaskSet tasks;
299 // Traverse task graph to build new pending task set.
300 for (internal::WorkerPoolTaskGraph::Iterator it(&new_graph); it; ++it) {
301 internal::WorkerPoolTask* task = it->task();
302 task->DidSchedule();
303 tasks.insert(task);
304 }
305
306 {
307 base::AutoLock lock(lock_);
308
309 // Move tasks not present in |new_graph| to |completed_tasks_|.
310 for (TaskSet::iterator it = pending_tasks_.begin();
311 it != pending_tasks_.end(); ++it) {
312 internal::WorkerPoolTask* task = *it;
313
314 // Task has completed if not present in |new_graph|.
315 if (tasks.find(task) == tasks.end())
316 completed_tasks_.push_back(task);
317 }
318
319 // Remove all running tasks from new pending task set.
320 for (TaskSet::iterator it = running_tasks_.begin();
321 it != running_tasks_.end(); ++it) {
322 internal::WorkerPoolTask* task = *it;
323
324 if (tasks.find(task) != tasks.end())
325 tasks.erase(task);
326 }
327
328 // And remove all completed tasks from new pending task set.
329 for (TaskDeque::iterator it = completed_tasks_.begin();
330 it != completed_tasks_.end(); ++it) {
331 internal::WorkerPoolTask* task = *it;
332
333 if (tasks.find(task) != tasks.end())
334 tasks.erase(task);
335 }
336
337 // Swap task graphs.
338 // Note: old tasks are intentionally destroyed after releasing |lock_|.
339 task_graph_.Swap(&new_graph);
340
341 // Swap pending task sets.
342 pending_tasks_.swap(tasks);
343
344 // There is more work available, so wake up worker thread.
345 has_pending_tasks_cv_.Signal();
346 }
347 }
348
349 bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) {
190 base::AutoLock lock(lock_); 350 base::AutoLock lock(lock_);
191 351
192 pending_tasks_.push_back(task.Pass()); 352 return CollectCompletedTasksWithLockAcquired(completed_tasks);
193
194 // There is more work available, so wake up worker thread.
195 has_pending_tasks_cv_.Signal();
196 } 353 }
197 354
198 bool WorkerPool::Inner::CollectCompletedTasks() { 355 internal::WorkerPoolTask*
199 base::AutoLock lock(lock_); 356 WorkerPool::Inner::FindTaskThatIsReadyToRunWithLockAcquired() {
357 lock_.AssertAcquired();
200 358
201 return AppendCompletedTasksWithLockAcquired( 359 for (internal::WorkerPoolTaskGraph::Iterator it(&task_graph_); it; ++it) {
202 &worker_pool_on_origin_thread_->completed_tasks_); 360 internal::WorkerPoolTask* task = it->task();
361
362 // Skip task if not present in |pending_tasks_|. These tasks are
363 // either already running or have finished.
364 // Note: Skip traversal of sub-tree if this function becomes too slow.
365 if (pending_tasks_.find(task) == pending_tasks_.end())
366 continue;
367
368 // Task is ready to run if all direct children have finished running.
369 bool all_dependencies_are_satisfied = true;
370
371 for (internal::WorkerPoolTaskGraph::TaskIterator task_it(*it);
372 task_it && all_dependencies_are_satisfied;
373 ++task_it) {
374 all_dependencies_are_satisfied &= task_it->HasFinished();
375 }
376
377 if (all_dependencies_are_satisfied)
378 return task;
379 }
380
381 return NULL;
203 } 382 }
204 383
205 bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired( 384 bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired(
206 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) { 385 TaskDeque* completed_tasks) {
207 lock_.AssertAcquired(); 386 lock_.AssertAcquired();
208 387
209 while (completed_tasks_.size()) 388 DCHECK_EQ(0u, completed_tasks->size());
210 completed_tasks->push_back(completed_tasks_.take_front().Pass()); 389 completed_tasks->swap(completed_tasks_);
211 390
212 return !running_task_count_ && pending_tasks_.empty(); 391 return running_tasks_.empty() && pending_tasks_.empty();
213 } 392 }
214 393
215 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { 394 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
216 lock_.AssertAcquired(); 395 lock_.AssertAcquired();
217 396
218 if (on_idle_pending_) 397 if (on_idle_pending_)
219 return; 398 return;
220 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); 399 origin_loop_->PostTask(FROM_HERE, on_idle_callback_);
221 on_idle_pending_ = true; 400 on_idle_pending_ = true;
222 } 401 }
223 402
224 void WorkerPool::Inner::OnIdleOnOriginThread() { 403 void WorkerPool::Inner::OnIdleOnOriginThread() {
404 TaskDeque completed_tasks;
405
225 { 406 {
226 base::AutoLock lock(lock_); 407 base::AutoLock lock(lock_);
227 408
228 DCHECK(on_idle_pending_); 409 DCHECK(on_idle_pending_);
229 on_idle_pending_ = false; 410 on_idle_pending_ = false;
230 411
231 // Early out if no longer idle. 412 // Early out if no longer idle.
232 if (running_task_count_ || !pending_tasks_.empty()) 413 if (!running_tasks_.empty() || !pending_tasks_.empty())
233 return; 414 return;
234 415
235 AppendCompletedTasksWithLockAcquired( 416 CollectCompletedTasksWithLockAcquired(&completed_tasks);
236 &worker_pool_on_origin_thread_->completed_tasks_);
237 } 417 }
238 418
239 worker_pool_on_origin_thread_->OnIdle(); 419 worker_pool_on_origin_thread_->OnIdle(&completed_tasks);
240 } 420 }
241 421
242 void WorkerPool::Inner::Run() { 422 void WorkerPool::Inner::Run() {
243 #if defined(OS_ANDROID) 423 #if defined(OS_ANDROID)
244 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) 424 // TODO(epenner): Move thread priorities to base. (crbug.com/170549)
245 int nice_value = 10; // Idle priority. 425 int nice_value = 10; // Idle priority.
246 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value); 426 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value);
247 #endif 427 #endif
248 428
249 base::AutoLock lock(lock_); 429 base::AutoLock lock(lock_);
250 430
251 // Get a unique thread index. 431 // Get a unique thread index.
252 int thread_index = next_thread_index_++; 432 int thread_index = next_thread_index_++;
253 433
254 while (true) { 434 while (true) {
255 if (pending_tasks_.empty()) { 435 scoped_refptr<internal::WorkerPoolTask> task(
256 // Exit when shutdown is set and no more tasks are pending. 436 FindTaskThatIsReadyToRunWithLockAcquired());
257 if (shutdown_) 437 if (!task) {
258 break; 438 if (pending_tasks_.empty()) {
439 // Exit when shutdown is set and no more tasks are pending.
440 if (shutdown_)
441 break;
259 442
260 // Schedule an idle callback if requested and not pending. 443 // Schedule an idle callback if no tasks are running.
261 if (!running_task_count_) 444 if (running_tasks_.empty())
262 ScheduleOnIdleWithLockAcquired(); 445 ScheduleOnIdleWithLockAcquired();
446 }
263 447
264 // Wait for new pending tasks. 448 // Wait for more tasks.
265 has_pending_tasks_cv_.Wait(); 449 has_pending_tasks_cv_.Wait();
266 continue; 450 continue;
267 } 451 }
268 452
269 // Get next task. 453 // First remove task from |pending_tasks_|.
270 scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); 454 DCHECK(pending_tasks_.find(task) != pending_tasks_.end());
455 pending_tasks_.erase(task);
271 456
272 // Increment |running_task_count_| before starting to run task. 457 // Insert task in |running_tasks_| before starting to run it.
273 running_task_count_++; 458 DCHECK(running_tasks_.find(task) == running_tasks_.end());
459 running_tasks_.insert(task);
274 460
275 // There may be more work available, so wake up another 461 // There may be more work available, so wake up another worker thread.
276 // worker thread.
277 has_pending_tasks_cv_.Signal(); 462 has_pending_tasks_cv_.Signal();
278 463
464 // Call WillRun() before releasing |lock_| and running task.
465 task->WillRun();
466
279 { 467 {
280 base::AutoUnlock unlock(lock_); 468 base::AutoUnlock unlock(lock_);
281 469
282 task->RunOnThread(thread_index); 470 task->RunOnThread(thread_index);
283 } 471 }
284 472
285 completed_tasks_.push_back(task.Pass()); 473 // This will mark task as finished.
474 task->DidRun();
286 475
287 // Decrement |running_task_count_| now that we are done running task. 476 // Remove task from |running_tasks_| now that we are done running it.
288 running_task_count_--; 477 DCHECK(running_tasks_.find(task) != running_tasks_.end());
478 running_tasks_.erase(task);
479
480 // Finally add task to |completed_tasks_|.
481 completed_tasks_.push_back(task);
289 } 482 }
290 483
291 // We noticed we should exit. Wake up the next worker so it knows it should 484 // We noticed we should exit. Wake up the next worker so it knows it should
292 // exit as well (because the Shutdown() code only signals once). 485 // exit as well (because the Shutdown() code only signals once).
293 has_pending_tasks_cv_.Signal(); 486 has_pending_tasks_cv_.Signal();
294 } 487 }
295 488
296 WorkerPool::WorkerPool(size_t num_threads, 489 WorkerPool::WorkerPool(size_t num_threads,
297 base::TimeDelta check_for_completed_tasks_delay, 490 base::TimeDelta check_for_completed_tasks_delay,
298 const std::string& thread_name_prefix) 491 const std::string& thread_name_prefix)
299 : client_(NULL), 492 : client_(NULL),
300 origin_loop_(base::MessageLoopProxy::current()), 493 origin_loop_(base::MessageLoopProxy::current()),
301 weak_ptr_factory_(this),
302 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), 494 check_for_completed_tasks_delay_(check_for_completed_tasks_delay),
303 check_for_completed_tasks_pending_(false), 495 check_for_completed_tasks_pending_(false),
304 inner_(make_scoped_ptr(new Inner(this, 496 inner_(make_scoped_ptr(new Inner(this,
305 num_threads, 497 num_threads,
306 thread_name_prefix))) { 498 thread_name_prefix))) {
307 } 499 }
308 500
309 WorkerPool::~WorkerPool() { 501 WorkerPool::~WorkerPool() {
310 // Cancel all pending callbacks.
311 weak_ptr_factory_.InvalidateWeakPtrs();
312
313 DCHECK_EQ(0u, completed_tasks_.size());
314 } 502 }
315 503
316 void WorkerPool::Shutdown() { 504 void WorkerPool::Shutdown() {
317 inner_->Shutdown(); 505 inner_->Shutdown();
318 inner_->CollectCompletedTasks(); 506
319 DispatchCompletionCallbacks(); 507 TaskDeque completed_tasks;
508 inner_->CollectCompletedTasks(&completed_tasks);
509 DispatchCompletionCallbacks(&completed_tasks);
320 } 510 }
321 511
322 void WorkerPool::PostTaskAndReply( 512 void WorkerPool::OnIdle(TaskDeque* completed_tasks) {
323 const Callback& task, const base::Closure& reply) {
324 PostTask(make_scoped_ptr(new WorkerPoolTaskImpl(
325 task,
326 reply)).PassAs<internal::WorkerPoolTask>());
327 }
328
329 void WorkerPool::OnIdle() {
330 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); 513 TRACE_EVENT0("cc", "WorkerPool::OnIdle");
331 514
332 DispatchCompletionCallbacks(); 515 DispatchCompletionCallbacks(completed_tasks);
516
517 // Cancel any pending check for completed tasks.
518 check_for_completed_tasks_callback_.Cancel();
519 check_for_completed_tasks_pending_ = false;
333 } 520 }
334 521
335 void WorkerPool::ScheduleCheckForCompletedTasks() { 522 void WorkerPool::ScheduleCheckForCompletedTasks() {
336 if (check_for_completed_tasks_pending_) 523 if (check_for_completed_tasks_pending_)
337 return; 524 return;
525 check_for_completed_tasks_callback_.Reset(
526 base::Bind(&WorkerPool::CheckForCompletedTasks,
527 base::Unretained(this)));
338 origin_loop_->PostDelayedTask( 528 origin_loop_->PostDelayedTask(
339 FROM_HERE, 529 FROM_HERE,
340 base::Bind(&WorkerPool::CheckForCompletedTasks, 530 check_for_completed_tasks_callback_.callback(),
341 weak_ptr_factory_.GetWeakPtr()),
342 check_for_completed_tasks_delay_); 531 check_for_completed_tasks_delay_);
343 check_for_completed_tasks_pending_ = true; 532 check_for_completed_tasks_pending_ = true;
344 } 533 }
345 534
346 void WorkerPool::CheckForCompletedTasks() { 535 void WorkerPool::CheckForCompletedTasks() {
347 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); 536 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");
348 DCHECK(check_for_completed_tasks_pending_); 537 DCHECK(check_for_completed_tasks_pending_);
349 check_for_completed_tasks_pending_ = false; 538 check_for_completed_tasks_pending_ = false;
350 539
540 TaskDeque completed_tasks;
541
351 // Schedule another check for completed tasks if not idle. 542 // Schedule another check for completed tasks if not idle.
352 if (!inner_->CollectCompletedTasks()) 543 if (!inner_->CollectCompletedTasks(&completed_tasks))
353 ScheduleCheckForCompletedTasks(); 544 ScheduleCheckForCompletedTasks();
354 545
355 DispatchCompletionCallbacks(); 546 DispatchCompletionCallbacks(&completed_tasks);
356 } 547 }
357 548
358 void WorkerPool::DispatchCompletionCallbacks() { 549 void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) {
359 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); 550 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks");
360 551
361 if (completed_tasks_.empty()) 552 // Early out when |completed_tasks| is empty to prevent unnecessary
553 // call to DidFinishDispatchingWorkerPoolCompletionCallbacks().
554 if (completed_tasks->empty())
362 return; 555 return;
363 556
364 while (completed_tasks_.size()) { 557 while (!completed_tasks->empty()) {
365 scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); 558 scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front();
559 completed_tasks->pop_front();
366 task->DidComplete(); 560 task->DidComplete();
561 task->DispatchCompletionCallback();
367 } 562 }
368 563
369 DCHECK(client_); 564 DCHECK(client_);
370 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); 565 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks();
371 } 566 }
372 567
373 void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { 568 void WorkerPool::ScheduleTasks(internal::WorkerPoolTaskGraph* task_graph) {
374 // Schedule check for completed tasks if not pending. 569 TRACE_EVENT0("cc", "WorkerPool::ScheduleTasks");
375 ScheduleCheckForCompletedTasks();
376 570
377 inner_->PostTask(task.Pass()); 571 // Schedule check for completed tasks if graph is non-empty.
572 if (internal::WorkerPoolTaskDependency::Iterator(task_graph))
573 ScheduleCheckForCompletedTasks();
574
575 inner_->ScheduleTasks(task_graph);
378 } 576 }
379 577
380 } // namespace cc 578 } // namespace cc
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698