Index: cc/base/worker_pool.cc |
diff --git a/cc/base/worker_pool.cc b/cc/base/worker_pool.cc |
index 27e6440eb0509674c9eb265135fa6565cd73f1e9..e3eccc5267a7ca9f3f288c1efe0a0c7a9083756d 100644 |
--- a/cc/base/worker_pool.cc |
+++ b/cc/base/worker_pool.cc |
@@ -9,45 +9,134 @@ |
#include <sys/resource.h> |
#endif |
-#include <algorithm> |
- |
#include "base/bind.h" |
#include "base/debug/trace_event.h" |
+#include "base/hash_tables.h" |
#include "base/stringprintf.h" |
#include "base/synchronization/condition_variable.h" |
#include "base/threading/simple_thread.h" |
+#include "cc/base/scoped_ptr_deque.h" |
-namespace cc { |
- |
-namespace { |
- |
-class WorkerPoolTaskImpl : public internal::WorkerPoolTask { |
- public: |
- WorkerPoolTaskImpl(const WorkerPool::Callback& task, |
- const base::Closure& reply) |
- : internal::WorkerPoolTask(reply), |
- task_(task) {} |
- |
- virtual void RunOnThread(unsigned thread_index) OVERRIDE { |
- task_.Run(); |
+#if defined(COMPILER_GCC) |
+namespace BASE_HASH_NAMESPACE { |
+template <> struct hash<cc::internal::WorkerPoolTask*> { |
+ size_t operator()(cc::internal::WorkerPoolTask* ptr) const { |
+ return hash<size_t>()(reinterpret_cast<size_t>(ptr)); |
} |
- |
- private: |
- WorkerPool::Callback task_; |
}; |
+} // namespace BASE_HASH_NAMESPACE |
+#endif // COMPILER |
-} // namespace |
+namespace cc { |
namespace internal { |
-WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) { |
+WorkerPoolTask::WorkerPoolTask() : did_schedule_(false), |
+ did_run_(false), |
+ did_complete_(false) { |
} |
WorkerPoolTask::~WorkerPoolTask() { |
+ DCHECK_EQ(did_schedule_, did_complete_); |
+ DCHECK(!did_run_ || did_schedule_); |
+ DCHECK(!did_run_ || did_complete_); |
+} |
+ |
+void WorkerPoolTask::DidSchedule() { |
+ DCHECK(!did_complete_); |
+ did_schedule_ = true; |
+} |
+ |
+void WorkerPoolTask::WillRun() { |
+ DCHECK(did_schedule_); |
+ DCHECK(!did_complete_); |
+ DCHECK(!did_run_); |
+} |
+ |
+void WorkerPoolTask::DidRun() { |
+ did_run_ = true; |
} |
void WorkerPoolTask::DidComplete() { |
- reply_.Run(); |
+ DCHECK(did_schedule_); |
+ DCHECK(!did_complete_); |
+ did_complete_ = true; |
+} |
+ |
+bool WorkerPoolTask::HasFinished() const { |
+ return did_run_; |
+} |
+ |
+WorkerPoolTaskDependency::Iterator::Iterator( |
+ const WorkerPoolTaskDependency* root) |
+ : current_(root), |
+ root_(root) { |
+ ++(*this); |
+} |
+ |
+WorkerPoolTaskDependency::Iterator::~Iterator() { |
+} |
+ |
+WorkerPoolTaskDependency::TaskIterator::TaskIterator( |
+ const WorkerPoolTaskDependency* parent) |
+ : current_(parent->first_child_.get()) { |
+} |
+ |
+WorkerPoolTaskDependency::TaskIterator::~TaskIterator() { |
+} |
+ |
+WorkerPoolTaskDependency::WorkerPoolTaskDependency() |
+ : parent_(NULL), |
+ last_child_(NULL) { |
+} |
+ |
+WorkerPoolTaskDependency::~WorkerPoolTaskDependency() { |
+} |
+ |
+// static |
+void WorkerPoolTaskDependency::Create( |
+ WorkerPoolTask* task, |
+ WorkerPoolTaskDependency* parent, |
+ WorkerPoolTaskDependency* dependencies) { |
+ scoped_ptr<WorkerPoolTaskDependency> dependency( |
+ new WorkerPoolTaskDependency); |
+ |
+ if (dependencies) { |
+ DCHECK(!dependencies->task()); |
+ dependency->Swap(dependencies); |
+ } |
+ dependency->task_ = task; |
+ dependency->parent_ = parent; |
+ |
+ if (parent->last_child_) { |
+ parent->last_child_->next_sibling_ = dependency.Pass(); |
+ // |parent_| should only be set for |last_child_| |
+ parent->last_child_->parent_ = NULL; |
+ parent->last_child_ = parent->last_child_->next_sibling_.get(); |
+ } else { |
+ parent->first_child_ = dependency.Pass(); |
+ parent->last_child_ = parent->first_child_.get(); |
+ } |
+} |
+ |
+void WorkerPoolTaskDependency::Swap(WorkerPoolTaskDependency* other) { |
+ DCHECK(other); |
+ |
+ first_child_.swap(other->first_child_); |
+ next_sibling_.swap(other->next_sibling_); |
+ task_.swap(other->task_); |
+ |
+ // |last_child_| has a pointer to the parent. Make sure this pointer |
+ // is properly swapped. |
+ WorkerPoolTaskDependency* other_last_child_ = other->last_child_; |
+ if (other_last_child_) |
+ other_last_child_->parent_ = this; |
+ if (last_child_) |
+ last_child_->parent_ = other; |
+ |
+ // Swap |last_child_| pointers. |
+ other->last_child_ = last_child_; |
+ last_child_ = other_last_child_; |
} |
} // namespace internal |
@@ -64,17 +153,26 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
void Shutdown(); |
- void PostTask(scoped_ptr<internal::WorkerPoolTask> task); |
+ // Schedule running of tasks in |task_graph|. All tasks previously |
+ // scheduled but not present in |task_graph| will be canceled unless |
+ // already running. Canceled tasks are moved to |completed_tasks_| |
+ // without being run. The result is that once scheduled, a task is |
+ // guaranteed to end up in the |completed_tasks_| queue even if they |
+ // later get canceled by another call to ScheduleTasks(). |
+ void ScheduleTasks(internal::WorkerPoolTaskGraph* task_graph); |
- // Appends all completed tasks to worker pool's completed tasks queue |
- // and returns true if idle. |
- bool CollectCompletedTasks(); |
+ // Collect all completed tasks in |completed_tasks|. Returns true if idle. |
+ bool CollectCompletedTasks(TaskDeque* completed_tasks); |
private: |
- // Appends all completed tasks to |completed_tasks|. Lock must |
- // already be acquired before calling this function. |
- bool AppendCompletedTasksWithLockAcquired( |
- ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); |
+ // Find a task that is ready to run by traversing the dependency graph. |
+ // Lock must be acquired before calling this function. |
+ internal::WorkerPoolTask* FindTaskThatIsReadyToRunWithLockAcquired(); |
+ |
+ // Collect all completed tasks by swapping the contents of |
+ // |completed_tasks| and |completed_tasks_|. Lock must be acquired |
+ // before calling this function. Returns true if idle. |
+ bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks); |
// Schedule an OnIdleOnOriginThread callback if not already pending. |
// Lock must already be acquired before calling this function. |
@@ -110,15 +208,21 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
// loop index is 0. |
unsigned next_thread_index_; |
- // Number of tasks currently running. |
- unsigned running_task_count_; |
- |
// Set during shutdown. Tells workers to exit when no more tasks |
// are pending. |
bool shutdown_; |
- typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; |
- TaskDeque pending_tasks_; |
+ // The dependency graph. Describes task dependencies and task priority. |
+ internal::WorkerPoolTaskGraph task_graph_; |
+ |
+ // This set contains all pending tasks. |
+ typedef base::hash_set<internal::WorkerPoolTask*> TaskSet; |
+ TaskSet pending_tasks_; |
+ |
+ // This set contains all currently running tasks. |
+ TaskSet running_tasks_; |
+ |
+ // Completed tasks not yet collected by origin thread. |
TaskDeque completed_tasks_; |
ScopedPtrDeque<base::DelegateSimpleThread> workers_; |
@@ -138,18 +242,17 @@ WorkerPool::Inner::Inner(WorkerPool* worker_pool, |
weak_ptr_factory_.GetWeakPtr())), |
on_idle_pending_(false), |
next_thread_index_(0), |
- running_task_count_(0), |
shutdown_(false) { |
base::AutoLock lock(lock_); |
while (workers_.size() < num_threads) { |
scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( |
new base::DelegateSimpleThread( |
- this, |
- thread_name_prefix + |
- base::StringPrintf( |
- "Worker%u", |
- static_cast<unsigned>(workers_.size() + 1)).c_str())); |
+ this, |
+ thread_name_prefix + |
+ base::StringPrintf( |
+ "Worker%u", |
+ static_cast<unsigned>(workers_.size() + 1)).c_str())); |
worker->Start(); |
workers_.push_back(worker.Pass()); |
} |
@@ -160,12 +263,9 @@ WorkerPool::Inner::~Inner() { |
DCHECK(shutdown_); |
- // Cancel all pending callbacks. |
- weak_ptr_factory_.InvalidateWeakPtrs(); |
- |
DCHECK_EQ(0u, pending_tasks_.size()); |
+ DCHECK_EQ(0u, running_tasks_.size()); |
DCHECK_EQ(0u, completed_tasks_.size()); |
- DCHECK_EQ(0u, running_task_count_); |
} |
void WorkerPool::Inner::Shutdown() { |
@@ -184,32 +284,111 @@ void WorkerPool::Inner::Shutdown() { |
scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); |
worker->Join(); |
} |
+ |
+ // Cancel any pending OnIdle callback. |
+ weak_ptr_factory_.InvalidateWeakPtrs(); |
} |
-void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
- base::AutoLock lock(lock_); |
+void WorkerPool::Inner::ScheduleTasks( |
+ internal::WorkerPoolTaskGraph* task_graph) { |
+ // Move all dependencies to |new_graph|. |
+ internal::WorkerPoolTaskGraph new_graph; |
+ new_graph.Swap(task_graph); |
+ |
+ TaskSet tasks; |
+ // Traverse task graph to build new pending task set. |
+ for (internal::WorkerPoolTaskGraph::Iterator it(&new_graph); it; ++it) { |
+ internal::WorkerPoolTask* task = it->task(); |
+ task->DidSchedule(); |
+ tasks.insert(task); |
+ } |
+ |
+ { |
+ base::AutoLock lock(lock_); |
- pending_tasks_.push_back(task.Pass()); |
+ // Move tasks not present in |new_graph| to |completed_tasks_|. |
+ for (TaskSet::iterator it = pending_tasks_.begin(); |
+ it != pending_tasks_.end(); ++it) { |
+ internal::WorkerPoolTask* task = *it; |
- // There is more work available, so wake up worker thread. |
- has_pending_tasks_cv_.Signal(); |
+ // Task has completed if not present in |new_graph|. |
+ if (tasks.find(task) == tasks.end()) |
+ completed_tasks_.push_back(task); |
+ } |
+ |
+ // Remove all running tasks from new pending task set. |
+ for (TaskSet::iterator it = running_tasks_.begin(); |
+ it != running_tasks_.end(); ++it) { |
+ internal::WorkerPoolTask* task = *it; |
+ |
+ if (tasks.find(task) != tasks.end()) |
+ tasks.erase(task); |
+ } |
+ |
+ // And remove all completed tasks from new pending task set. |
+ for (TaskDeque::iterator it = completed_tasks_.begin(); |
+ it != completed_tasks_.end(); ++it) { |
+ internal::WorkerPoolTask* task = *it; |
+ |
+ if (tasks.find(task) != tasks.end()) |
+ tasks.erase(task); |
+ } |
+ |
+ // Swap task graphs. |
+ // Note: old tasks are intentionally destroyed after releasing |lock_|. |
+ task_graph_.Swap(&new_graph); |
+ |
+ // Swap pending task sets. |
+ pending_tasks_.swap(tasks); |
+ |
+ // There is more work available, so wake up worker thread. |
+ has_pending_tasks_cv_.Signal(); |
+ } |
} |
-bool WorkerPool::Inner::CollectCompletedTasks() { |
+bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) { |
base::AutoLock lock(lock_); |
- return AppendCompletedTasksWithLockAcquired( |
- &worker_pool_on_origin_thread_->completed_tasks_); |
+ return CollectCompletedTasksWithLockAcquired(completed_tasks); |
} |
-bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired( |
- ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) { |
+internal::WorkerPoolTask* |
+ WorkerPool::Inner::FindTaskThatIsReadyToRunWithLockAcquired() { |
lock_.AssertAcquired(); |
- while (completed_tasks_.size()) |
- completed_tasks->push_back(completed_tasks_.take_front().Pass()); |
+ for (internal::WorkerPoolTaskGraph::Iterator it(&task_graph_); it; ++it) { |
+ internal::WorkerPoolTask* task = it->task(); |
- return !running_task_count_ && pending_tasks_.empty(); |
+ // Skip task if not present in |pending_tasks_|. These tasks are |
+ // either already running or have finished. |
+ // Note: Skip traversal of sub-tree if this function becomes too slow. |
+ if (pending_tasks_.find(task) == pending_tasks_.end()) |
+ continue; |
+ |
+ // Task is ready to run if all direct children have finished running. |
+ bool all_dependencies_are_satisfied = true; |
+ |
+ for (internal::WorkerPoolTaskGraph::TaskIterator task_it(*it); |
+ task_it && all_dependencies_are_satisfied; |
+ ++task_it) { |
+ all_dependencies_are_satisfied &= task_it->HasFinished(); |
+ } |
+ |
+ if (all_dependencies_are_satisfied) |
+ return task; |
+ } |
+ |
+ return NULL; |
+} |
+ |
+bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired( |
+ TaskDeque* completed_tasks) { |
+ lock_.AssertAcquired(); |
+ |
+ DCHECK_EQ(0u, completed_tasks->size()); |
+ completed_tasks->swap(completed_tasks_); |
+ |
+ return running_tasks_.empty() && pending_tasks_.empty(); |
} |
void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
@@ -222,6 +401,8 @@ void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
} |
void WorkerPool::Inner::OnIdleOnOriginThread() { |
+ TaskDeque completed_tasks; |
+ |
{ |
base::AutoLock lock(lock_); |
@@ -229,14 +410,13 @@ void WorkerPool::Inner::OnIdleOnOriginThread() { |
on_idle_pending_ = false; |
// Early out if no longer idle. |
- if (running_task_count_ || !pending_tasks_.empty()) |
+ if (!running_tasks_.empty() || !pending_tasks_.empty()) |
return; |
- AppendCompletedTasksWithLockAcquired( |
- &worker_pool_on_origin_thread_->completed_tasks_); |
+ CollectCompletedTasksWithLockAcquired(&completed_tasks); |
} |
- worker_pool_on_origin_thread_->OnIdle(); |
+ worker_pool_on_origin_thread_->OnIdle(&completed_tasks); |
} |
void WorkerPool::Inner::Run() { |
@@ -252,40 +432,53 @@ void WorkerPool::Inner::Run() { |
int thread_index = next_thread_index_++; |
while (true) { |
- if (pending_tasks_.empty()) { |
- // Exit when shutdown is set and no more tasks are pending. |
- if (shutdown_) |
- break; |
- |
- // Schedule an idle callback if requested and not pending. |
- if (!running_task_count_) |
- ScheduleOnIdleWithLockAcquired(); |
- |
- // Wait for new pending tasks. |
+ scoped_refptr<internal::WorkerPoolTask> task( |
+ FindTaskThatIsReadyToRunWithLockAcquired()); |
+ if (!task) { |
+ if (pending_tasks_.empty()) { |
+ // Exit when shutdown is set and no more tasks are pending. |
+ if (shutdown_) |
+ break; |
+ |
+ // Schedule an idle callback if no tasks are running. |
+ if (running_tasks_.empty()) |
+ ScheduleOnIdleWithLockAcquired(); |
+ } |
+ |
+ // Wait for more tasks. |
has_pending_tasks_cv_.Wait(); |
continue; |
} |
- // Get next task. |
- scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); |
+ // First remove task from |pending_tasks_|. |
+ DCHECK(pending_tasks_.find(task) != pending_tasks_.end()); |
+ pending_tasks_.erase(task); |
- // Increment |running_task_count_| before starting to run task. |
- running_task_count_++; |
+ // Insert task in |running_tasks_| before starting to run it. |
+ DCHECK(running_tasks_.find(task) == running_tasks_.end()); |
+ running_tasks_.insert(task); |
- // There may be more work available, so wake up another |
- // worker thread. |
+ // There may be more work available, so wake up another worker thread. |
has_pending_tasks_cv_.Signal(); |
+ // Call WillRun() before releasing |lock_| and running task. |
+ task->WillRun(); |
+ |
{ |
base::AutoUnlock unlock(lock_); |
task->RunOnThread(thread_index); |
} |
- completed_tasks_.push_back(task.Pass()); |
+ // This will mark task as finished. |
+ task->DidRun(); |
+ |
+ // Remove task from |running_tasks_| now that we are done running it. |
+ DCHECK(running_tasks_.find(task) != running_tasks_.end()); |
+ running_tasks_.erase(task); |
- // Decrement |running_task_count_| now that we are done running task. |
- running_task_count_--; |
+ // Finally add task to |completed_tasks_|. |
+ completed_tasks_.push_back(task); |
} |
// We noticed we should exit. Wake up the next worker so it knows it should |
@@ -298,7 +491,6 @@ WorkerPool::WorkerPool(size_t num_threads, |
const std::string& thread_name_prefix) |
: client_(NULL), |
origin_loop_(base::MessageLoopProxy::current()), |
- weak_ptr_factory_(this), |
check_for_completed_tasks_delay_(check_for_completed_tasks_delay), |
check_for_completed_tasks_pending_(false), |
inner_(make_scoped_ptr(new Inner(this, |
@@ -307,38 +499,35 @@ WorkerPool::WorkerPool(size_t num_threads, |
} |
WorkerPool::~WorkerPool() { |
- // Cancel all pending callbacks. |
- weak_ptr_factory_.InvalidateWeakPtrs(); |
- |
- DCHECK_EQ(0u, completed_tasks_.size()); |
} |
void WorkerPool::Shutdown() { |
inner_->Shutdown(); |
- inner_->CollectCompletedTasks(); |
- DispatchCompletionCallbacks(); |
-} |
-void WorkerPool::PostTaskAndReply( |
- const Callback& task, const base::Closure& reply) { |
- PostTask(make_scoped_ptr(new WorkerPoolTaskImpl( |
- task, |
- reply)).PassAs<internal::WorkerPoolTask>()); |
+ TaskDeque completed_tasks; |
+ inner_->CollectCompletedTasks(&completed_tasks); |
+ DispatchCompletionCallbacks(&completed_tasks); |
} |
-void WorkerPool::OnIdle() { |
+void WorkerPool::OnIdle(TaskDeque* completed_tasks) { |
TRACE_EVENT0("cc", "WorkerPool::OnIdle"); |
- DispatchCompletionCallbacks(); |
+ DispatchCompletionCallbacks(completed_tasks); |
+ |
+ // Cancel any pending check for completed tasks. |
+ check_for_completed_tasks_callback_.Cancel(); |
+ check_for_completed_tasks_pending_ = false; |
} |
void WorkerPool::ScheduleCheckForCompletedTasks() { |
if (check_for_completed_tasks_pending_) |
return; |
+ check_for_completed_tasks_callback_.Reset( |
+ base::Bind(&WorkerPool::CheckForCompletedTasks, |
+ base::Unretained(this))); |
origin_loop_->PostDelayedTask( |
FROM_HERE, |
- base::Bind(&WorkerPool::CheckForCompletedTasks, |
- weak_ptr_factory_.GetWeakPtr()), |
+ check_for_completed_tasks_callback_.callback(), |
check_for_completed_tasks_delay_); |
check_for_completed_tasks_pending_ = true; |
} |
@@ -348,33 +537,42 @@ void WorkerPool::CheckForCompletedTasks() { |
DCHECK(check_for_completed_tasks_pending_); |
check_for_completed_tasks_pending_ = false; |
+ TaskDeque completed_tasks; |
+ |
// Schedule another check for completed tasks if not idle. |
- if (!inner_->CollectCompletedTasks()) |
+ if (!inner_->CollectCompletedTasks(&completed_tasks)) |
ScheduleCheckForCompletedTasks(); |
- DispatchCompletionCallbacks(); |
+ DispatchCompletionCallbacks(&completed_tasks); |
} |
-void WorkerPool::DispatchCompletionCallbacks() { |
+void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) { |
TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); |
- if (completed_tasks_.empty()) |
+ // Early out when |completed_tasks| is empty to prevent unnecessary |
+ // call to DidFinishDispatchingWorkerPoolCompletionCallbacks(). |
+ if (completed_tasks->empty()) |
return; |
- while (completed_tasks_.size()) { |
- scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); |
+ while (!completed_tasks->empty()) { |
+ scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front(); |
+ completed_tasks->pop_front(); |
task->DidComplete(); |
+ task->DispatchCompletionCallback(); |
} |
DCHECK(client_); |
client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); |
} |
-void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
- // Schedule check for completed tasks if not pending. |
- ScheduleCheckForCompletedTasks(); |
+void WorkerPool::ScheduleTasks(internal::WorkerPoolTaskGraph* task_graph) { |
+ TRACE_EVENT0("cc", "WorkerPool::ScheduleTasks"); |
+ |
+ // Schedule check for completed tasks if graph is non-empty. |
+ if (internal::WorkerPoolTaskDependency::Iterator(task_graph)) |
+ ScheduleCheckForCompletedTasks(); |
- inner_->PostTask(task.Pass()); |
+ inner_->ScheduleTasks(task_graph); |
} |
} // namespace cc |