| Index: cc/base/worker_pool.cc
|
| diff --git a/cc/base/worker_pool.cc b/cc/base/worker_pool.cc
|
| index be9abf9b87d33e6c0b3fc2d473864568a84070fd..3d05afa1725345a6226cb596f5a34c14309a9442 100644
|
| --- a/cc/base/worker_pool.cc
|
| +++ b/cc/base/worker_pool.cc
|
| @@ -9,49 +9,43 @@
|
| #include <sys/resource.h>
|
| #endif
|
|
|
| -#include <algorithm>
|
| +#include <set>
|
|
|
| #include "base/bind.h"
|
| #include "base/debug/trace_event.h"
|
| #include "base/stringprintf.h"
|
| #include "base/synchronization/condition_variable.h"
|
| #include "base/threading/simple_thread.h"
|
| +#include "cc/base/scoped_ptr_deque.h"
|
|
|
| namespace cc {
|
|
|
| namespace {
|
|
|
| -class WorkerPoolTaskImpl : public internal::WorkerPoolTask {
|
| +class WorkerPoolTaskGraphImpl : public internal::WorkerPoolTaskGraph {
|
| public:
|
| - WorkerPoolTaskImpl(const WorkerPool::Callback& task,
|
| - const base::Closure& reply)
|
| - : internal::WorkerPoolTask(reply),
|
| - task_(task) {}
|
| + WorkerPoolTaskGraphImpl() {}
|
|
|
| - virtual void RunOnThread(unsigned thread_index) OVERRIDE {
|
| - task_.Run();
|
| + // Overridden from internal::WorkerPoolTaskGraph:
|
| + virtual bool HasMoreTasks() OVERRIDE { return false; }
|
| + virtual bool HasTask(internal::WorkerPoolTask* task) OVERRIDE {
|
| + return false;
|
| + }
|
| + virtual internal::WorkerPoolTask* TopTask() OVERRIDE {
|
| + NOTREACHED();
|
| + return NULL;
|
| + }
|
| + virtual scoped_refptr<internal::WorkerPoolTask> TakeTask(
|
| + internal::WorkerPoolTask* task) OVERRIDE {
|
| + return make_scoped_refptr(task);
|
| }
|
|
|
| private:
|
| - WorkerPool::Callback task_;
|
| + virtual ~WorkerPoolTaskGraphImpl() {}
|
| };
|
|
|
| } // namespace
|
|
|
| -namespace internal {
|
| -
|
| -WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) {
|
| -}
|
| -
|
| -WorkerPoolTask::~WorkerPoolTask() {
|
| -}
|
| -
|
| -void WorkerPoolTask::DidComplete() {
|
| - reply_.Run();
|
| -}
|
| -
|
| -} // namespace internal
|
| -
|
| // Internal to the worker pool. Any data or logic that needs to be
|
| // shared between threads lives in this class. All members are guarded
|
| // by |lock_|.
|
| @@ -64,17 +58,22 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
|
|
|
| void Shutdown();
|
|
|
| - void PostTask(scoped_ptr<internal::WorkerPoolTask> task);
|
| + // Schedule running of tasks in |task_graph|. All tasks previously
|
| + // scheduled but not present in |task_graph| will be canceled unless
|
| + // already running. Canceled tasks are moved to |completed_tasks_|
|
| + // without being run. The result is that once scheduled, a task is
|
| + // guaranteed to end up in the |completed_tasks_| queue even if they
|
| + // later get canceled by another call to ScheduleTasks().
|
| + void ScheduleTasks(scoped_ptr<internal::WorkerPoolTaskGraph> task_graph);
|
|
|
| - // Appends all completed tasks to worker pool's completed tasks queue
|
| - // and returns true if idle.
|
| - bool CollectCompletedTasks();
|
| + // Collect all completed tasks in |completed_tasks|. Returns true if idle.
|
| + bool CollectCompletedTasks(TaskDeque* completed_tasks);
|
|
|
| private:
|
| - // Appends all completed tasks to |completed_tasks|. Lock must
|
| - // already be acquired before calling this function.
|
| - bool AppendCompletedTasksWithLockAcquired(
|
| - ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks);
|
| + // Collect all completed tasks by swapping the contents of
|
| + // |completed_tasks| and |completed_tasks_|. Lock must be acquired
|
| + // before calling this function. Returns true if idle.
|
| + bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks);
|
|
|
| // Schedule an OnIdleOnOriginThread callback if not already pending.
|
| // Lock must already be acquired before calling this function.
|
| @@ -110,15 +109,18 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
|
| // loop index is 0.
|
| unsigned next_thread_index_;
|
|
|
| - // Number of tasks currently running.
|
| - unsigned running_task_count_;
|
| -
|
| // Set during shutdown. Tells workers to exit when no more tasks
|
| // are pending.
|
| bool shutdown_;
|
|
|
| - typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque;
|
| - TaskDeque pending_tasks_;
|
| + // The task graph. Provides tasks in order of priority.
|
| + scoped_ptr<internal::WorkerPoolTaskGraph> task_graph_;
|
| +
|
| + // This set contains all currently running tasks.
|
| + typedef std::set<internal::WorkerPoolTask*> TaskSet;
|
| + TaskSet running_tasks_;
|
| +
|
| + // Completed tasks not yet collected by origin thread.
|
| TaskDeque completed_tasks_;
|
|
|
| ScopedPtrDeque<base::DelegateSimpleThread> workers_;
|
| @@ -138,8 +140,8 @@ WorkerPool::Inner::Inner(WorkerPool* worker_pool,
|
| weak_ptr_factory_.GetWeakPtr())),
|
| on_idle_pending_(false),
|
| next_thread_index_(0),
|
| - running_task_count_(0),
|
| - shutdown_(false) {
|
| + shutdown_(false),
|
| + task_graph_(new WorkerPoolTaskGraphImpl) {
|
| base::AutoLock lock(lock_);
|
|
|
| while (workers_.size() < num_threads) {
|
| @@ -160,12 +162,9 @@ WorkerPool::Inner::~Inner() {
|
|
|
| DCHECK(shutdown_);
|
|
|
| - // Cancel all pending callbacks.
|
| - weak_ptr_factory_.InvalidateWeakPtrs();
|
| -
|
| - DCHECK_EQ(0u, pending_tasks_.size());
|
| + DCHECK(!task_graph_->HasMoreTasks());
|
| + DCHECK_EQ(0u, running_tasks_.size());
|
| DCHECK_EQ(0u, completed_tasks_.size());
|
| - DCHECK_EQ(0u, running_task_count_);
|
| }
|
|
|
| void WorkerPool::Inner::Shutdown() {
|
| @@ -184,32 +183,58 @@ void WorkerPool::Inner::Shutdown() {
|
| scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
|
| worker->Join();
|
| }
|
| +
|
| + // Cancel any pending OnIdle callback.
|
| + weak_ptr_factory_.InvalidateWeakPtrs();
|
| }
|
|
|
| -void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
|
| +void WorkerPool::Inner::ScheduleTasks(
|
| + scoped_ptr<internal::WorkerPoolTaskGraph> task_graph) {
|
| base::AutoLock lock(lock_);
|
|
|
| - pending_tasks_.push_back(task.Pass());
|
| + // Move tasks not present in new graph to |completed_tasks_|.
|
| + while (task_graph_->HasMoreTasks()) {
|
| + scoped_refptr<internal::WorkerPoolTask> task = task_graph_->TakeTask(
|
| + task_graph_->TopTask());
|
| +
|
| + // Task has not completed if present in new graph.
|
| + if (task_graph->HasTask(task))
|
| + continue;
|
| +
|
| + completed_tasks_.push_back(task);
|
| + }
|
| +
|
| + // Take any running tasks from new graph.
|
| + for (TaskSet::iterator it = running_tasks_.begin();
|
| + it != running_tasks_.end(); ++it)
|
| + task_graph->TakeTask(*it);
|
| +
|
| + // And take any completed tasks from new graph.
|
| + for (TaskDeque::iterator it = completed_tasks_.begin();
|
| + it != completed_tasks_.end(); ++it)
|
| + task_graph->TakeTask(*it);
|
| +
|
| + // Finally switch to the new graph.
|
| + task_graph_ = task_graph.Pass();
|
|
|
| // There is more work available, so wake up worker thread.
|
| has_pending_tasks_cv_.Signal();
|
| }
|
|
|
| -bool WorkerPool::Inner::CollectCompletedTasks() {
|
| +bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) {
|
| base::AutoLock lock(lock_);
|
|
|
| - return AppendCompletedTasksWithLockAcquired(
|
| - &worker_pool_on_origin_thread_->completed_tasks_);
|
| + return CollectCompletedTasksWithLockAcquired(completed_tasks);
|
| }
|
|
|
| -bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired(
|
| - ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) {
|
| +bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired(
|
| + TaskDeque* completed_tasks) {
|
| lock_.AssertAcquired();
|
|
|
| - while (completed_tasks_.size())
|
| - completed_tasks->push_back(completed_tasks_.take_front().Pass());
|
| + DCHECK_EQ(0u, completed_tasks->size());
|
| + completed_tasks->swap(completed_tasks_);
|
|
|
| - return !running_task_count_ && pending_tasks_.empty();
|
| + return running_tasks_.empty() && !task_graph_->HasMoreTasks();
|
| }
|
|
|
| void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
|
| @@ -222,6 +247,8 @@ void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
|
| }
|
|
|
| void WorkerPool::Inner::OnIdleOnOriginThread() {
|
| + TaskDeque completed_tasks;
|
| +
|
| {
|
| base::AutoLock lock(lock_);
|
|
|
| @@ -229,14 +256,13 @@ void WorkerPool::Inner::OnIdleOnOriginThread() {
|
| on_idle_pending_ = false;
|
|
|
| // Early out if no longer idle.
|
| - if (running_task_count_ || !pending_tasks_.empty())
|
| + if (!running_tasks_.empty() || task_graph_->HasMoreTasks())
|
| return;
|
|
|
| - AppendCompletedTasksWithLockAcquired(
|
| - &worker_pool_on_origin_thread_->completed_tasks_);
|
| + CollectCompletedTasksWithLockAcquired(&completed_tasks);
|
| }
|
|
|
| - worker_pool_on_origin_thread_->OnIdle();
|
| + worker_pool_on_origin_thread_->OnIdle(&completed_tasks);
|
| }
|
|
|
| void WorkerPool::Inner::Run() {
|
| @@ -252,28 +278,28 @@ void WorkerPool::Inner::Run() {
|
| int thread_index = next_thread_index_++;
|
|
|
| while (true) {
|
| - if (pending_tasks_.empty()) {
|
| + if (!task_graph_->HasMoreTasks()) {
|
| // Exit when shutdown is set and no more tasks are pending.
|
| if (shutdown_)
|
| break;
|
|
|
| - // Schedule an idle callback if requested and not pending.
|
| - if (!running_task_count_)
|
| + // Schedule an idle callback if not tasks are running.
|
| + if (running_tasks_.empty())
|
| ScheduleOnIdleWithLockAcquired();
|
|
|
| - // Wait for new pending tasks.
|
| + // Wait for more tasks.
|
| has_pending_tasks_cv_.Wait();
|
| continue;
|
| }
|
|
|
| - // Get next task.
|
| - scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front();
|
| + // Take the highest priority task from the task graph.
|
| + scoped_refptr<internal::WorkerPoolTask> task = task_graph_->TakeTask(
|
| + task_graph_->TopTask());
|
|
|
| - // Increment |running_task_count_| before starting to run task.
|
| - running_task_count_++;
|
| + // Insert task in |running_tasks_| before starting to run it.
|
| + running_tasks_.insert(task);
|
|
|
| - // There may be more work available, so wake up another
|
| - // worker thread.
|
| + // There may be more work available, so wake up another worker thread.
|
| has_pending_tasks_cv_.Signal();
|
|
|
| {
|
| @@ -282,10 +308,11 @@ void WorkerPool::Inner::Run() {
|
| task->RunOnThread(thread_index);
|
| }
|
|
|
| - completed_tasks_.push_back(task.Pass());
|
| + // Remove task from |running_tasks_| now that we are done running it.
|
| + running_tasks_.erase(task);
|
|
|
| - // Decrement |running_task_count_| now that we are done running task.
|
| - running_task_count_--;
|
| + // Finally add task to |completed_tasks_|.
|
| + completed_tasks_.push_back(task);
|
| }
|
|
|
| // We noticed we should exit. Wake up the next worker so it knows it should
|
| @@ -299,7 +326,6 @@ WorkerPool::WorkerPool(WorkerPoolClient* client,
|
| const std::string& thread_name_prefix)
|
| : client_(client),
|
| origin_loop_(base::MessageLoopProxy::current()),
|
| - weak_ptr_factory_(this),
|
| check_for_completed_tasks_delay_(check_for_completed_tasks_delay),
|
| check_for_completed_tasks_pending_(false),
|
| inner_(make_scoped_ptr(new Inner(this,
|
| @@ -308,38 +334,35 @@ WorkerPool::WorkerPool(WorkerPoolClient* client,
|
| }
|
|
|
| WorkerPool::~WorkerPool() {
|
| - // Cancel all pending callbacks.
|
| - weak_ptr_factory_.InvalidateWeakPtrs();
|
| -
|
| - DCHECK_EQ(0u, completed_tasks_.size());
|
| }
|
|
|
| void WorkerPool::Shutdown() {
|
| inner_->Shutdown();
|
| - inner_->CollectCompletedTasks();
|
| - DispatchCompletionCallbacks();
|
| -}
|
|
|
| -void WorkerPool::PostTaskAndReply(
|
| - const Callback& task, const base::Closure& reply) {
|
| - PostTask(make_scoped_ptr(new WorkerPoolTaskImpl(
|
| - task,
|
| - reply)).PassAs<internal::WorkerPoolTask>());
|
| + TaskDeque completed_tasks;
|
| + inner_->CollectCompletedTasks(&completed_tasks);
|
| + DispatchCompletionCallbacks(&completed_tasks);
|
| }
|
|
|
| -void WorkerPool::OnIdle() {
|
| +void WorkerPool::OnIdle(TaskDeque* completed_tasks) {
|
| TRACE_EVENT0("cc", "WorkerPool::OnIdle");
|
|
|
| - DispatchCompletionCallbacks();
|
| + DispatchCompletionCallbacks(completed_tasks);
|
| +
|
| + // Cancel any pending check for completed tasks.
|
| + check_for_completed_tasks_callback_.Cancel();
|
| + check_for_completed_tasks_pending_ = false;
|
| }
|
|
|
| void WorkerPool::ScheduleCheckForCompletedTasks() {
|
| if (check_for_completed_tasks_pending_)
|
| return;
|
| + check_for_completed_tasks_callback_.Reset(
|
| + base::Bind(&WorkerPool::CheckForCompletedTasks,
|
| + base::Unretained(this)));
|
| origin_loop_->PostDelayedTask(
|
| FROM_HERE,
|
| - base::Bind(&WorkerPool::CheckForCompletedTasks,
|
| - weak_ptr_factory_.GetWeakPtr()),
|
| + check_for_completed_tasks_callback_.callback(),
|
| check_for_completed_tasks_delay_);
|
| check_for_completed_tasks_pending_ = true;
|
| }
|
| @@ -349,32 +372,37 @@ void WorkerPool::CheckForCompletedTasks() {
|
| DCHECK(check_for_completed_tasks_pending_);
|
| check_for_completed_tasks_pending_ = false;
|
|
|
| + TaskDeque completed_tasks;
|
| +
|
| // Schedule another check for completed tasks if not idle.
|
| - if (!inner_->CollectCompletedTasks())
|
| + if (!inner_->CollectCompletedTasks(&completed_tasks))
|
| ScheduleCheckForCompletedTasks();
|
|
|
| - DispatchCompletionCallbacks();
|
| + DispatchCompletionCallbacks(&completed_tasks);
|
| }
|
|
|
| -void WorkerPool::DispatchCompletionCallbacks() {
|
| +void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) {
|
| TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks");
|
|
|
| - if (completed_tasks_.empty())
|
| + // Early out when |completed_tasks| is empty to prevent unnecessary
|
| + // call to DidFinishDispatchingWorkerPoolCompletionCallbacks().
|
| + if (completed_tasks->empty())
|
| return;
|
|
|
| - while (completed_tasks_.size()) {
|
| - scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front();
|
| - task->DidComplete();
|
| + while (completed_tasks->size()) {
|
| + scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front();
|
| + completed_tasks->pop_front();
|
| + task->DispatchCompletionCallback();
|
| }
|
|
|
| client_->DidFinishDispatchingWorkerPoolCompletionCallbacks();
|
| }
|
|
|
| -void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
|
| - // Schedule check for completed tasks if not pending.
|
| - ScheduleCheckForCompletedTasks();
|
| -
|
| - inner_->PostTask(task.Pass());
|
| +void WorkerPool::ScheduleTasks(
|
| + scoped_ptr<internal::WorkerPoolTaskGraph> task_graph) {
|
| + if (task_graph->HasMoreTasks())
|
| + ScheduleCheckForCompletedTasks();
|
| + inner_->ScheduleTasks(task_graph.Pass());
|
| }
|
|
|
| } // namespace cc
|
|
|