Chromium Code Reviews| Index: cc/base/worker_pool.cc |
| diff --git a/cc/base/worker_pool.cc b/cc/base/worker_pool.cc |
| index be9abf9b87d33e6c0b3fc2d473864568a84070fd..85f3c9ae03507b758ed1a63d017b932e72e4d650 100644 |
| --- a/cc/base/worker_pool.cc |
| +++ b/cc/base/worker_pool.cc |
| @@ -9,49 +9,44 @@ |
| #include <sys/resource.h> |
| #endif |
| -#include <algorithm> |
| +#include <set> |
| #include "base/bind.h" |
| #include "base/debug/trace_event.h" |
| #include "base/stringprintf.h" |
| #include "base/synchronization/condition_variable.h" |
| #include "base/threading/simple_thread.h" |
| +#include "cc/base/scoped_ptr_deque.h" |
| namespace cc { |
| namespace { |
| -class WorkerPoolTaskImpl : public internal::WorkerPoolTask { |
| +class WorkerPoolTaskGraphImpl : public internal::WorkerPoolTaskGraph { |
| public: |
| - WorkerPoolTaskImpl(const WorkerPool::Callback& task, |
| - const base::Closure& reply) |
| - : internal::WorkerPoolTask(reply), |
| - task_(task) {} |
| + WorkerPoolTaskGraphImpl() {} |
| - virtual void RunOnThread(unsigned thread_index) OVERRIDE { |
| - task_.Run(); |
| + // Overridden from internal::WorkerPoolTaskGraph: |
| + virtual bool HasMoreTasks() OVERRIDE { return false; } |
| + virtual bool HasTask(internal::WorkerPoolTask* task) OVERRIDE { |
| + return false; |
| + } |
| + virtual internal::WorkerPoolTask* TopTask() OVERRIDE { |
| + NOTREACHED(); |
| + return NULL; |
| + } |
| + virtual scoped_refptr<internal::WorkerPoolTask> TakeTask( |
| + internal::WorkerPoolTask* task) OVERRIDE { |
| + NOTREACHED(); |
| + return NULL; |
| } |
| private: |
| - WorkerPool::Callback task_; |
| + virtual ~WorkerPoolTaskGraphImpl() {} |
| }; |
| } // namespace |
| -namespace internal { |
| - |
| -WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) { |
| -} |
| - |
| -WorkerPoolTask::~WorkerPoolTask() { |
| -} |
| - |
| -void WorkerPoolTask::DidComplete() { |
| - reply_.Run(); |
| -} |
| - |
| -} // namespace internal |
| - |
| // Internal to the worker pool. Any data or logic that needs to be |
| // shared between threads lives in this class. All members are guarded |
| // by |lock_|. |
| @@ -64,17 +59,22 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
| void Shutdown(); |
| - void PostTask(scoped_ptr<internal::WorkerPoolTask> task); |
| + // Schedule running of tasks in |task_graph|. All tasks previously |
| + // scheduled but not present in |task_graph| will be canceled unless |
| + // already running. Canceled tasks are moved to |completed_tasks_| |
| + // without being run. The result is that once scheduled, a task is |
| + // guaranteed to end up in the |completed_tasks_| queue even if they |
| + // later get canceled by another call to ScheduleTasks(). |
| + void ScheduleTasks(scoped_ptr<internal::WorkerPoolTaskGraph> task_graph); |
| - // Appends all completed tasks to worker pool's completed tasks queue |
| - // and returns true if idle. |
| - bool CollectCompletedTasks(); |
| + // Collect all completed tasks in |completed_tasks|. Returns true if idle. |
| + bool CollectCompletedTasks(TaskDeque* completed_tasks); |
| private: |
| - // Appends all completed tasks to |completed_tasks|. Lock must |
| - // already be acquired before calling this function. |
| - bool AppendCompletedTasksWithLockAcquired( |
| - ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); |
| + // Collect all completed tasks by swapping the contents of |
| + // |completed_tasks| and |completed_tasks_|. Lock must be acquired |
| + // before calling this function. Returns true if idle. |
| + bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks); |
| // Schedule an OnIdleOnOriginThread callback if not already pending. |
| // Lock must already be acquired before calling this function. |
| @@ -110,15 +110,18 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
| // loop index is 0. |
| unsigned next_thread_index_; |
| - // Number of tasks currently running. |
| - unsigned running_task_count_; |
| - |
| // Set during shutdown. Tells workers to exit when no more tasks |
| // are pending. |
| bool shutdown_; |
| - typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; |
| - TaskDeque pending_tasks_; |
| + // The task graph. Provides tasks in order of priority. |
| + scoped_ptr<internal::WorkerPoolTaskGraph> task_graph_; |
| + |
| + // This set contains all currently running tasks. |
| + typedef std::set<internal::WorkerPoolTask*> TaskSet; |
| + TaskSet running_tasks_; |
| + |
| + // Completed tasks not yet collected by origin thread. |
| TaskDeque completed_tasks_; |
| ScopedPtrDeque<base::DelegateSimpleThread> workers_; |
| @@ -138,8 +141,8 @@ WorkerPool::Inner::Inner(WorkerPool* worker_pool, |
| weak_ptr_factory_.GetWeakPtr())), |
| on_idle_pending_(false), |
| next_thread_index_(0), |
| - running_task_count_(0), |
| - shutdown_(false) { |
| + shutdown_(false), |
| + task_graph_(new WorkerPoolTaskGraphImpl) { |
| base::AutoLock lock(lock_); |
| while (workers_.size() < num_threads) { |
| @@ -160,12 +163,9 @@ WorkerPool::Inner::~Inner() { |
| DCHECK(shutdown_); |
| - // Cancel all pending callbacks. |
| - weak_ptr_factory_.InvalidateWeakPtrs(); |
| - |
| - DCHECK_EQ(0u, pending_tasks_.size()); |
| + DCHECK(!task_graph_->HasMoreTasks()); |
| + DCHECK_EQ(0u, running_tasks_.size()); |
| DCHECK_EQ(0u, completed_tasks_.size()); |
| - DCHECK_EQ(0u, running_task_count_); |
| } |
| void WorkerPool::Inner::Shutdown() { |
| @@ -184,32 +184,62 @@ void WorkerPool::Inner::Shutdown() { |
| scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); |
| worker->Join(); |
| } |
| + |
| + // Cancel any pending OnIdle callback. |
| + weak_ptr_factory_.InvalidateWeakPtrs(); |
| } |
| -void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
| +void WorkerPool::Inner::ScheduleTasks( |
| + scoped_ptr<internal::WorkerPoolTaskGraph> task_graph) { |
| base::AutoLock lock(lock_); |
| - pending_tasks_.push_back(task.Pass()); |
| + // Move tasks not present in new graph to |completed_tasks_|. |
| + while (task_graph_->HasMoreTasks()) { |
| + scoped_refptr<internal::WorkerPoolTask> task = task_graph_->TakeTask( |
| + task_graph_->TopTask()); |
| + |
| + // Task has not completed if present in new graph. |
| + if (task_graph->HasTask(task)) |
| + continue; |
| + |
| + completed_tasks_.push_back(task); |
| + } |
| + |
| + // Take any running tasks from new graph. |
| + for (TaskSet::iterator it = running_tasks_.begin(); |
| + it != running_tasks_.end(); ++it) { |
| + if (task_graph->HasTask(*it)) |
| + task_graph->TakeTask(*it); |
| + } |
| + |
| + // And take any completed tasks from new graph. |
| + for (TaskDeque::iterator it = completed_tasks_.begin(); |
| + it != completed_tasks_.end(); ++it) { |
| + if (task_graph->HasTask(*it)) |
| + task_graph->TakeTask(*it); |
| + } |
| + |
| + // Finally switch to the new graph. |
| + task_graph_ = task_graph.Pass(); |
| // There is more work available, so wake up worker thread. |
| has_pending_tasks_cv_.Signal(); |
| } |
| -bool WorkerPool::Inner::CollectCompletedTasks() { |
| +bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) { |
| base::AutoLock lock(lock_); |
| - return AppendCompletedTasksWithLockAcquired( |
| - &worker_pool_on_origin_thread_->completed_tasks_); |
| + return CollectCompletedTasksWithLockAcquired(completed_tasks); |
| } |
| -bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired( |
| - ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) { |
| +bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired( |
| + TaskDeque* completed_tasks) { |
| lock_.AssertAcquired(); |
| - while (completed_tasks_.size()) |
| - completed_tasks->push_back(completed_tasks_.take_front().Pass()); |
| + DCHECK_EQ(0u, completed_tasks->size()); |
| + completed_tasks->swap(completed_tasks_); |
| - return !running_task_count_ && pending_tasks_.empty(); |
| + return running_tasks_.empty() && !task_graph_->HasMoreTasks(); |
| } |
| void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
| @@ -222,6 +252,8 @@ void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
| } |
| void WorkerPool::Inner::OnIdleOnOriginThread() { |
| + TaskDeque completed_tasks; |
| + |
| { |
| base::AutoLock lock(lock_); |
| @@ -229,14 +261,13 @@ void WorkerPool::Inner::OnIdleOnOriginThread() { |
| on_idle_pending_ = false; |
| // Early out if no longer idle. |
| - if (running_task_count_ || !pending_tasks_.empty()) |
| + if (!running_tasks_.empty() || task_graph_->HasMoreTasks()) |
| return; |
| - AppendCompletedTasksWithLockAcquired( |
| - &worker_pool_on_origin_thread_->completed_tasks_); |
| + CollectCompletedTasksWithLockAcquired(&completed_tasks); |
| } |
| - worker_pool_on_origin_thread_->OnIdle(); |
| + worker_pool_on_origin_thread_->OnIdle(&completed_tasks); |
| } |
| void WorkerPool::Inner::Run() { |
| @@ -252,28 +283,28 @@ void WorkerPool::Inner::Run() { |
| int thread_index = next_thread_index_++; |
| while (true) { |
| - if (pending_tasks_.empty()) { |
| + if (!task_graph_->HasMoreTasks()) { |
| // Exit when shutdown is set and no more tasks are pending. |
| if (shutdown_) |
|
vmpstr
2013/05/06 23:08:48
Now that task_graph_ contains all work that needs
reveman
2013/05/07 01:33:54
Yes, we could cancel all pending tasks at shutdown
|
| break; |
| - // Schedule an idle callback if requested and not pending. |
| - if (!running_task_count_) |
| + // Schedule an idle callback if not tasks are running. |
|
vmpstr
2013/05/06 23:08:48
nit: not -> no
reveman
2013/05/07 01:33:54
Done.
|
| + if (running_tasks_.empty()) |
| ScheduleOnIdleWithLockAcquired(); |
| - // Wait for new pending tasks. |
| + // Wait for more tasks. |
| has_pending_tasks_cv_.Wait(); |
| continue; |
| } |
| - // Get next task. |
| - scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); |
| + // Take the highest priority task from the task graph. |
| + scoped_refptr<internal::WorkerPoolTask> task = task_graph_->TakeTask( |
| + task_graph_->TopTask()); |
| - // Increment |running_task_count_| before starting to run task. |
| - running_task_count_++; |
| + // Insert task in |running_tasks_| before starting to run it. |
| + running_tasks_.insert(task); |
| - // There may be more work available, so wake up another |
| - // worker thread. |
| + // There may be more work available, so wake up another worker thread. |
| has_pending_tasks_cv_.Signal(); |
| { |
| @@ -282,10 +313,11 @@ void WorkerPool::Inner::Run() { |
| task->RunOnThread(thread_index); |
| } |
| - completed_tasks_.push_back(task.Pass()); |
| + // Remove task from |running_tasks_| now that we are done running it. |
| + running_tasks_.erase(task); |
| - // Decrement |running_task_count_| now that we are done running task. |
| - running_task_count_--; |
| + // Finally add task to |completed_tasks_|. |
| + completed_tasks_.push_back(task); |
| } |
| // We noticed we should exit. Wake up the next worker so it knows it should |
| @@ -299,7 +331,6 @@ WorkerPool::WorkerPool(WorkerPoolClient* client, |
| const std::string& thread_name_prefix) |
| : client_(client), |
| origin_loop_(base::MessageLoopProxy::current()), |
| - weak_ptr_factory_(this), |
| check_for_completed_tasks_delay_(check_for_completed_tasks_delay), |
| check_for_completed_tasks_pending_(false), |
| inner_(make_scoped_ptr(new Inner(this, |
| @@ -308,38 +339,35 @@ WorkerPool::WorkerPool(WorkerPoolClient* client, |
| } |
| WorkerPool::~WorkerPool() { |
| - // Cancel all pending callbacks. |
| - weak_ptr_factory_.InvalidateWeakPtrs(); |
| - |
| - DCHECK_EQ(0u, completed_tasks_.size()); |
| } |
| void WorkerPool::Shutdown() { |
| inner_->Shutdown(); |
| - inner_->CollectCompletedTasks(); |
| - DispatchCompletionCallbacks(); |
| -} |
| -void WorkerPool::PostTaskAndReply( |
| - const Callback& task, const base::Closure& reply) { |
| - PostTask(make_scoped_ptr(new WorkerPoolTaskImpl( |
| - task, |
| - reply)).PassAs<internal::WorkerPoolTask>()); |
| + TaskDeque completed_tasks; |
| + inner_->CollectCompletedTasks(&completed_tasks); |
| + DispatchCompletionCallbacks(&completed_tasks); |
| } |
| -void WorkerPool::OnIdle() { |
| +void WorkerPool::OnIdle(TaskDeque* completed_tasks) { |
| TRACE_EVENT0("cc", "WorkerPool::OnIdle"); |
| - DispatchCompletionCallbacks(); |
| + DispatchCompletionCallbacks(completed_tasks); |
| + |
| + // Cancel any pending check for completed tasks. |
| + check_for_completed_tasks_callback_.Cancel(); |
| + check_for_completed_tasks_pending_ = false; |
| } |
| void WorkerPool::ScheduleCheckForCompletedTasks() { |
| if (check_for_completed_tasks_pending_) |
| return; |
| + check_for_completed_tasks_callback_.Reset( |
| + base::Bind(&WorkerPool::CheckForCompletedTasks, |
| + base::Unretained(this))); |
| origin_loop_->PostDelayedTask( |
| FROM_HERE, |
| - base::Bind(&WorkerPool::CheckForCompletedTasks, |
| - weak_ptr_factory_.GetWeakPtr()), |
| + check_for_completed_tasks_callback_.callback(), |
| check_for_completed_tasks_delay_); |
| check_for_completed_tasks_pending_ = true; |
| } |
| @@ -349,32 +377,37 @@ void WorkerPool::CheckForCompletedTasks() { |
| DCHECK(check_for_completed_tasks_pending_); |
| check_for_completed_tasks_pending_ = false; |
| + TaskDeque completed_tasks; |
| + |
| // Schedule another check for completed tasks if not idle. |
| - if (!inner_->CollectCompletedTasks()) |
| + if (!inner_->CollectCompletedTasks(&completed_tasks)) |
| ScheduleCheckForCompletedTasks(); |
| - DispatchCompletionCallbacks(); |
| + DispatchCompletionCallbacks(&completed_tasks); |
| } |
| -void WorkerPool::DispatchCompletionCallbacks() { |
| +void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) { |
| TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); |
| - if (completed_tasks_.empty()) |
| + // Early out when |completed_tasks| is empty to prevent unnecessary |
| + // call to DidFinishDispatchingWorkerPoolCompletionCallbacks(). |
| + if (completed_tasks->empty()) |
| return; |
| - while (completed_tasks_.size()) { |
| - scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); |
| - task->DidComplete(); |
| + while (completed_tasks->size()) { |
|
vmpstr
2013/05/06 23:08:48
nit: consider !empty
reveman
2013/05/07 01:33:54
Done.
|
| + scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front(); |
| + completed_tasks->pop_front(); |
| + task->DispatchCompletionCallback(); |
| } |
| client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); |
| } |
| -void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
| - // Schedule check for completed tasks if not pending. |
| - ScheduleCheckForCompletedTasks(); |
| - |
| - inner_->PostTask(task.Pass()); |
| +void WorkerPool::ScheduleTasks( |
| + scoped_ptr<internal::WorkerPoolTaskGraph> task_graph) { |
| + if (task_graph->HasMoreTasks()) |
| + ScheduleCheckForCompletedTasks(); |
| + inner_->ScheduleTasks(task_graph.Pass()); |
| } |
| } // namespace cc |