Chromium Code Reviews| Index: cc/worker_pool.cc |
| diff --git a/cc/worker_pool.cc b/cc/worker_pool.cc |
| index 7d3428aa74d2f8bde286b73cb418549fb1504a70..ea572d9b7d29631f26ce27ee0e18fb074cdb647f 100644 |
| --- a/cc/worker_pool.cc |
| +++ b/cc/worker_pool.cc |
| @@ -28,6 +28,7 @@ class WorkerPoolTaskImpl : public internal::WorkerPoolTask { |
| virtual void Run(RenderingStats* rendering_stats) OVERRIDE { |
| task_.Run(rendering_stats); |
| + base::subtle::Release_Store(&completed_, 1); |
| } |
| private: |
| @@ -36,22 +37,31 @@ class WorkerPoolTaskImpl : public internal::WorkerPoolTask { |
| const char* kWorkerThreadNamePrefix = "Compositor"; |
| -// Allow two pending tasks per worker. This keeps resource usage |
| -// low while making sure workers aren't unnecessarily idle. |
| -const int kNumPendingTasksPerWorker = 2; |
| +#if defined(OS_ANDROID) |
| +const int kNumPendingTasksPerWorker = 8; |
| +#else |
| +const int kNumPendingTasksPerWorker = 40; |
| +#endif |
| + |
| +const int kCheckForCompletedTasksDelayMs = 6; |
| } // namespace |
| namespace internal { |
| -WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) |
| - : reply_(reply) { |
| +WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) { |
| + base::subtle::Acquire_Store(&completed_, 0); |
| } |
| WorkerPoolTask::~WorkerPoolTask() { |
| } |
| -void WorkerPoolTask::Completed() { |
| +bool WorkerPoolTask::HasCompleted() { |
| + return base::subtle::Acquire_Load(&completed_) == 1; |
| +} |
| + |
| +void WorkerPoolTask::DidComplete() { |
| + DCHECK_EQ(base::subtle::Acquire_Load(&completed_), 1); |
| reply_.Run(); |
| } |
| @@ -60,7 +70,6 @@ void WorkerPoolTask::Completed() { |
| WorkerPool::Worker::Worker(WorkerPool* worker_pool, const std::string name) |
| : base::Thread(name.c_str()), |
| worker_pool_(worker_pool), |
| - weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
| rendering_stats_(make_scoped_ptr(new RenderingStats)), |
| record_rendering_stats_(false) { |
| Start(); |
| @@ -80,27 +89,22 @@ void WorkerPool::Worker::StopAfterCompletingAllPendingTasks() { |
| // all tasks have finished running. |
| while (!pending_tasks_.empty()) |
| OnTaskCompleted(); |
| - |
| - // Cancel all pending replies. |
| - weak_ptr_factory_.InvalidateWeakPtrs(); |
| } |
| void WorkerPool::Worker::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
| - DCHECK_LT(num_pending_tasks(), kNumPendingTasksPerWorker); |
| - |
| RenderingStats* stats = |
| record_rendering_stats_ ? rendering_stats_.get() : NULL; |
| - message_loop_proxy()->PostTaskAndReply( |
| + worker_pool_->WillPostTask(); |
| + |
| + message_loop_proxy()->PostTask( |
| FROM_HERE, |
| base::Bind(&Worker::RunTask, |
| base::Unretained(task.get()), |
| - base::Unretained(stats)), |
| - base::Bind(&Worker::OnTaskCompleted, weak_ptr_factory_.GetWeakPtr())); |
| + base::Unretained(worker_pool_), |
| + base::Unretained(stats))); |
| pending_tasks_.push_back(task.Pass()); |
| - |
| - worker_pool_->DidNumPendingTasksChange(); |
| } |
| void WorkerPool::Worker::Init() { |
| @@ -113,22 +117,43 @@ void WorkerPool::Worker::Init() { |
| // static |
| void WorkerPool::Worker::RunTask( |
| - internal::WorkerPoolTask* task, RenderingStats* rendering_stats) { |
| + internal::WorkerPoolTask* task, |
| + WorkerPool* worker_pool, |
| + RenderingStats* rendering_stats) { |
| task->Run(rendering_stats); |
| + worker_pool->OnWorkCompletedOnWorkerThread(); |
| } |
| void WorkerPool::Worker::OnTaskCompleted() { |
| CHECK(!pending_tasks_.empty()); |
| scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); |
| - task->Completed(); |
| - worker_pool_->DidNumPendingTasksChange(); |
| + // Notify worker pool of task completion. |
| + worker_pool_->OnTaskCompleted(); |
| + |
| + task->DidComplete(); |
| } |
| -WorkerPool::WorkerPool(size_t num_threads) |
| - : workers_need_sorting_(false), |
| - shutdown_(false) { |
| +void WorkerPool::Worker::CheckForCompletedTasks() { |
| + while (!pending_tasks_.empty()) { |
| + if (!pending_tasks_.front()->HasCompleted()) |
| + return; |
| + |
| + OnTaskCompleted(); |
| + } |
| +} |
| + |
| +WorkerPool::WorkerPool(WorkerPoolClient* client, size_t num_threads) |
| + : client_(client), |
| + origin_loop_(base::MessageLoopProxy::current()), |
| + weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
| + workers_need_sorting_(false), |
| + pending_task_count_(0), |
| + shutdown_(false), |
| + check_for_completed_tasks_pending_(false), |
| + idle_callback_( |
| + base::Bind(&WorkerPool::OnIdle, weak_ptr_factory_.GetWeakPtr())) { |
| const std::string thread_name_prefix = kWorkerThreadNamePrefix; |
| while (workers_.size() < num_threads) { |
| int thread_number = workers_.size() + 1; |
| @@ -136,11 +161,15 @@ WorkerPool::WorkerPool(size_t num_threads) |
| this, |
| thread_name_prefix + StringPrintf("Worker%d", thread_number).c_str())); |
| } |
| + base::subtle::Acquire_Store(&pending_task_count_, 0); |
| } |
| WorkerPool::~WorkerPool() { |
| Shutdown(); |
| STLDeleteElements(&workers_); |
| + // Cancel all pending callbacks. |
| + weak_ptr_factory_.InvalidateWeakPtrs(); |
| + DCHECK_EQ(base::subtle::Acquire_Load(&pending_task_count_), 0); |
| } |
| void WorkerPool::Shutdown() { |
| @@ -204,7 +233,62 @@ WorkerPool::Worker* WorkerPool::GetWorkerForNextTask() { |
| return workers_.front(); |
| } |
| -void WorkerPool::DidNumPendingTasksChange() { |
| +void WorkerPool::ScheduleCheckForCompletedTasks() { |
| + if (check_for_completed_tasks_pending_) |
| + return; |
| + |
| + check_for_completed_tasks_callback_.Reset( |
| + base::Bind(&WorkerPool::CheckForCompletedTasks, |
| + weak_ptr_factory_.GetWeakPtr())); |
| + origin_loop_->PostDelayedTask( |
| + FROM_HERE, |
| + check_for_completed_tasks_callback_.callback(), |
| + base::TimeDelta::FromMilliseconds(kCheckForCompletedTasksDelayMs)); |
| + check_for_completed_tasks_pending_ = true; |
| +} |
| + |
| +void WorkerPool::WillPostTask() { |
| + base::subtle::Barrier_AtomicIncrement(&pending_task_count_, 1); |
| + ScheduleCheckForCompletedTasks(); |
| + workers_need_sorting_ = true; |
| +} |
| + |
| +void WorkerPool::OnWorkCompletedOnWorkerThread() { |
| + // Post idle handler task when pool work count reaches 0. |
| + if (base::subtle::Barrier_AtomicIncrement(&pending_task_count_, -1) == 0) { |
| + origin_loop_->PostTask(FROM_HERE, idle_callback_); |
| + } |
| +} |
| + |
| +void WorkerPool::OnIdle() { |
| + if (base::subtle::Acquire_Load(&pending_task_count_) == 0) { |
| + check_for_completed_tasks_callback_.Cancel(); |
| + CheckForCompletedTasks(); |
| + } |
| +} |
| + |
| +void WorkerPool::CheckForCompletedTasks() { |
| + check_for_completed_tasks_pending_ = false; |
| + |
| + for (WorkerVector::iterator it = workers_.begin(); |
| + it != workers_.end(); it++) { |
| + Worker* worker = *it; |
| + worker->CheckForCompletedTasks(); |
| + } |
| + |
| + client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); |
| + |
| + for (WorkerVector::iterator it = workers_.begin(); |
| + it != workers_.end(); it++) { |
| + Worker* worker = *it; |
| + if (worker->num_pending_tasks()) { |
|
brianderson
2013/02/14 01:12:07
No race condition and no unnecessary checks schedu
|
| + ScheduleCheckForCompletedTasks(); |
| + break; |
| + } |
| + } |
| +} |
| + |
| +void WorkerPool::OnTaskCompleted() { |
| workers_need_sorting_ = true; |
| } |