Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(68)

Side by Side Diff: cc/worker_pool.cc

Issue 12472028: Part 1 of cc/ directory shuffles: base (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "cc/worker_pool.h"
6
7 #include "base/bind.h"
8 #include "base/debug/trace_event.h"
9 #include "base/stringprintf.h"
10 #include "base/synchronization/condition_variable.h"
11 #include "base/threading/simple_thread.h"
12 #include "cc/rendering_stats.h"
13
14 #if defined(OS_ANDROID)
15 // TODO(epenner): Move thread priorities to base. (crbug.com/170549)
16 #include <sys/resource.h>
17 #endif
18
19 namespace cc {
20
21 namespace {
22
23 class WorkerPoolTaskImpl : public internal::WorkerPoolTask {
24 public:
25 WorkerPoolTaskImpl(const WorkerPool::Callback& task,
26 const base::Closure& reply)
27 : internal::WorkerPoolTask(reply),
28 task_(task) {}
29
30 virtual bool IsCheap() OVERRIDE { return false; }
31
32 virtual void Run(RenderingStats* rendering_stats) OVERRIDE {
33 task_.Run(rendering_stats);
34 }
35
36 virtual void RunOnThread(
37 RenderingStats* rendering_stats, unsigned thread_index) OVERRIDE {
38 task_.Run(rendering_stats);
39 }
40
41 private:
42 WorkerPool::Callback task_;
43 };
44
45 } // namespace
46
47 namespace internal {
48
49 WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) {
50 }
51
52 WorkerPoolTask::~WorkerPoolTask() {
53 }
54
55 void WorkerPoolTask::DidComplete() {
56 reply_.Run();
57 }
58
59 } // namespace internal
60
61 // Internal to the worker pool. Any data or logic that needs to be
62 // shared between threads lives in this class. All members are guarded
63 // by |lock_|.
64 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
65 public:
66 Inner(WorkerPool* worker_pool,
67 size_t num_threads,
68 const std::string& thread_name_prefix,
69 bool need_on_task_completed_callback);
70 ~Inner();
71
72 void Shutdown();
73
74 void SetRecordRenderingStats(bool record_rendering_stats);
75
76 void GetRenderingStats(RenderingStats* stats);
77
78 void PostTask(scoped_ptr<internal::WorkerPoolTask> task);
79
80 // Appends all completed tasks to worker pool's completed tasks queue
81 // and returns true if idle.
82 bool CollectCompletedTasks();
83
84 // Runs cheap tasks on caller thread until |time_limit| is reached
85 // and returns true if idle.
86 bool RunCheapTasksUntilTimeLimit(base::TimeTicks time_limit);
87
88 private:
89 // Appends all completed tasks to |completed_tasks|. Lock must
90 // already be acquired before calling this function.
91 bool AppendCompletedTasksWithLockAcquired(
92 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks);
93
94 // Schedule a OnTaskCompletedOnOriginThread callback if not already
95 // pending. Lock must already be acquired before calling this function.
96 void ScheduleOnTaskCompletedWithLockAcquired();
97 void OnTaskCompletedOnOriginThread();
98
99 // Schedule an OnIdleOnOriginThread callback if not already pending.
100 // Lock must already be acquired before calling this function.
101 void ScheduleOnIdleWithLockAcquired();
102 void OnIdleOnOriginThread();
103
104 // Overridden from base::DelegateSimpleThread:
105 virtual void Run() OVERRIDE;
106
107 // Pointer to worker pool. Can only be used on origin thread.
108 // Not guarded by |lock_|.
109 WorkerPool* worker_pool_on_origin_thread_;
110
111 // This lock protects all members of this class except
112 // |worker_pool_on_origin_thread_|. Do not read or modify anything
113 // without holding this lock. Do not block while holding this lock.
114 mutable base::Lock lock_;
115
116 // Condition variable that is waited on by worker threads until new
117 // tasks are posted or shutdown starts.
118 base::ConditionVariable has_pending_tasks_cv_;
119
120 // Target message loop used for posting callbacks.
121 scoped_refptr<base::MessageLoopProxy> origin_loop_;
122
123 base::WeakPtrFactory<Inner> weak_ptr_factory_;
124
125 // Set to true when worker pool requires a callback for each
126 // completed task.
127 bool need_on_task_completed_callback_;
128
129 const base::Closure on_task_completed_callback_;
130 // Set when a OnTaskCompletedOnOriginThread() callback is pending.
131 bool on_task_completed_pending_;
132
133 const base::Closure on_idle_callback_;
134 // Set when a OnIdleOnOriginThread() callback is pending.
135 bool on_idle_pending_;
136
137 // Provides each running thread loop with a unique index. First thread
138 // loop index is 0.
139 unsigned next_thread_index_;
140
141 // Number of tasks currently running.
142 unsigned running_task_count_;
143
144 // Set during shutdown. Tells workers to exit when no more tasks
145 // are pending.
146 bool shutdown_;
147
148 typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque;
149 TaskDeque pending_tasks_;
150 TaskDeque completed_tasks_;
151
152 scoped_ptr<RenderingStats> rendering_stats_;
153
154 ScopedPtrDeque<base::DelegateSimpleThread> workers_;
155
156 DISALLOW_COPY_AND_ASSIGN(Inner);
157 };
158
159 WorkerPool::Inner::Inner(WorkerPool* worker_pool,
160 size_t num_threads,
161 const std::string& thread_name_prefix,
162 bool need_on_task_completed_callback)
163 : worker_pool_on_origin_thread_(worker_pool),
164 lock_(),
165 has_pending_tasks_cv_(&lock_),
166 origin_loop_(base::MessageLoopProxy::current()),
167 weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
168 need_on_task_completed_callback_(need_on_task_completed_callback),
169 on_task_completed_callback_(
170 base::Bind(&WorkerPool::Inner::OnTaskCompletedOnOriginThread,
171 weak_ptr_factory_.GetWeakPtr())),
172 on_task_completed_pending_(false),
173 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread,
174 weak_ptr_factory_.GetWeakPtr())),
175 on_idle_pending_(false),
176 next_thread_index_(0),
177 running_task_count_(0),
178 shutdown_(false) {
179 base::AutoLock lock(lock_);
180
181 while (workers_.size() < num_threads) {
182 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
183 new base::DelegateSimpleThread(
184 this,
185 thread_name_prefix +
186 StringPrintf("Worker%lu", workers_.size() + 1).c_str()));
187 worker->Start();
188 workers_.push_back(worker.Pass());
189 }
190 }
191
192 WorkerPool::Inner::~Inner() {
193 base::AutoLock lock(lock_);
194
195 DCHECK(shutdown_);
196
197 // Cancel all pending callbacks.
198 weak_ptr_factory_.InvalidateWeakPtrs();
199
200 DCHECK_EQ(pending_tasks_.size(), 0);
201 DCHECK_EQ(completed_tasks_.size(), 0);
202 DCHECK_EQ(running_task_count_, 0);
203 }
204
205 void WorkerPool::Inner::Shutdown() {
206 {
207 base::AutoLock lock(lock_);
208
209 DCHECK(!shutdown_);
210 shutdown_ = true;
211
212 // Wake up a worker so it knows it should exit. This will cause all workers
213 // to exit as each will wake up another worker before exiting.
214 has_pending_tasks_cv_.Signal();
215 }
216
217 while (workers_.size()) {
218 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
219 worker->Join();
220 }
221 }
222
223 void WorkerPool::Inner::SetRecordRenderingStats(bool record_rendering_stats) {
224 base::AutoLock lock(lock_);
225
226 if (record_rendering_stats)
227 rendering_stats_.reset(new RenderingStats);
228 else
229 rendering_stats_.reset();
230 }
231
232 void WorkerPool::Inner::GetRenderingStats(RenderingStats* stats) {
233 base::AutoLock lock(lock_);
234
235 if (rendering_stats_)
236 stats->Add(*rendering_stats_);
237 }
238
239 void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
240 base::AutoLock lock(lock_);
241
242 pending_tasks_.push_back(task.Pass());
243
244 // There is more work available, so wake up worker thread.
245 has_pending_tasks_cv_.Signal();
246 }
247
248 bool WorkerPool::Inner::CollectCompletedTasks() {
249 base::AutoLock lock(lock_);
250
251 return AppendCompletedTasksWithLockAcquired(
252 &worker_pool_on_origin_thread_->completed_tasks_);
253 }
254
255 bool WorkerPool::Inner::RunCheapTasksUntilTimeLimit(
256 base::TimeTicks time_limit) {
257 base::AutoLock lock(lock_);
258
259 while (base::TimeTicks::Now() < time_limit) {
260 scoped_ptr<internal::WorkerPoolTask> task;
261
262 // Find next cheap task.
263 for (TaskDeque::iterator iter = pending_tasks_.begin();
264 iter != pending_tasks_.end(); ++iter) {
265 if ((*iter)->IsCheap()) {
266 task = pending_tasks_.take(iter);
267 break;
268 }
269 }
270
271 if (!task) {
272 // Schedule an idle callback if requested and not pending.
273 if (!running_task_count_ && pending_tasks_.empty())
274 ScheduleOnIdleWithLockAcquired();
275
276 // Exit when no more cheap tasks are pending.
277 break;
278 }
279
280 scoped_ptr<RenderingStats> rendering_stats;
281 // Collect rendering stats if |rendering_stats_| is set.
282 if (rendering_stats_)
283 rendering_stats = make_scoped_ptr(new RenderingStats);
284
285 // Increment |running_task_count_| before starting to run task.
286 running_task_count_++;
287
288 {
289 base::AutoUnlock unlock(lock_);
290
291 task->Run(rendering_stats.get());
292
293 // Append tasks directly to worker pool's completed tasks queue.
294 worker_pool_on_origin_thread_->completed_tasks_.push_back(task.Pass());
295 if (need_on_task_completed_callback_)
296 worker_pool_on_origin_thread_->OnTaskCompleted();
297 }
298
299 // Add rendering stat results to |rendering_stats_|.
300 if (rendering_stats && rendering_stats_)
301 rendering_stats_->Add(*rendering_stats);
302
303 // Decrement |running_task_count_| now that we are done running task.
304 running_task_count_--;
305 }
306
307 // Append any other completed tasks before releasing lock.
308 return AppendCompletedTasksWithLockAcquired(
309 &worker_pool_on_origin_thread_->completed_tasks_);
310 }
311
312 bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired(
313 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) {
314 lock_.AssertAcquired();
315
316 while (completed_tasks_.size())
317 completed_tasks->push_back(completed_tasks_.take_front().Pass());
318
319 return !running_task_count_ && pending_tasks_.empty();
320 }
321
322 void WorkerPool::Inner::ScheduleOnTaskCompletedWithLockAcquired() {
323 lock_.AssertAcquired();
324
325 if (on_task_completed_pending_ || !need_on_task_completed_callback_)
326 return;
327 origin_loop_->PostTask(FROM_HERE, on_task_completed_callback_);
328 on_task_completed_pending_ = true;
329 }
330
331 void WorkerPool::Inner::OnTaskCompletedOnOriginThread() {
332 {
333 base::AutoLock lock(lock_);
334
335 DCHECK(on_task_completed_pending_);
336 on_task_completed_pending_ = false;
337
338 AppendCompletedTasksWithLockAcquired(
339 &worker_pool_on_origin_thread_->completed_tasks_);
340 }
341
342 worker_pool_on_origin_thread_->OnTaskCompleted();
343 }
344
345 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
346 lock_.AssertAcquired();
347
348 if (on_idle_pending_)
349 return;
350 origin_loop_->PostTask(FROM_HERE, on_idle_callback_);
351 on_idle_pending_ = true;
352 }
353
354 void WorkerPool::Inner::OnIdleOnOriginThread() {
355 {
356 base::AutoLock lock(lock_);
357
358 DCHECK(on_idle_pending_);
359 on_idle_pending_ = false;
360
361 // Early out if no longer idle.
362 if (running_task_count_ || !pending_tasks_.empty())
363 return;
364
365 AppendCompletedTasksWithLockAcquired(
366 &worker_pool_on_origin_thread_->completed_tasks_);
367 }
368
369 worker_pool_on_origin_thread_->OnIdle();
370 }
371
372 void WorkerPool::Inner::Run() {
373 #if defined(OS_ANDROID)
374 // TODO(epenner): Move thread priorities to base. (crbug.com/170549)
375 int nice_value = 10; // Idle priority.
376 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value);
377 #endif
378
379 {
380 base::AutoLock lock(lock_);
381
382 // Get a unique thread index.
383 int thread_index = next_thread_index_++;
384
385 while (true) {
386 if (pending_tasks_.empty()) {
387 // Exit when shutdown is set and no more tasks are pending.
388 if (shutdown_)
389 break;
390
391 // Schedule an idle callback if requested and not pending.
392 if (!running_task_count_)
393 ScheduleOnIdleWithLockAcquired();
394
395 // Wait for new pending tasks.
396 has_pending_tasks_cv_.Wait();
397 continue;
398 }
399
400 // Get next task.
401 scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front();
402
403 scoped_ptr<RenderingStats> rendering_stats;
404 // Collect rendering stats if |rendering_stats_| is set.
405 if (rendering_stats_)
406 rendering_stats = make_scoped_ptr(new RenderingStats);
407
408 // Increment |running_task_count_| before starting to run task.
409 running_task_count_++;
410
411 // There may be more work available, so wake up another
412 // worker thread.
413 has_pending_tasks_cv_.Signal();
414
415 {
416 base::AutoUnlock unlock(lock_);
417
418 task->RunOnThread(rendering_stats.get(), thread_index);
419 }
420
421 completed_tasks_.push_back(task.Pass());
422
423 // Add rendering stat results to |rendering_stats_|.
424 if (rendering_stats && rendering_stats_)
425 rendering_stats_->Add(*rendering_stats);
426
427 // Decrement |running_task_count_| now that we are done running task.
428 running_task_count_--;
429
430 // Schedule a task completed callback if requested and not pending.
431 ScheduleOnTaskCompletedWithLockAcquired();
432 }
433
434 // We noticed we should exit. Wake up the next worker so it knows it should
435 // exit as well (because the Shutdown() code only signals once).
436 has_pending_tasks_cv_.Signal();
437 }
438 }
439
440 WorkerPool::WorkerPool(WorkerPoolClient* client,
441 size_t num_threads,
442 base::TimeDelta check_for_completed_tasks_delay,
443 const std::string& thread_name_prefix)
444 : client_(client),
445 origin_loop_(base::MessageLoopProxy::current()),
446 weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
447 check_for_completed_tasks_delay_(check_for_completed_tasks_delay),
448 check_for_completed_tasks_pending_(false),
449 run_cheap_tasks_callback_(
450 base::Bind(&WorkerPool::RunCheapTasks,
451 weak_ptr_factory_.GetWeakPtr())),
452 run_cheap_tasks_pending_(false),
453 inner_(make_scoped_ptr(
454 new Inner(
455 this,
456 num_threads,
457 thread_name_prefix,
458 // Request OnTaskCompleted() callback when check
459 // for completed tasks delay is 0.
460 check_for_completed_tasks_delay == base::TimeDelta()))) {
461 }
462
463 WorkerPool::~WorkerPool() {
464 Shutdown();
465
466 // Cancel all pending callbacks.
467 weak_ptr_factory_.InvalidateWeakPtrs();
468
469 DCHECK_EQ(completed_tasks_.size(), 0);
470 }
471
472 void WorkerPool::Shutdown() {
473 inner_->Shutdown();
474 DispatchCompletionCallbacks();
475 }
476
477 void WorkerPool::PostTaskAndReply(
478 const Callback& task, const base::Closure& reply) {
479 PostTask(make_scoped_ptr(new WorkerPoolTaskImpl(
480 task,
481 reply)).PassAs<internal::WorkerPoolTask>());
482 }
483
484 void WorkerPool::SetRunCheapTasksTimeLimit(
485 base::TimeTicks run_cheap_tasks_time_limit) {
486 run_cheap_tasks_time_limit_ = run_cheap_tasks_time_limit;
487 ScheduleRunCheapTasks();
488 }
489
490 void WorkerPool::SetRecordRenderingStats(bool record_rendering_stats) {
491 inner_->SetRecordRenderingStats(record_rendering_stats);
492 }
493
494 void WorkerPool::GetRenderingStats(RenderingStats* stats) {
495 inner_->GetRenderingStats(stats);
496 }
497
498 void WorkerPool::OnIdle() {
499 TRACE_EVENT0("cc", "WorkerPool::OnIdle");
500
501 DispatchCompletionCallbacks();
502 }
503
504 void WorkerPool::OnTaskCompleted() {
505 TRACE_EVENT0("cc", "WorkerPool::OnTaskCompleted");
506
507 DispatchCompletionCallbacks();
508 }
509
510 void WorkerPool::ScheduleCheckForCompletedTasks() {
511 if (check_for_completed_tasks_pending_ ||
512 check_for_completed_tasks_delay_ == base::TimeDelta())
513 return;
514 check_for_completed_tasks_callback_.Reset(
515 base::Bind(&WorkerPool::CheckForCompletedTasks,
516 weak_ptr_factory_.GetWeakPtr()));
517 check_for_completed_tasks_time_ = base::TimeTicks::Now() +
518 check_for_completed_tasks_delay_;
519 origin_loop_->PostDelayedTask(
520 FROM_HERE,
521 check_for_completed_tasks_callback_.callback(),
522 check_for_completed_tasks_delay_);
523 check_for_completed_tasks_pending_ = true;
524 }
525
526 void WorkerPool::CheckForCompletedTasks() {
527 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");
528 DCHECK(check_for_completed_tasks_pending_);
529 check_for_completed_tasks_pending_ = false;
530
531 // Schedule another check for completed tasks if not idle.
532 if (!inner_->CollectCompletedTasks())
533 ScheduleCheckForCompletedTasks();
534
535 DispatchCompletionCallbacks();
536 }
537
538 void WorkerPool::CancelCheckForCompletedTasks() {
539 if (!check_for_completed_tasks_pending_)
540 return;
541
542 check_for_completed_tasks_callback_.Cancel();
543 check_for_completed_tasks_pending_ = false;
544 }
545
546 void WorkerPool::DispatchCompletionCallbacks() {
547 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks");
548
549 if (completed_tasks_.empty())
550 return;
551
552 while (completed_tasks_.size()) {
553 scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front();
554 task->DidComplete();
555 }
556
557 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks();
558 }
559
560 void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
561 if (task->IsCheap())
562 ScheduleRunCheapTasks();
563
564 // Schedule check for completed tasks if not pending.
565 ScheduleCheckForCompletedTasks();
566
567 inner_->PostTask(task.Pass());
568 }
569
570 void WorkerPool::ScheduleRunCheapTasks() {
571 if (run_cheap_tasks_pending_)
572 return;
573 origin_loop_->PostTask(FROM_HERE, run_cheap_tasks_callback_);
574 run_cheap_tasks_pending_ = true;
575 }
576
577 void WorkerPool::RunCheapTasks() {
578 TRACE_EVENT0("cc", "WorkerPool::RunCheapTasks");
579 DCHECK(run_cheap_tasks_pending_);
580 run_cheap_tasks_pending_ = false;
581
582 while (true) {
583 base::TimeTicks time_limit = run_cheap_tasks_time_limit_;
584
585 if (!check_for_completed_tasks_time_.is_null())
586 time_limit = std::min(time_limit, check_for_completed_tasks_time_);
587
588 bool is_idle = inner_->RunCheapTasksUntilTimeLimit(time_limit);
589
590 if (base::TimeTicks::Now() >= run_cheap_tasks_time_limit_) {
591 TRACE_EVENT_INSTANT0("cc", "WorkerPool::RunCheapTasks out of time");
592 break;
593 }
594
595 // We must be out of cheap tasks if this happens.
596 if (check_for_completed_tasks_time_.is_null() ||
597 base::TimeTicks::Now() < run_cheap_tasks_time_limit_)
598 break;
599
600 TRACE_EVENT_INSTANT0("cc", "WorkerPool::RunCheapTasks check time");
601 CancelCheckForCompletedTasks();
602 DispatchCompletionCallbacks();
603 // Schedule another check for completed tasks if not idle.
604 if (!is_idle)
605 ScheduleCheckForCompletedTasks();
606 }
607 }
608
609 } // namespace cc
OLDNEW
« cc/cc.gyp ('K') | « cc/worker_pool.h ('k') | cc/yuv_video_draw_quad.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698