| Index: cc/worker_pool.cc
|
| diff --git a/cc/worker_pool.cc b/cc/worker_pool.cc
|
| index 7d3428aa74d2f8bde286b73cb418549fb1504a70..b9c8c4a4359c3c90d5e9c5a83762cb8f95c6bda2 100644
|
| --- a/cc/worker_pool.cc
|
| +++ b/cc/worker_pool.cc
|
| @@ -28,6 +28,7 @@ class WorkerPoolTaskImpl : public internal::WorkerPoolTask {
|
|
|
| virtual void Run(RenderingStats* rendering_stats) OVERRIDE {
|
| task_.Run(rendering_stats);
|
| + base::subtle::Release_Store(&completed_, 1);
|
| }
|
|
|
| private:
|
| @@ -36,22 +37,25 @@ class WorkerPoolTaskImpl : public internal::WorkerPoolTask {
|
|
|
| const char* kWorkerThreadNamePrefix = "Compositor";
|
|
|
| -// Allow two pending tasks per worker. This keeps resource usage
|
| -// low while making sure workers aren't unnecessarily idle.
|
| -const int kNumPendingTasksPerWorker = 2;
|
| +const int kCheckForCompletedTasksDelayMs = 6;
|
|
|
| } // namespace
|
|
|
| namespace internal {
|
|
|
| -WorkerPoolTask::WorkerPoolTask(const base::Closure& reply)
|
| - : reply_(reply) {
|
| +WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) {
|
| + base::subtle::Acquire_Store(&completed_, 0);
|
| }
|
|
|
| WorkerPoolTask::~WorkerPoolTask() {
|
| }
|
|
|
| -void WorkerPoolTask::Completed() {
|
| +bool WorkerPoolTask::HasCompleted() {
|
| + return base::subtle::Acquire_Load(&completed_) == 0;
|
| +}
|
| +
|
| +void WorkerPoolTask::DidComplete() {
|
| + DCHECK_EQ(base::subtle::Acquire_Load(&completed_), 1);
|
| reply_.Run();
|
| }
|
|
|
| @@ -60,7 +64,6 @@ void WorkerPoolTask::Completed() {
|
| WorkerPool::Worker::Worker(WorkerPool* worker_pool, const std::string name)
|
| : base::Thread(name.c_str()),
|
| worker_pool_(worker_pool),
|
| - weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
|
| rendering_stats_(make_scoped_ptr(new RenderingStats)),
|
| record_rendering_stats_(false) {
|
| Start();
|
| @@ -80,27 +83,22 @@ void WorkerPool::Worker::StopAfterCompletingAllPendingTasks() {
|
| // all tasks have finished running.
|
| while (!pending_tasks_.empty())
|
| OnTaskCompleted();
|
| -
|
| - // Cancel all pending replies.
|
| - weak_ptr_factory_.InvalidateWeakPtrs();
|
| }
|
|
|
| void WorkerPool::Worker::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
|
| - DCHECK_LT(num_pending_tasks(), kNumPendingTasksPerWorker);
|
| -
|
| RenderingStats* stats =
|
| record_rendering_stats_ ? rendering_stats_.get() : NULL;
|
|
|
| - message_loop_proxy()->PostTaskAndReply(
|
| + worker_pool_->WillPostTask();
|
| +
|
| + message_loop_proxy()->PostTask(
|
| FROM_HERE,
|
| base::Bind(&Worker::RunTask,
|
| base::Unretained(task.get()),
|
| - base::Unretained(stats)),
|
| - base::Bind(&Worker::OnTaskCompleted, weak_ptr_factory_.GetWeakPtr()));
|
| + base::Unretained(worker_pool_),
|
| + base::Unretained(stats)));
|
|
|
| pending_tasks_.push_back(task.Pass());
|
| -
|
| - worker_pool_->DidNumPendingTasksChange();
|
| }
|
|
|
| void WorkerPool::Worker::Init() {
|
| @@ -113,22 +111,43 @@ void WorkerPool::Worker::Init() {
|
|
|
| // static
|
| void WorkerPool::Worker::RunTask(
|
| - internal::WorkerPoolTask* task, RenderingStats* rendering_stats) {
|
| + internal::WorkerPoolTask* task,
|
| + WorkerPool* worker_pool,
|
| + RenderingStats* rendering_stats) {
|
| task->Run(rendering_stats);
|
| + worker_pool->OnWorkCompletedOnWorkerThread();
|
| }
|
|
|
| void WorkerPool::Worker::OnTaskCompleted() {
|
| CHECK(!pending_tasks_.empty());
|
|
|
| scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front();
|
| - task->Completed();
|
|
|
| - worker_pool_->DidNumPendingTasksChange();
|
| + // Notify worker pool of task completion.
|
| + worker_pool_->OnTaskCompleted();
|
| +
|
| + task->DidComplete();
|
| +}
|
| +
|
| +void WorkerPool::Worker::CheckForCompletedTasks() {
|
| + while (!pending_tasks_.empty()) {
|
| + if (pending_tasks_.front()->HasCompleted())
|
| + return;
|
| +
|
| + OnTaskCompleted();
|
| + }
|
| }
|
|
|
| -WorkerPool::WorkerPool(size_t num_threads)
|
| - : workers_need_sorting_(false),
|
| - shutdown_(false) {
|
| +WorkerPool::WorkerPool(WorkerPoolClient* client, size_t num_threads)
|
| + : client_(client),
|
| + origin_loop_(base::MessageLoopProxy::current()),
|
| + weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
|
| + workers_need_sorting_(false),
|
| + pending_task_count_(0),
|
| + shutdown_(false),
|
| + check_for_completed_tasks_pending_(false),
|
| + idle_callback_(
|
| + base::Bind(&WorkerPool::OnIdle, weak_ptr_factory_.GetWeakPtr())) {
|
| const std::string thread_name_prefix = kWorkerThreadNamePrefix;
|
| while (workers_.size() < num_threads) {
|
| int thread_number = workers_.size() + 1;
|
| @@ -136,11 +155,15 @@ WorkerPool::WorkerPool(size_t num_threads)
|
| this,
|
| thread_name_prefix + StringPrintf("Worker%d", thread_number).c_str()));
|
| }
|
| + base::subtle::Acquire_Store(&pending_task_count_, 0);
|
| }
|
|
|
| WorkerPool::~WorkerPool() {
|
| Shutdown();
|
| STLDeleteElements(&workers_);
|
| + // Cancel all pending callbacks.
|
| + weak_ptr_factory_.InvalidateWeakPtrs();
|
| + DCHECK_EQ(base::subtle::Acquire_Load(&pending_task_count_), 0);
|
| }
|
|
|
| void WorkerPool::Shutdown() {
|
| @@ -164,12 +187,6 @@ void WorkerPool::PostTaskAndReply(
|
| reply)).PassAs<internal::WorkerPoolTask>());
|
| }
|
|
|
| -bool WorkerPool::IsBusy() {
|
| - Worker* worker = GetWorkerForNextTask();
|
| -
|
| - return worker->num_pending_tasks() >= kNumPendingTasksPerWorker;
|
| -}
|
| -
|
| void WorkerPool::SetRecordRenderingStats(bool record_rendering_stats) {
|
| for (WorkerVector::iterator it = workers_.begin();
|
| it != workers_.end(); ++it) {
|
| @@ -204,7 +221,56 @@ WorkerPool::Worker* WorkerPool::GetWorkerForNextTask() {
|
| return workers_.front();
|
| }
|
|
|
| -void WorkerPool::DidNumPendingTasksChange() {
|
| +void WorkerPool::ScheduleCheckForCompletedTasks() {
|
| + if (check_for_completed_tasks_pending_)
|
| + return;
|
| +
|
| + check_for_completed_tasks_callback_.Reset(
|
| + base::Bind(&WorkerPool::CheckForCompletedTasks,
|
| + weak_ptr_factory_.GetWeakPtr()));
|
| + origin_loop_->PostDelayedTask(
|
| + FROM_HERE,
|
| + check_for_completed_tasks_callback_.callback(),
|
| + base::TimeDelta::FromMilliseconds(kCheckForCompletedTasksDelayMs));
|
| + check_for_completed_tasks_pending_ = true;
|
| +}
|
| +
|
| +void WorkerPool::WillPostTask() {
|
| + base::subtle::Barrier_AtomicIncrement(&pending_task_count_, 1);
|
| + ScheduleCheckForCompletedTasks();
|
| + workers_need_sorting_ = true;
|
| +}
|
| +
|
| +void WorkerPool::OnWorkCompletedOnWorkerThread() {
|
| + // Post idle handler task when pool work count reaches 0.
|
| + if (base::subtle::Barrier_AtomicIncrement(&pending_task_count_, -1) == 0) {
|
| + origin_loop_->PostTask(FROM_HERE, idle_callback_);
|
| + }
|
| +}
|
| +
|
| +void WorkerPool::OnIdle() {
|
| + if (base::subtle::Acquire_Load(&pending_task_count_) == 0) {
|
| + check_for_completed_tasks_callback_.Cancel();
|
| + CheckForCompletedTasks();
|
| + }
|
| +}
|
| +
|
| +void WorkerPool::CheckForCompletedTasks() {
|
| + check_for_completed_tasks_pending_ = false;
|
| +
|
| + for (WorkerVector::iterator it = workers_.begin();
|
| + it != workers_.end(); it++) {
|
| + Worker* worker = *it;
|
| + worker->CheckForCompletedTasks();
|
| + }
|
| +
|
| + client_->DidFinishDispatchingCompletionCallbacks();
|
| +
|
| + if (base::subtle::Acquire_Load(&pending_task_count_))
|
| + ScheduleCheckForCompletedTasks();
|
| +}
|
| +
|
| +void WorkerPool::OnTaskCompleted() {
|
| workers_need_sorting_ = true;
|
| }
|
|
|
|
|