Index: cc/worker_pool.cc |
diff --git a/cc/worker_pool.cc b/cc/worker_pool.cc |
index 7d3428aa74d2f8bde286b73cb418549fb1504a70..53fa235b63719c5dda272b05c691ab35189fddae 100644 |
--- a/cc/worker_pool.cc |
+++ b/cc/worker_pool.cc |
@@ -22,12 +22,13 @@ namespace { |
class WorkerPoolTaskImpl : public internal::WorkerPoolTask { |
public: |
WorkerPoolTaskImpl(const WorkerPool::Callback& task, |
- const base::Closure& reply) |
+ const base::Callback<void(bool)>& reply) |
: internal::WorkerPoolTask(reply), |
task_(task) {} |
virtual void Run(RenderingStats* rendering_stats) OVERRIDE { |
task_.Run(rendering_stats); |
+ base::subtle::Release_Store(&completed_, 1); |
} |
private: |
@@ -36,23 +37,27 @@ class WorkerPoolTaskImpl : public internal::WorkerPoolTask { |
const char* kWorkerThreadNamePrefix = "Compositor"; |
-// Allow two pending tasks per worker. This keeps resource usage |
-// low while making sure workers aren't unnecessarily idle. |
-const int kNumPendingTasksPerWorker = 2; |
+const int kCheckForCompletedTasksDelayMs = 6; |
} // namespace |
namespace internal { |
-WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) |
+WorkerPoolTask::WorkerPoolTask(const base::Callback<void(bool)>& reply) |
: reply_(reply) { |
+ base::subtle::Acquire_Store(&completed_, 0); |
} |
WorkerPoolTask::~WorkerPoolTask() { |
} |
-void WorkerPoolTask::Completed() { |
- reply_.Run(); |
+bool WorkerPoolTask::IsPending() { |
+ return base::subtle::Acquire_Load(&completed_) == 0; |
+} |
+ |
+void WorkerPoolTask::Completed(bool more_tasks_completed) { |
+ DCHECK_EQ(base::subtle::Acquire_Load(&completed_), 1); |
+ reply_.Run(more_tasks_completed); |
} |
} // namespace internal |
@@ -60,7 +65,6 @@ void WorkerPoolTask::Completed() { |
WorkerPool::Worker::Worker(WorkerPool* worker_pool, const std::string name) |
: base::Thread(name.c_str()), |
worker_pool_(worker_pool), |
- weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
rendering_stats_(make_scoped_ptr(new RenderingStats)), |
record_rendering_stats_(false) { |
Start(); |
@@ -80,27 +84,22 @@ void WorkerPool::Worker::StopAfterCompletingAllPendingTasks() { |
// all tasks have finished running. |
while (!pending_tasks_.empty()) |
OnTaskCompleted(); |
- |
- // Cancel all pending replies. |
- weak_ptr_factory_.InvalidateWeakPtrs(); |
} |
void WorkerPool::Worker::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
- DCHECK_LT(num_pending_tasks(), kNumPendingTasksPerWorker); |
- |
RenderingStats* stats = |
record_rendering_stats_ ? rendering_stats_.get() : NULL; |
- message_loop_proxy()->PostTaskAndReply( |
+ worker_pool_->WillPostWorkTask(); |
+ |
+ message_loop_proxy()->PostTask( |
FROM_HERE, |
base::Bind(&Worker::RunTask, |
base::Unretained(task.get()), |
- base::Unretained(stats)), |
- base::Bind(&Worker::OnTaskCompleted, weak_ptr_factory_.GetWeakPtr())); |
+ base::Unretained(worker_pool_), |
+ base::Unretained(stats))); |
pending_tasks_.push_back(task.Pass()); |
- |
- worker_pool_->DidNumPendingTasksChange(); |
} |
void WorkerPool::Worker::Init() { |
@@ -113,22 +112,41 @@ void WorkerPool::Worker::Init() { |
// static |
void WorkerPool::Worker::RunTask( |
- internal::WorkerPoolTask* task, RenderingStats* rendering_stats) { |
+ internal::WorkerPoolTask* task, |
+ WorkerPool* worker_pool, |
+ RenderingStats* rendering_stats) { |
task->Run(rendering_stats); |
+ worker_pool->OnWorkCompletedOnWorkerThread(); |
} |
void WorkerPool::Worker::OnTaskCompleted() { |
CHECK(!pending_tasks_.empty()); |
+ // Notify worker pool of task completion. |
+ worker_pool_->OnTaskCompleted(); |
+ |
scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); |
- task->Completed(); |
+ task->Completed(worker_pool_->MoreTasksCompleted()); |
+} |
+ |
+void WorkerPool::Worker::CheckForCompletedTasks() { |
+ while (!pending_tasks_.empty()) { |
nduca
2013/02/13 09:06:29
any reason we return tasks fifo? doesnt seem requi
reveman
2013/02/13 15:32:57
not really but they will complete in fifo order on
|
+ if (pending_tasks_.front()->IsPending()) |
+ return; |
- worker_pool_->DidNumPendingTasksChange(); |
+ OnTaskCompleted(); |
+ } |
} |
WorkerPool::WorkerPool(size_t num_threads) |
- : workers_need_sorting_(false), |
- shutdown_(false) { |
+ : origin_loop_(base::MessageLoopProxy::current()), |
+ weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
+ workers_need_sorting_(false), |
+ task_count_(0), |
+ shutdown_(false), |
+ check_for_completed_tasks_pending_(false), |
+ idle_callback_( |
+ base::Bind(&WorkerPool::OnIdle, weak_ptr_factory_.GetWeakPtr())) { |
const std::string thread_name_prefix = kWorkerThreadNamePrefix; |
while (workers_.size() < num_threads) { |
int thread_number = workers_.size() + 1; |
@@ -136,11 +154,16 @@ WorkerPool::WorkerPool(size_t num_threads) |
this, |
thread_name_prefix + StringPrintf("Worker%d", thread_number).c_str())); |
} |
+ base::subtle::Acquire_Store(&work_count_, 0); |
} |
WorkerPool::~WorkerPool() { |
Shutdown(); |
STLDeleteElements(&workers_); |
+ // Cancel all pending callbacks. |
+ weak_ptr_factory_.InvalidateWeakPtrs(); |
+ DCHECK_EQ(base::subtle::Acquire_Load(&work_count_), 0); |
+ DCHECK_EQ(task_count_, 0); |
} |
void WorkerPool::Shutdown() { |
@@ -154,8 +177,7 @@ void WorkerPool::Shutdown() { |
} |
} |
-void WorkerPool::PostTaskAndReply( |
- const Callback& task, const base::Closure& reply) { |
+void WorkerPool::PostTaskAndReply(const Callback& task, const Reply& reply) { |
Worker* worker = GetWorkerForNextTask(); |
worker->PostTask( |
@@ -164,12 +186,6 @@ void WorkerPool::PostTaskAndReply( |
reply)).PassAs<internal::WorkerPoolTask>()); |
} |
-bool WorkerPool::IsBusy() { |
- Worker* worker = GetWorkerForNextTask(); |
- |
- return worker->num_pending_tasks() >= kNumPendingTasksPerWorker; |
-} |
- |
void WorkerPool::SetRecordRenderingStats(bool record_rendering_stats) { |
for (WorkerVector::iterator it = workers_.begin(); |
it != workers_.end(); ++it) { |
@@ -204,8 +220,61 @@ WorkerPool::Worker* WorkerPool::GetWorkerForNextTask() { |
return workers_.front(); |
} |
-void WorkerPool::DidNumPendingTasksChange() { |
+void WorkerPool::ScheduleCheckForCompletedTasks() { |
+ if (check_for_completed_tasks_pending_) |
+ return; |
+ |
+ check_for_completed_tasks_callback_.Reset( |
+ base::Bind(&WorkerPool::CheckForCompletedTasks, |
+ weak_ptr_factory_.GetWeakPtr())); |
+ origin_loop_->PostDelayedTask( |
+ FROM_HERE, |
+ check_for_completed_tasks_callback_.callback(), |
+ base::TimeDelta::FromMilliseconds(kCheckForCompletedTasksDelayMs)); |
+ check_for_completed_tasks_pending_ = true; |
+} |
+ |
+void WorkerPool::WillPostWorkTask() { |
+ base::subtle::Barrier_AtomicIncrement(&work_count_, 1); |
+ ScheduleCheckForCompletedTasks(); |
workers_need_sorting_ = true; |
+ ++task_count_; |
+} |
+ |
+void WorkerPool::OnWorkCompletedOnWorkerThread() { |
+ // Post idle handler task when pool work count reaches 0. |
+ if (base::subtle::Barrier_AtomicIncrement(&work_count_, -1) == 0) { |
nduca
2013/02/13 09:06:29
Do we not have atomicdec?
reveman
2013/02/13 15:32:57
no, but Barrier_AtomicIncrement(-1) is commonly us
|
+ origin_loop_->PostTask(FROM_HERE, idle_callback_); |
+ } |
+} |
+ |
+void WorkerPool::OnIdle() { |
+ if (base::subtle::Acquire_Load(&work_count_) == 0) { |
+ check_for_completed_tasks_callback_.Cancel(); |
+ CheckForCompletedTasks(); |
+ } |
+} |
+ |
+void WorkerPool::CheckForCompletedTasks() { |
+ check_for_completed_tasks_pending_ = false; |
+ |
+ for (WorkerVector::iterator it = workers_.begin(); |
+ it != workers_.end(); it++) { |
+ Worker* worker = *it; |
+ worker->CheckForCompletedTasks(); |
+ } |
+ |
+ if (task_count_) |
+ ScheduleCheckForCompletedTasks(); |
+} |
+ |
+void WorkerPool::OnTaskCompleted() { |
+ workers_need_sorting_ = true; |
+ --task_count_; |
+} |
+ |
+bool WorkerPool::MoreTasksCompleted() { |
+ return base::subtle::Acquire_Load(&work_count_) < task_count_; |
} |
void WorkerPool::SortWorkersIfNeeded() { |