Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(16)

Side by Side Diff: cc/base/worker_pool.cc

Issue 14689004: Re-land: cc: Cancel and re-prioritize worker pool tasks. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: move alignment check to RP Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "cc/base/worker_pool.h" 5 #include "cc/base/worker_pool.h"
6 6
7 #if defined(OS_ANDROID) 7 #if defined(OS_ANDROID)
8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) 8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549)
9 #include <sys/resource.h> 9 #include <sys/resource.h>
10 #endif 10 #endif
11 11
12 #include <algorithm> 12 #include <set>
13 13
14 #include "base/bind.h" 14 #include "base/bind.h"
15 #include "base/debug/trace_event.h" 15 #include "base/debug/trace_event.h"
16 #include "base/stringprintf.h" 16 #include "base/stringprintf.h"
17 #include "base/synchronization/condition_variable.h" 17 #include "base/synchronization/condition_variable.h"
18 #include "base/threading/simple_thread.h" 18 #include "base/threading/simple_thread.h"
19 #include "cc/base/scoped_ptr_deque.h"
19 20
20 namespace cc { 21 namespace cc {
21 22
22 namespace { 23 namespace {
23 24
24 class WorkerPoolTaskImpl : public internal::WorkerPoolTask { 25 class WorkerPoolTaskGraphImpl : public internal::WorkerPoolTaskGraph {
25 public: 26 public:
26 WorkerPoolTaskImpl(const WorkerPool::Callback& task, 27 WorkerPoolTaskGraphImpl() {}
27 const base::Closure& reply)
28 : internal::WorkerPoolTask(reply),
29 task_(task) {}
30 28
31 virtual void RunOnThread(unsigned thread_index) OVERRIDE { 29 // Overridden from internal::WorkerPoolTaskGraph:
32 task_.Run(); 30 virtual bool HasMoreTasks() OVERRIDE { return false; }
31 virtual bool HasTask(internal::WorkerPoolTask* task) OVERRIDE {
32 return false;
33 }
34 virtual internal::WorkerPoolTask* TopTask() OVERRIDE {
35 NOTREACHED();
36 return NULL;
37 }
38 virtual scoped_refptr<internal::WorkerPoolTask> TakeTask(
39 internal::WorkerPoolTask* task) OVERRIDE {
40 NOTREACHED();
41 return NULL;
33 } 42 }
34 43
35 private: 44 private:
36 WorkerPool::Callback task_; 45 virtual ~WorkerPoolTaskGraphImpl() {}
37 }; 46 };
38 47
39 } // namespace 48 } // namespace
40 49
41 namespace internal {
42
43 WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) {
44 }
45
46 WorkerPoolTask::~WorkerPoolTask() {
47 }
48
49 void WorkerPoolTask::DidComplete() {
50 reply_.Run();
51 }
52
53 } // namespace internal
54
55 // Internal to the worker pool. Any data or logic that needs to be 50 // Internal to the worker pool. Any data or logic that needs to be
56 // shared between threads lives in this class. All members are guarded 51 // shared between threads lives in this class. All members are guarded
57 // by |lock_|. 52 // by |lock_|.
58 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { 53 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
59 public: 54 public:
60 Inner(WorkerPool* worker_pool, 55 Inner(WorkerPool* worker_pool,
61 size_t num_threads, 56 size_t num_threads,
62 const std::string& thread_name_prefix); 57 const std::string& thread_name_prefix);
63 virtual ~Inner(); 58 virtual ~Inner();
64 59
65 void Shutdown(); 60 void Shutdown();
66 61
67 void PostTask(scoped_ptr<internal::WorkerPoolTask> task); 62 // Schedule running of tasks in |task_graph|. All tasks previously
63 // scheduled but not present in |task_graph| will be canceled unless
64 // already running. Canceled tasks are moved to |completed_tasks_|
65 // without being run. The result is that once scheduled, a task is
66 // guaranteed to end up in the |completed_tasks_| queue even if they
67 // later get canceled by another call to ScheduleTasks().
68 void ScheduleTasks(scoped_ptr<internal::WorkerPoolTaskGraph> task_graph);
68 69
69 // Appends all completed tasks to worker pool's completed tasks queue 70 // Collect all completed tasks in |completed_tasks|. Returns true if idle.
70 // and returns true if idle. 71 bool CollectCompletedTasks(TaskDeque* completed_tasks);
71 bool CollectCompletedTasks();
72 72
73 private: 73 private:
74 // Appends all completed tasks to |completed_tasks|. Lock must 74 // Collect all completed tasks by swapping the contents of
75 // already be acquired before calling this function. 75 // |completed_tasks| and |completed_tasks_|. Lock must be acquired
76 bool AppendCompletedTasksWithLockAcquired( 76 // before calling this function. Returns true if idle.
77 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); 77 bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks);
78 78
79 // Schedule an OnIdleOnOriginThread callback if not already pending. 79 // Schedule an OnIdleOnOriginThread callback if not already pending.
80 // Lock must already be acquired before calling this function. 80 // Lock must already be acquired before calling this function.
81 void ScheduleOnIdleWithLockAcquired(); 81 void ScheduleOnIdleWithLockAcquired();
82 void OnIdleOnOriginThread(); 82 void OnIdleOnOriginThread();
83 83
84 // Overridden from base::DelegateSimpleThread: 84 // Overridden from base::DelegateSimpleThread:
85 virtual void Run() OVERRIDE; 85 virtual void Run() OVERRIDE;
86 86
87 // Pointer to worker pool. Can only be used on origin thread. 87 // Pointer to worker pool. Can only be used on origin thread.
(...skipping 15 matching lines...) Expand all
103 base::WeakPtrFactory<Inner> weak_ptr_factory_; 103 base::WeakPtrFactory<Inner> weak_ptr_factory_;
104 104
105 const base::Closure on_idle_callback_; 105 const base::Closure on_idle_callback_;
106 // Set when a OnIdleOnOriginThread() callback is pending. 106 // Set when a OnIdleOnOriginThread() callback is pending.
107 bool on_idle_pending_; 107 bool on_idle_pending_;
108 108
109 // Provides each running thread loop with a unique index. First thread 109 // Provides each running thread loop with a unique index. First thread
110 // loop index is 0. 110 // loop index is 0.
111 unsigned next_thread_index_; 111 unsigned next_thread_index_;
112 112
113 // Number of tasks currently running.
114 unsigned running_task_count_;
115
116 // Set during shutdown. Tells workers to exit when no more tasks 113 // Set during shutdown. Tells workers to exit when no more tasks
117 // are pending. 114 // are pending.
118 bool shutdown_; 115 bool shutdown_;
119 116
120 typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; 117 // The task graph. Provides tasks in order of priority.
121 TaskDeque pending_tasks_; 118 scoped_ptr<internal::WorkerPoolTaskGraph> task_graph_;
119
120 // This set contains all currently running tasks.
121 typedef std::set<internal::WorkerPoolTask*> TaskSet;
122 TaskSet running_tasks_;
123
124 // Completed tasks not yet collected by origin thread.
122 TaskDeque completed_tasks_; 125 TaskDeque completed_tasks_;
123 126
124 ScopedPtrDeque<base::DelegateSimpleThread> workers_; 127 ScopedPtrDeque<base::DelegateSimpleThread> workers_;
125 128
126 DISALLOW_COPY_AND_ASSIGN(Inner); 129 DISALLOW_COPY_AND_ASSIGN(Inner);
127 }; 130 };
128 131
129 WorkerPool::Inner::Inner(WorkerPool* worker_pool, 132 WorkerPool::Inner::Inner(WorkerPool* worker_pool,
130 size_t num_threads, 133 size_t num_threads,
131 const std::string& thread_name_prefix) 134 const std::string& thread_name_prefix)
132 : worker_pool_on_origin_thread_(worker_pool), 135 : worker_pool_on_origin_thread_(worker_pool),
133 lock_(), 136 lock_(),
134 has_pending_tasks_cv_(&lock_), 137 has_pending_tasks_cv_(&lock_),
135 origin_loop_(base::MessageLoopProxy::current()), 138 origin_loop_(base::MessageLoopProxy::current()),
136 weak_ptr_factory_(this), 139 weak_ptr_factory_(this),
137 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, 140 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread,
138 weak_ptr_factory_.GetWeakPtr())), 141 weak_ptr_factory_.GetWeakPtr())),
139 on_idle_pending_(false), 142 on_idle_pending_(false),
140 next_thread_index_(0), 143 next_thread_index_(0),
141 running_task_count_(0), 144 shutdown_(false),
142 shutdown_(false) { 145 task_graph_(new WorkerPoolTaskGraphImpl) {
143 base::AutoLock lock(lock_); 146 base::AutoLock lock(lock_);
144 147
145 while (workers_.size() < num_threads) { 148 while (workers_.size() < num_threads) {
146 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( 149 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
147 new base::DelegateSimpleThread( 150 new base::DelegateSimpleThread(
148 this, 151 this,
149 thread_name_prefix + 152 thread_name_prefix +
150 base::StringPrintf( 153 base::StringPrintf(
151 "Worker%u", 154 "Worker%u",
152 static_cast<unsigned>(workers_.size() + 1)).c_str())); 155 static_cast<unsigned>(workers_.size() + 1)).c_str()));
153 worker->Start(); 156 worker->Start();
154 workers_.push_back(worker.Pass()); 157 workers_.push_back(worker.Pass());
155 } 158 }
156 } 159 }
157 160
158 WorkerPool::Inner::~Inner() { 161 WorkerPool::Inner::~Inner() {
159 base::AutoLock lock(lock_); 162 base::AutoLock lock(lock_);
160 163
161 DCHECK(shutdown_); 164 DCHECK(shutdown_);
162 165
163 // Cancel all pending callbacks. 166 DCHECK(!task_graph_->HasMoreTasks());
164 weak_ptr_factory_.InvalidateWeakPtrs(); 167 DCHECK_EQ(0u, running_tasks_.size());
165
166 DCHECK_EQ(0u, pending_tasks_.size());
167 DCHECK_EQ(0u, completed_tasks_.size()); 168 DCHECK_EQ(0u, completed_tasks_.size());
168 DCHECK_EQ(0u, running_task_count_);
169 } 169 }
170 170
171 void WorkerPool::Inner::Shutdown() { 171 void WorkerPool::Inner::Shutdown() {
172 { 172 {
173 base::AutoLock lock(lock_); 173 base::AutoLock lock(lock_);
174 174
175 DCHECK(!shutdown_); 175 DCHECK(!shutdown_);
176 shutdown_ = true; 176 shutdown_ = true;
177 177
178 // Wake up a worker so it knows it should exit. This will cause all workers 178 // Wake up a worker so it knows it should exit. This will cause all workers
179 // to exit as each will wake up another worker before exiting. 179 // to exit as each will wake up another worker before exiting.
180 has_pending_tasks_cv_.Signal(); 180 has_pending_tasks_cv_.Signal();
181 } 181 }
182 182
183 while (workers_.size()) { 183 while (workers_.size()) {
184 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); 184 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
185 worker->Join(); 185 worker->Join();
186 } 186 }
187
188 // Cancel any pending OnIdle callback.
189 weak_ptr_factory_.InvalidateWeakPtrs();
187 } 190 }
188 191
189 void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { 192 void WorkerPool::Inner::ScheduleTasks(
193 scoped_ptr<internal::WorkerPoolTaskGraph> task_graph) {
190 base::AutoLock lock(lock_); 194 base::AutoLock lock(lock_);
191 195
192 pending_tasks_.push_back(task.Pass()); 196 // Move tasks not present in new graph to |completed_tasks_|.
197 while (task_graph_->HasMoreTasks()) {
198 scoped_refptr<internal::WorkerPoolTask> task = task_graph_->TakeTask(
199 task_graph_->TopTask());
200
201 // Task has not completed if present in new graph.
202 if (task_graph->HasTask(task))
203 continue;
204
205 completed_tasks_.push_back(task);
206 }
207
208 // Take any running tasks from new graph.
209 for (TaskSet::iterator it = running_tasks_.begin();
210 it != running_tasks_.end(); ++it) {
211 if (task_graph->HasTask(*it))
212 task_graph->TakeTask(*it);
213 }
214
215 // And take any completed tasks from new graph.
216 for (TaskDeque::iterator it = completed_tasks_.begin();
217 it != completed_tasks_.end(); ++it) {
218 if (task_graph->HasTask(*it))
219 task_graph->TakeTask(*it);
220 }
221
222 // Finally switch to the new graph.
223 task_graph_ = task_graph.Pass();
193 224
194 // There is more work available, so wake up worker thread. 225 // There is more work available, so wake up worker thread.
195 has_pending_tasks_cv_.Signal(); 226 has_pending_tasks_cv_.Signal();
196 } 227 }
197 228
198 bool WorkerPool::Inner::CollectCompletedTasks() { 229 bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) {
199 base::AutoLock lock(lock_); 230 base::AutoLock lock(lock_);
200 231
201 return AppendCompletedTasksWithLockAcquired( 232 return CollectCompletedTasksWithLockAcquired(completed_tasks);
202 &worker_pool_on_origin_thread_->completed_tasks_);
203 } 233 }
204 234
205 bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired( 235 bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired(
206 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) { 236 TaskDeque* completed_tasks) {
207 lock_.AssertAcquired(); 237 lock_.AssertAcquired();
208 238
209 while (completed_tasks_.size()) 239 DCHECK_EQ(0u, completed_tasks->size());
210 completed_tasks->push_back(completed_tasks_.take_front().Pass()); 240 completed_tasks->swap(completed_tasks_);
211 241
212 return !running_task_count_ && pending_tasks_.empty(); 242 return running_tasks_.empty() && !task_graph_->HasMoreTasks();
213 } 243 }
214 244
215 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { 245 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
216 lock_.AssertAcquired(); 246 lock_.AssertAcquired();
217 247
218 if (on_idle_pending_) 248 if (on_idle_pending_)
219 return; 249 return;
220 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); 250 origin_loop_->PostTask(FROM_HERE, on_idle_callback_);
221 on_idle_pending_ = true; 251 on_idle_pending_ = true;
222 } 252 }
223 253
224 void WorkerPool::Inner::OnIdleOnOriginThread() { 254 void WorkerPool::Inner::OnIdleOnOriginThread() {
255 TaskDeque completed_tasks;
256
225 { 257 {
226 base::AutoLock lock(lock_); 258 base::AutoLock lock(lock_);
227 259
228 DCHECK(on_idle_pending_); 260 DCHECK(on_idle_pending_);
229 on_idle_pending_ = false; 261 on_idle_pending_ = false;
230 262
231 // Early out if no longer idle. 263 // Early out if no longer idle.
232 if (running_task_count_ || !pending_tasks_.empty()) 264 if (!running_tasks_.empty() || task_graph_->HasMoreTasks())
233 return; 265 return;
234 266
235 AppendCompletedTasksWithLockAcquired( 267 CollectCompletedTasksWithLockAcquired(&completed_tasks);
236 &worker_pool_on_origin_thread_->completed_tasks_);
237 } 268 }
238 269
239 worker_pool_on_origin_thread_->OnIdle(); 270 worker_pool_on_origin_thread_->OnIdle(&completed_tasks);
240 } 271 }
241 272
242 void WorkerPool::Inner::Run() { 273 void WorkerPool::Inner::Run() {
243 #if defined(OS_ANDROID) 274 #if defined(OS_ANDROID)
244 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) 275 // TODO(epenner): Move thread priorities to base. (crbug.com/170549)
245 int nice_value = 10; // Idle priority. 276 int nice_value = 10; // Idle priority.
246 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value); 277 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value);
247 #endif 278 #endif
248 279
249 base::AutoLock lock(lock_); 280 base::AutoLock lock(lock_);
250 281
251 // Get a unique thread index. 282 // Get a unique thread index.
252 int thread_index = next_thread_index_++; 283 int thread_index = next_thread_index_++;
253 284
254 while (true) { 285 while (true) {
255 if (pending_tasks_.empty()) { 286 if (!task_graph_->HasMoreTasks()) {
256 // Exit when shutdown is set and no more tasks are pending. 287 // Exit when shutdown is set and no more tasks are pending.
257 if (shutdown_) 288 if (shutdown_)
vmpstr 2013/05/06 23:08:48 Now that task_graph_ contains all work that needs
reveman 2013/05/07 01:33:54 Yes, we could cancel all pending tasks at shutdown
258 break; 289 break;
259 290
260 // Schedule an idle callback if requested and not pending. 291 // Schedule an idle callback if not tasks are running.
vmpstr 2013/05/06 23:08:48 nit: not -> no
reveman 2013/05/07 01:33:54 Done.
261 if (!running_task_count_) 292 if (running_tasks_.empty())
262 ScheduleOnIdleWithLockAcquired(); 293 ScheduleOnIdleWithLockAcquired();
263 294
264 // Wait for new pending tasks. 295 // Wait for more tasks.
265 has_pending_tasks_cv_.Wait(); 296 has_pending_tasks_cv_.Wait();
266 continue; 297 continue;
267 } 298 }
268 299
269 // Get next task. 300 // Take the highest priority task from the task graph.
270 scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); 301 scoped_refptr<internal::WorkerPoolTask> task = task_graph_->TakeTask(
302 task_graph_->TopTask());
271 303
272 // Increment |running_task_count_| before starting to run task. 304 // Insert task in |running_tasks_| before starting to run it.
273 running_task_count_++; 305 running_tasks_.insert(task);
274 306
275 // There may be more work available, so wake up another 307 // There may be more work available, so wake up another worker thread.
276 // worker thread.
277 has_pending_tasks_cv_.Signal(); 308 has_pending_tasks_cv_.Signal();
278 309
279 { 310 {
280 base::AutoUnlock unlock(lock_); 311 base::AutoUnlock unlock(lock_);
281 312
282 task->RunOnThread(thread_index); 313 task->RunOnThread(thread_index);
283 } 314 }
284 315
285 completed_tasks_.push_back(task.Pass()); 316 // Remove task from |running_tasks_| now that we are done running it.
317 running_tasks_.erase(task);
286 318
287 // Decrement |running_task_count_| now that we are done running task. 319 // Finally add task to |completed_tasks_|.
288 running_task_count_--; 320 completed_tasks_.push_back(task);
289 } 321 }
290 322
291 // We noticed we should exit. Wake up the next worker so it knows it should 323 // We noticed we should exit. Wake up the next worker so it knows it should
292 // exit as well (because the Shutdown() code only signals once). 324 // exit as well (because the Shutdown() code only signals once).
293 has_pending_tasks_cv_.Signal(); 325 has_pending_tasks_cv_.Signal();
294 } 326 }
295 327
296 WorkerPool::WorkerPool(WorkerPoolClient* client, 328 WorkerPool::WorkerPool(WorkerPoolClient* client,
297 size_t num_threads, 329 size_t num_threads,
298 base::TimeDelta check_for_completed_tasks_delay, 330 base::TimeDelta check_for_completed_tasks_delay,
299 const std::string& thread_name_prefix) 331 const std::string& thread_name_prefix)
300 : client_(client), 332 : client_(client),
301 origin_loop_(base::MessageLoopProxy::current()), 333 origin_loop_(base::MessageLoopProxy::current()),
302 weak_ptr_factory_(this),
303 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), 334 check_for_completed_tasks_delay_(check_for_completed_tasks_delay),
304 check_for_completed_tasks_pending_(false), 335 check_for_completed_tasks_pending_(false),
305 inner_(make_scoped_ptr(new Inner(this, 336 inner_(make_scoped_ptr(new Inner(this,
306 num_threads, 337 num_threads,
307 thread_name_prefix))) { 338 thread_name_prefix))) {
308 } 339 }
309 340
310 WorkerPool::~WorkerPool() { 341 WorkerPool::~WorkerPool() {
311 // Cancel all pending callbacks.
312 weak_ptr_factory_.InvalidateWeakPtrs();
313
314 DCHECK_EQ(0u, completed_tasks_.size());
315 } 342 }
316 343
317 void WorkerPool::Shutdown() { 344 void WorkerPool::Shutdown() {
318 inner_->Shutdown(); 345 inner_->Shutdown();
319 inner_->CollectCompletedTasks(); 346
320 DispatchCompletionCallbacks(); 347 TaskDeque completed_tasks;
348 inner_->CollectCompletedTasks(&completed_tasks);
349 DispatchCompletionCallbacks(&completed_tasks);
321 } 350 }
322 351
323 void WorkerPool::PostTaskAndReply( 352 void WorkerPool::OnIdle(TaskDeque* completed_tasks) {
324 const Callback& task, const base::Closure& reply) {
325 PostTask(make_scoped_ptr(new WorkerPoolTaskImpl(
326 task,
327 reply)).PassAs<internal::WorkerPoolTask>());
328 }
329
330 void WorkerPool::OnIdle() {
331 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); 353 TRACE_EVENT0("cc", "WorkerPool::OnIdle");
332 354
333 DispatchCompletionCallbacks(); 355 DispatchCompletionCallbacks(completed_tasks);
356
357 // Cancel any pending check for completed tasks.
358 check_for_completed_tasks_callback_.Cancel();
359 check_for_completed_tasks_pending_ = false;
334 } 360 }
335 361
336 void WorkerPool::ScheduleCheckForCompletedTasks() { 362 void WorkerPool::ScheduleCheckForCompletedTasks() {
337 if (check_for_completed_tasks_pending_) 363 if (check_for_completed_tasks_pending_)
338 return; 364 return;
365 check_for_completed_tasks_callback_.Reset(
366 base::Bind(&WorkerPool::CheckForCompletedTasks,
367 base::Unretained(this)));
339 origin_loop_->PostDelayedTask( 368 origin_loop_->PostDelayedTask(
340 FROM_HERE, 369 FROM_HERE,
341 base::Bind(&WorkerPool::CheckForCompletedTasks, 370 check_for_completed_tasks_callback_.callback(),
342 weak_ptr_factory_.GetWeakPtr()),
343 check_for_completed_tasks_delay_); 371 check_for_completed_tasks_delay_);
344 check_for_completed_tasks_pending_ = true; 372 check_for_completed_tasks_pending_ = true;
345 } 373 }
346 374
347 void WorkerPool::CheckForCompletedTasks() { 375 void WorkerPool::CheckForCompletedTasks() {
348 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); 376 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");
349 DCHECK(check_for_completed_tasks_pending_); 377 DCHECK(check_for_completed_tasks_pending_);
350 check_for_completed_tasks_pending_ = false; 378 check_for_completed_tasks_pending_ = false;
351 379
380 TaskDeque completed_tasks;
381
352 // Schedule another check for completed tasks if not idle. 382 // Schedule another check for completed tasks if not idle.
353 if (!inner_->CollectCompletedTasks()) 383 if (!inner_->CollectCompletedTasks(&completed_tasks))
354 ScheduleCheckForCompletedTasks(); 384 ScheduleCheckForCompletedTasks();
355 385
356 DispatchCompletionCallbacks(); 386 DispatchCompletionCallbacks(&completed_tasks);
357 } 387 }
358 388
359 void WorkerPool::DispatchCompletionCallbacks() { 389 void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) {
360 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); 390 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks");
361 391
362 if (completed_tasks_.empty()) 392 // Early out when |completed_tasks| is empty to prevent unnecessary
393 // call to DidFinishDispatchingWorkerPoolCompletionCallbacks().
394 if (completed_tasks->empty())
363 return; 395 return;
364 396
365 while (completed_tasks_.size()) { 397 while (completed_tasks->size()) {
vmpstr 2013/05/06 23:08:48 nit: consider !empty
reveman 2013/05/07 01:33:54 Done.
366 scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); 398 scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front();
367 task->DidComplete(); 399 completed_tasks->pop_front();
400 task->DispatchCompletionCallback();
368 } 401 }
369 402
370 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); 403 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks();
371 } 404 }
372 405
373 void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { 406 void WorkerPool::ScheduleTasks(
374 // Schedule check for completed tasks if not pending. 407 scoped_ptr<internal::WorkerPoolTaskGraph> task_graph) {
375 ScheduleCheckForCompletedTasks(); 408 if (task_graph->HasMoreTasks())
376 409 ScheduleCheckForCompletedTasks();
377 inner_->PostTask(task.Pass()); 410 inner_->ScheduleTasks(task_graph.Pass());
378 } 411 }
379 412
380 } // namespace cc 413 } // namespace cc
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698