Chromium Code Reviews

Unified Diff: cc/base/worker_pool.cc

Issue 14689004: Re-land: cc: Cancel and re-prioritize worker pool tasks. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Added support for dependencies and tests. Created 7 years, 7 months ago
Index: cc/base/worker_pool.cc
diff --git a/cc/base/worker_pool.cc b/cc/base/worker_pool.cc
index 27e6440eb0509674c9eb265135fa6565cd73f1e9..e3eccc5267a7ca9f3f288c1efe0a0c7a9083756d 100644
--- a/cc/base/worker_pool.cc
+++ b/cc/base/worker_pool.cc
@@ -9,45 +9,134 @@
#include <sys/resource.h>
#endif
-#include <algorithm>
-
#include "base/bind.h"
#include "base/debug/trace_event.h"
+#include "base/hash_tables.h"
#include "base/stringprintf.h"
#include "base/synchronization/condition_variable.h"
#include "base/threading/simple_thread.h"
+#include "cc/base/scoped_ptr_deque.h"
-namespace cc {
-
-namespace {
-
-class WorkerPoolTaskImpl : public internal::WorkerPoolTask {
- public:
- WorkerPoolTaskImpl(const WorkerPool::Callback& task,
- const base::Closure& reply)
- : internal::WorkerPoolTask(reply),
- task_(task) {}
-
- virtual void RunOnThread(unsigned thread_index) OVERRIDE {
- task_.Run();
+#if defined(COMPILER_GCC)
+namespace BASE_HASH_NAMESPACE {
+template <> struct hash<cc::internal::WorkerPoolTask*> {
+ size_t operator()(cc::internal::WorkerPoolTask* ptr) const {
+ return hash<size_t>()(reinterpret_cast<size_t>(ptr));
}
-
- private:
- WorkerPool::Callback task_;
};
+} // namespace BASE_HASH_NAMESPACE
+#endif  // COMPILER_GCC
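
The specialization above exists so that raw WorkerPoolTask pointers can be used as keys in base::hash_set. A minimal standalone sketch of the same pattern in standard C++, using a hypothetical MyTask type (std::hash already covers raw pointers, so the explicit specialization is shown purely to mirror what the patch does for BASE_HASH_NAMESPACE):

#include <cstddef>
#include <functional>
#include <unordered_set>

namespace demo {
struct MyTask {};  // stand-in for cc::internal::WorkerPoolTask
}  // namespace demo

// Mirror of the patch's hash<WorkerPoolTask*> specialization: hash the
// pointer value itself. Purely illustrative, since std::hash<T*> already
// exists.
namespace std {
template <> struct hash<demo::MyTask*> {
  size_t operator()(demo::MyTask* ptr) const {
    return hash<size_t>()(reinterpret_cast<size_t>(ptr));
  }
};
}  // namespace std

int main() {
  demo::MyTask a, b;
  std::unordered_set<demo::MyTask*> pending;
  pending.insert(&a);
  return pending.count(&b);  // 0: &b was never inserted
}
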
-} // namespace
+namespace cc {
namespace internal {
-WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) {
+WorkerPoolTask::WorkerPoolTask() : did_schedule_(false),
+ did_run_(false),
+ did_complete_(false) {
}
WorkerPoolTask::~WorkerPoolTask() {
+ DCHECK_EQ(did_schedule_, did_complete_);
+ DCHECK(!did_run_ || did_schedule_);
+ DCHECK(!did_run_ || did_complete_);
+}
+
+void WorkerPoolTask::DidSchedule() {
+ DCHECK(!did_complete_);
+ did_schedule_ = true;
+}
+
+void WorkerPoolTask::WillRun() {
+ DCHECK(did_schedule_);
+ DCHECK(!did_complete_);
+ DCHECK(!did_run_);
+}
+
+void WorkerPoolTask::DidRun() {
+ did_run_ = true;
}
void WorkerPoolTask::DidComplete() {
- reply_.Run();
+ DCHECK(did_schedule_);
+ DCHECK(!did_complete_);
+ did_complete_ = true;
+}
+
+bool WorkerPoolTask::HasFinished() const {
+ return did_run_;
+}
+
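The new did_schedule_/did_run_/did_complete_ flags turn each task into a small state machine whose invariants are checked in the destructor: every scheduled task must complete (even if it never runs because it was canceled), and a task may only run once and only after being scheduled. A minimal standalone sketch of that bookkeeping, using assert in place of DCHECK (not the cc implementation):

#include <cassert>

class LifecycleTrackedTask {
 public:
  LifecycleTrackedTask() : scheduled_(false), ran_(false), completed_(false) {}
  ~LifecycleTrackedTask() {
    assert(scheduled_ == completed_);  // every scheduled task completes
    assert(!ran_ || scheduled_);       // a task only runs if it was scheduled
    assert(!ran_ || completed_);       // a task that ran was also completed
  }

  void DidSchedule() { assert(!completed_); scheduled_ = true; }
  void WillRun() { assert(scheduled_ && !completed_ && !ran_); }
  void DidRun() { ran_ = true; }
  void DidComplete() { assert(scheduled_ && !completed_); completed_ = true; }
  bool HasFinished() const { return ran_; }

 private:
  bool scheduled_;
  bool ran_;
  bool completed_;
};

int main() {
  // Canonical call order for a task that actually runs.
  LifecycleTrackedTask task;
  task.DidSchedule();
  task.WillRun();
  task.DidRun();
  task.DidComplete();
  return task.HasFinished() ? 0 : 1;
}
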
+WorkerPoolTaskDependency::Iterator::Iterator(
+ const WorkerPoolTaskDependency* root)
+ : current_(root),
+ root_(root) {
+ ++(*this);
+}
+
+WorkerPoolTaskDependency::Iterator::~Iterator() {
+}
+
+WorkerPoolTaskDependency::TaskIterator::TaskIterator(
+ const WorkerPoolTaskDependency* parent)
+ : current_(parent->first_child_.get()) {
+}
+
+WorkerPoolTaskDependency::TaskIterator::~TaskIterator() {
+}
+
+WorkerPoolTaskDependency::WorkerPoolTaskDependency()
+ : parent_(NULL),
+ last_child_(NULL) {
+}
+
+WorkerPoolTaskDependency::~WorkerPoolTaskDependency() {
+}
+
+// static
+void WorkerPoolTaskDependency::Create(
+ WorkerPoolTask* task,
+ WorkerPoolTaskDependency* parent,
+ WorkerPoolTaskDependency* dependencies) {
+ scoped_ptr<WorkerPoolTaskDependency> dependency(
+ new WorkerPoolTaskDependency);
+
+ if (dependencies) {
+ DCHECK(!dependencies->task());
+ dependency->Swap(dependencies);
+ }
+ dependency->task_ = task;
+ dependency->parent_ = parent;
+
+ if (parent->last_child_) {
+ parent->last_child_->next_sibling_ = dependency.Pass();
+ // |parent_| should only be set for |last_child_|
+ parent->last_child_->parent_ = NULL;
+ parent->last_child_ = parent->last_child_->next_sibling_.get();
+ } else {
+ parent->first_child_ = dependency.Pass();
+ parent->last_child_ = parent->first_child_.get();
+ }
+}
+
+void WorkerPoolTaskDependency::Swap(WorkerPoolTaskDependency* other) {
+ DCHECK(other);
+
+ first_child_.swap(other->first_child_);
+ next_sibling_.swap(other->next_sibling_);
+ task_.swap(other->task_);
+
+ // |last_child_| has a pointer to the parent. Make sure this pointer
+ // is properly swapped.
+ WorkerPoolTaskDependency* other_last_child_ = other->last_child_;
+ if (other_last_child_)
+ other_last_child_->parent_ = this;
+ if (last_child_)
+ last_child_->parent_ = other;
+
+ // Swap |last_child_| pointers.
+ other->last_child_ = last_child_;
+ last_child_ = other_last_child_;
}
} // namespace internal
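
WorkerPoolTaskDependency stores the graph as a first-child/next-sibling tree: each node owns its first child and its next sibling, and keeps a raw last_child_ pointer so appending a child is O(1). A simplified sketch of that representation using std::unique_ptr (the parent_ back-pointer that the patch keeps on the last child, and the task payload, are reduced to an integer id here):

#include <memory>
#include <utility>

struct DependencyNode {
  int task_id = 0;                            // stands in for the task pointer
  std::unique_ptr<DependencyNode> first_child;
  std::unique_ptr<DependencyNode> next_sibling;
  DependencyNode* last_child = nullptr;       // raw, non-owning

  void AppendChild(std::unique_ptr<DependencyNode> child) {
    DependencyNode* raw = child.get();
    if (last_child) {
      last_child->next_sibling = std::move(child);  // link after last child
    } else {
      first_child = std::move(child);               // first child of this node
    }
    last_child = raw;
  }
};

int main() {
  DependencyNode root;
  for (int id = 1; id <= 3; ++id) {
    auto child = std::make_unique<DependencyNode>();
    child->task_id = id;
    root.AppendChild(std::move(child));
  }
  // Children are visited in insertion order: 1, 2, 3.
  int sum = 0;
  for (DependencyNode* n = root.first_child.get(); n; n = n->next_sibling.get())
    sum += n->task_id;
  return sum == 6 ? 0 : 1;
}
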
@@ -64,17 +153,26 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
void Shutdown();
- void PostTask(scoped_ptr<internal::WorkerPoolTask> task);
+ // Schedule running of tasks in |task_graph|. All tasks previously
+ // scheduled but not present in |task_graph| will be canceled unless
+ // already running. Canceled tasks are moved to |completed_tasks_|
+  // without being run. The result is that, once scheduled, a task is
+  // guaranteed to end up in the |completed_tasks_| queue even if it is
+  // later canceled by another call to ScheduleTasks().
+ void ScheduleTasks(internal::WorkerPoolTaskGraph* task_graph);
- // Appends all completed tasks to worker pool's completed tasks queue
- // and returns true if idle.
- bool CollectCompletedTasks();
+ // Collect all completed tasks in |completed_tasks|. Returns true if idle.
+ bool CollectCompletedTasks(TaskDeque* completed_tasks);
private:
- // Appends all completed tasks to |completed_tasks|. Lock must
- // already be acquired before calling this function.
- bool AppendCompletedTasksWithLockAcquired(
- ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks);
+ // Find a task that is ready to run by traversing the dependency graph.
+ // Lock must be acquired before calling this function.
+ internal::WorkerPoolTask* FindTaskThatIsReadyToRunWithLockAcquired();
+
+ // Collect all completed tasks by swapping the contents of
+ // |completed_tasks| and |completed_tasks_|. Lock must be acquired
+ // before calling this function. Returns true if idle.
+ bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks);
// Schedule an OnIdleOnOriginThread callback if not already pending.
// Lock must already be acquired before calling this function.
@@ -110,15 +208,21 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
// loop index is 0.
unsigned next_thread_index_;
- // Number of tasks currently running.
- unsigned running_task_count_;
-
// Set during shutdown. Tells workers to exit when no more tasks
// are pending.
bool shutdown_;
- typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque;
- TaskDeque pending_tasks_;
+ // The dependency graph. Describes task dependencies and task priority.
+ internal::WorkerPoolTaskGraph task_graph_;
+
+ // This set contains all pending tasks.
+ typedef base::hash_set<internal::WorkerPoolTask*> TaskSet;
+ TaskSet pending_tasks_;
+
+ // This set contains all currently running tasks.
+ TaskSet running_tasks_;
+
+ // Completed tasks not yet collected by origin thread.
TaskDeque completed_tasks_;
ScopedPtrDeque<base::DelegateSimpleThread> workers_;
@@ -138,18 +242,17 @@ WorkerPool::Inner::Inner(WorkerPool* worker_pool,
weak_ptr_factory_.GetWeakPtr())),
on_idle_pending_(false),
next_thread_index_(0),
- running_task_count_(0),
shutdown_(false) {
base::AutoLock lock(lock_);
while (workers_.size() < num_threads) {
scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
new base::DelegateSimpleThread(
- this,
- thread_name_prefix +
- base::StringPrintf(
- "Worker%u",
- static_cast<unsigned>(workers_.size() + 1)).c_str()));
+ this,
+ thread_name_prefix +
+ base::StringPrintf(
+ "Worker%u",
+ static_cast<unsigned>(workers_.size() + 1)).c_str()));
worker->Start();
workers_.push_back(worker.Pass());
}
@@ -160,12 +263,9 @@ WorkerPool::Inner::~Inner() {
DCHECK(shutdown_);
- // Cancel all pending callbacks.
- weak_ptr_factory_.InvalidateWeakPtrs();
-
DCHECK_EQ(0u, pending_tasks_.size());
+ DCHECK_EQ(0u, running_tasks_.size());
DCHECK_EQ(0u, completed_tasks_.size());
- DCHECK_EQ(0u, running_task_count_);
}
void WorkerPool::Inner::Shutdown() {
@@ -184,32 +284,111 @@ void WorkerPool::Inner::Shutdown() {
scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
worker->Join();
}
+
+ // Cancel any pending OnIdle callback.
+ weak_ptr_factory_.InvalidateWeakPtrs();
}
-void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
- base::AutoLock lock(lock_);
+void WorkerPool::Inner::ScheduleTasks(
+ internal::WorkerPoolTaskGraph* task_graph) {
+ // Move all dependencies to |new_graph|.
+ internal::WorkerPoolTaskGraph new_graph;
+ new_graph.Swap(task_graph);
+
+ TaskSet tasks;
+ // Traverse task graph to build new pending task set.
+ for (internal::WorkerPoolTaskGraph::Iterator it(&new_graph); it; ++it) {
+ internal::WorkerPoolTask* task = it->task();
+ task->DidSchedule();
+ tasks.insert(task);
+ }
+
+ {
+ base::AutoLock lock(lock_);
- pending_tasks_.push_back(task.Pass());
+ // Move tasks not present in |new_graph| to |completed_tasks_|.
+ for (TaskSet::iterator it = pending_tasks_.begin();
+ it != pending_tasks_.end(); ++it) {
+ internal::WorkerPoolTask* task = *it;
- // There is more work available, so wake up worker thread.
- has_pending_tasks_cv_.Signal();
+      // A task not present in |new_graph| has been canceled; move it
+      // straight to |completed_tasks_| without running it.
+ if (tasks.find(task) == tasks.end())
+ completed_tasks_.push_back(task);
+ }
+
+ // Remove all running tasks from new pending task set.
+ for (TaskSet::iterator it = running_tasks_.begin();
+ it != running_tasks_.end(); ++it) {
+ internal::WorkerPoolTask* task = *it;
+
+ if (tasks.find(task) != tasks.end())
+ tasks.erase(task);
+ }
+
+ // And remove all completed tasks from new pending task set.
+ for (TaskDeque::iterator it = completed_tasks_.begin();
+ it != completed_tasks_.end(); ++it) {
+ internal::WorkerPoolTask* task = *it;
+
+ if (tasks.find(task) != tasks.end())
+ tasks.erase(task);
+ }
+
+ // Swap task graphs.
+ // Note: old tasks are intentionally destroyed after releasing |lock_|.
+ task_graph_.Swap(&new_graph);
+
+ // Swap pending task sets.
+ pending_tasks_.swap(tasks);
+
+ // There is more work available, so wake up worker thread.
+ has_pending_tasks_cv_.Signal();
+ }
}
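
ScheduleTasks() reconciles the previous schedule with the new graph: pending tasks that were dropped from the graph go straight to the completed queue without running, and tasks that are already running or already completed are removed from the new pending set so they are not scheduled twice. A standalone sketch of that reconciliation, with integer task ids standing in for WorkerPoolTask pointers and locking omitted:

#include <deque>
#include <unordered_set>

void Reschedule(const std::unordered_set<int>& new_schedule,
                std::unordered_set<int>* pending,
                const std::unordered_set<int>& running,
                std::deque<int>* completed) {
  std::unordered_set<int> new_pending = new_schedule;

  // 1. Cancel pending tasks that were dropped from the schedule.
  for (int task : *pending) {
    if (!new_pending.count(task))
      completed->push_back(task);
  }

  // 2. Do not re-schedule tasks that are running or already completed.
  for (int task : running)
    new_pending.erase(task);
  for (int task : *completed)
    new_pending.erase(task);

  pending->swap(new_pending);
}

int main() {
  std::unordered_set<int> pending = {1, 2, 3};
  std::unordered_set<int> running = {2};
  std::deque<int> completed;
  // New schedule keeps 2 and 3, drops 1, and adds 4.
  Reschedule({2, 3, 4}, &pending, running, &completed);
  // Now: completed = {1} (canceled), pending = {3, 4} (2 is still running).
  return (completed.size() == 1 && pending.size() == 2 &&
          pending.count(2) == 0) ? 0 : 1;
}
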
-bool WorkerPool::Inner::CollectCompletedTasks() {
+bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) {
base::AutoLock lock(lock_);
- return AppendCompletedTasksWithLockAcquired(
- &worker_pool_on_origin_thread_->completed_tasks_);
+ return CollectCompletedTasksWithLockAcquired(completed_tasks);
}
-bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired(
- ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) {
+internal::WorkerPoolTask*
+ WorkerPool::Inner::FindTaskThatIsReadyToRunWithLockAcquired() {
lock_.AssertAcquired();
- while (completed_tasks_.size())
- completed_tasks->push_back(completed_tasks_.take_front().Pass());
+ for (internal::WorkerPoolTaskGraph::Iterator it(&task_graph_); it; ++it) {
+ internal::WorkerPoolTask* task = it->task();
- return !running_task_count_ && pending_tasks_.empty();
+ // Skip task if not present in |pending_tasks_|. These tasks are
+ // either already running or have finished.
+ // Note: Skip traversal of sub-tree if this function becomes too slow.
+ if (pending_tasks_.find(task) == pending_tasks_.end())
+ continue;
+
+ // Task is ready to run if all direct children have finished running.
+ bool all_dependencies_are_satisfied = true;
+
+ for (internal::WorkerPoolTaskGraph::TaskIterator task_it(*it);
+ task_it && all_dependencies_are_satisfied;
+ ++task_it) {
+ all_dependencies_are_satisfied &= task_it->HasFinished();
+ }
+
+ if (all_dependencies_are_satisfied)
+ return task;
+ }
+
+ return NULL;
+}
+
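The readiness test above walks the graph in priority order and returns the first pending task whose direct children (the tasks it depends on) have all finished. A sketch of the same test with an explicit adjacency map and integer ids instead of graph nodes:

#include <unordered_map>
#include <unordered_set>
#include <vector>

int FindReadyTask(
    const std::vector<int>& priority_order,
    const std::unordered_map<int, std::vector<int>>& dependencies,
    const std::unordered_set<int>& pending,
    const std::unordered_set<int>& finished) {
  for (int task : priority_order) {
    if (!pending.count(task))
      continue;  // already running or finished; skip it

    bool all_dependencies_finished = true;
    auto it = dependencies.find(task);
    if (it != dependencies.end()) {
      for (int dep : it->second) {
        if (!finished.count(dep)) {
          all_dependencies_finished = false;
          break;
        }
      }
    }
    if (all_dependencies_finished)
      return task;
  }
  return -1;  // nothing is ready to run right now
}

int main() {
  // Task 1 depends on 2 and 3; only 3 has finished, and 2 is still pending.
  std::unordered_map<int, std::vector<int>> deps = {{1, {2, 3}}};
  std::unordered_set<int> pending = {1, 2};
  std::unordered_set<int> finished = {3};
  return FindReadyTask({1, 2}, deps, pending, finished) == 2 ? 0 : 1;
}
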
+bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired(
+ TaskDeque* completed_tasks) {
+ lock_.AssertAcquired();
+
+ DCHECK_EQ(0u, completed_tasks->size());
+ completed_tasks->swap(completed_tasks_);
+
+ return running_tasks_.empty() && pending_tasks_.empty();
}
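
Collecting completed tasks by swapping the whole deque keeps the time spent under the lock constant and lets the completion callbacks run with the lock released. A minimal sketch of that handoff using std::mutex (integer ids again stand in for tasks):

#include <deque>
#include <mutex>

class CompletedQueue {
 public:
  void Push(int task) {
    std::lock_guard<std::mutex> lock(mutex_);
    completed_.push_back(task);
  }

  // The caller passes an empty deque; contents are exchanged in O(1).
  void Collect(std::deque<int>* out) {
    std::lock_guard<std::mutex> lock(mutex_);
    out->swap(completed_);  // leaves |completed_| empty
  }

 private:
  std::mutex mutex_;
  std::deque<int> completed_;
};

int main() {
  CompletedQueue queue;
  queue.Push(1);
  queue.Push(2);
  std::deque<int> collected;
  queue.Collect(&collected);
  return collected.size() == 2 ? 0 : 1;
}
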
void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
@@ -222,6 +401,8 @@ void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
}
void WorkerPool::Inner::OnIdleOnOriginThread() {
+ TaskDeque completed_tasks;
+
{
base::AutoLock lock(lock_);
@@ -229,14 +410,13 @@ void WorkerPool::Inner::OnIdleOnOriginThread() {
on_idle_pending_ = false;
// Early out if no longer idle.
- if (running_task_count_ || !pending_tasks_.empty())
+ if (!running_tasks_.empty() || !pending_tasks_.empty())
return;
- AppendCompletedTasksWithLockAcquired(
- &worker_pool_on_origin_thread_->completed_tasks_);
+ CollectCompletedTasksWithLockAcquired(&completed_tasks);
}
- worker_pool_on_origin_thread_->OnIdle();
+ worker_pool_on_origin_thread_->OnIdle(&completed_tasks);
}
void WorkerPool::Inner::Run() {
@@ -252,40 +432,53 @@ void WorkerPool::Inner::Run() {
int thread_index = next_thread_index_++;
while (true) {
- if (pending_tasks_.empty()) {
- // Exit when shutdown is set and no more tasks are pending.
- if (shutdown_)
- break;
-
- // Schedule an idle callback if requested and not pending.
- if (!running_task_count_)
- ScheduleOnIdleWithLockAcquired();
-
- // Wait for new pending tasks.
+ scoped_refptr<internal::WorkerPoolTask> task(
+ FindTaskThatIsReadyToRunWithLockAcquired());
+ if (!task) {
+ if (pending_tasks_.empty()) {
+ // Exit when shutdown is set and no more tasks are pending.
+ if (shutdown_)
+ break;
+
+ // Schedule an idle callback if no tasks are running.
+ if (running_tasks_.empty())
+ ScheduleOnIdleWithLockAcquired();
+ }
+
+ // Wait for more tasks.
has_pending_tasks_cv_.Wait();
continue;
}
- // Get next task.
- scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front();
+ // First remove task from |pending_tasks_|.
+ DCHECK(pending_tasks_.find(task) != pending_tasks_.end());
+ pending_tasks_.erase(task);
- // Increment |running_task_count_| before starting to run task.
- running_task_count_++;
+ // Insert task in |running_tasks_| before starting to run it.
+ DCHECK(running_tasks_.find(task) == running_tasks_.end());
+ running_tasks_.insert(task);
- // There may be more work available, so wake up another
- // worker thread.
+ // There may be more work available, so wake up another worker thread.
has_pending_tasks_cv_.Signal();
+ // Call WillRun() before releasing |lock_| and running task.
+ task->WillRun();
+
{
base::AutoUnlock unlock(lock_);
task->RunOnThread(thread_index);
}
- completed_tasks_.push_back(task.Pass());
+ // This will mark task as finished.
+ task->DidRun();
+
+ // Remove task from |running_tasks_| now that we are done running it.
+ DCHECK(running_tasks_.find(task) != running_tasks_.end());
+ running_tasks_.erase(task);
- // Decrement |running_task_count_| now that we are done running task.
- running_task_count_--;
+ // Finally add task to |completed_tasks_|.
+ completed_tasks_.push_back(task);
}
// We noticed we should exit. Wake up the next worker so it knows it should
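
For reference, the overall shape of the worker loop in Inner::Run(): pick work while holding the lock, signal the next worker, run the task with the lock released, then record completion under the lock again. A heavily simplified sketch with a plain deque of closures instead of the prioritized task graph, and std::thread/std::condition_variable in place of the base primitives:

#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

class TinyWorkerPool {
 public:
  explicit TinyWorkerPool(size_t num_threads) {
    for (size_t i = 0; i < num_threads; ++i)
      workers_.emplace_back([this] { Run(); });
  }

  ~TinyWorkerPool() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      shutdown_ = true;
    }
    has_pending_cv_.notify_all();
    for (std::thread& worker : workers_)
      worker.join();
  }

  void Post(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      pending_.push_back(std::move(task));
    }
    has_pending_cv_.notify_one();
  }

 private:
  void Run() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (true) {
      if (pending_.empty()) {
        if (shutdown_)
          break;                     // exit once drained and shut down
        has_pending_cv_.wait(lock);  // wait for more work
        continue;
      }
      std::function<void()> task = std::move(pending_.front());
      pending_.pop_front();
      has_pending_cv_.notify_one();  // more work may remain for others

      lock.unlock();                 // never run a task while holding the lock
      task();
      lock.lock();
    }
  }

  std::mutex mutex_;
  std::condition_variable has_pending_cv_;
  std::deque<std::function<void()>> pending_;
  bool shutdown_ = false;
  std::vector<std::thread> workers_;
};

int main() {
  std::atomic<int> counter(0);
  {
    TinyWorkerPool pool(4);
    for (int i = 0; i < 100; ++i)
      pool.Post([&counter] { counter.fetch_add(1); });
  }  // destructor drains the queue and joins the workers
  return counter.load() == 100 ? 0 : 1;
}
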
@@ -298,7 +491,6 @@ WorkerPool::WorkerPool(size_t num_threads,
const std::string& thread_name_prefix)
: client_(NULL),
origin_loop_(base::MessageLoopProxy::current()),
- weak_ptr_factory_(this),
check_for_completed_tasks_delay_(check_for_completed_tasks_delay),
check_for_completed_tasks_pending_(false),
inner_(make_scoped_ptr(new Inner(this,
@@ -307,38 +499,35 @@ WorkerPool::WorkerPool(size_t num_threads,
}
WorkerPool::~WorkerPool() {
- // Cancel all pending callbacks.
- weak_ptr_factory_.InvalidateWeakPtrs();
-
- DCHECK_EQ(0u, completed_tasks_.size());
}
void WorkerPool::Shutdown() {
inner_->Shutdown();
- inner_->CollectCompletedTasks();
- DispatchCompletionCallbacks();
-}
-void WorkerPool::PostTaskAndReply(
- const Callback& task, const base::Closure& reply) {
- PostTask(make_scoped_ptr(new WorkerPoolTaskImpl(
- task,
- reply)).PassAs<internal::WorkerPoolTask>());
+ TaskDeque completed_tasks;
+ inner_->CollectCompletedTasks(&completed_tasks);
+ DispatchCompletionCallbacks(&completed_tasks);
}
-void WorkerPool::OnIdle() {
+void WorkerPool::OnIdle(TaskDeque* completed_tasks) {
TRACE_EVENT0("cc", "WorkerPool::OnIdle");
- DispatchCompletionCallbacks();
+ DispatchCompletionCallbacks(completed_tasks);
+
+ // Cancel any pending check for completed tasks.
+ check_for_completed_tasks_callback_.Cancel();
+ check_for_completed_tasks_pending_ = false;
}
void WorkerPool::ScheduleCheckForCompletedTasks() {
if (check_for_completed_tasks_pending_)
return;
+ check_for_completed_tasks_callback_.Reset(
+ base::Bind(&WorkerPool::CheckForCompletedTasks,
+ base::Unretained(this)));
origin_loop_->PostDelayedTask(
FROM_HERE,
- base::Bind(&WorkerPool::CheckForCompletedTasks,
- weak_ptr_factory_.GetWeakPtr()),
+ check_for_completed_tasks_callback_.callback(),
check_for_completed_tasks_delay_);
check_for_completed_tasks_pending_ = true;
}
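
Switching from a WeakPtr-bound task to a cancelable callback lets OnIdle() drop a check that is already posted. A toy sketch of the cancellation idea using a shared flag; a sleeping helper thread stands in for posting a delayed task to the origin loop, and Cancel() in this toy version waits out any remaining delay, which a real message loop would not need to do:

#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
#include <thread>

class CancelableDelayedCall {
 public:
  ~CancelableDelayedCall() { Cancel(); }

  // Runs |callback| after |delay| unless Cancel() is called first.
  void Schedule(std::function<void()> callback,
                std::chrono::milliseconds delay) {
    Cancel();  // at most one pending call at a time
    canceled_ = std::make_shared<std::atomic<bool>>(false);
    std::shared_ptr<std::atomic<bool>> canceled = canceled_;
    worker_ = std::thread([callback, delay, canceled] {
      std::this_thread::sleep_for(delay);
      if (!canceled->load())
        callback();
    });
  }

  void Cancel() {
    if (canceled_)
      canceled_->store(true);
    if (worker_.joinable())
      worker_.join();
  }

 private:
  std::shared_ptr<std::atomic<bool>> canceled_;
  std::thread worker_;
};

int main() {
  std::atomic<bool> fired(false);
  CancelableDelayedCall check;
  check.Schedule([&fired] { fired = true; }, std::chrono::milliseconds(50));
  check.Cancel();  // e.g. the pool went idle before the delayed check fired
  return fired ? 1 : 0;
}
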
@@ -348,33 +537,42 @@ void WorkerPool::CheckForCompletedTasks() {
DCHECK(check_for_completed_tasks_pending_);
check_for_completed_tasks_pending_ = false;
+ TaskDeque completed_tasks;
+
// Schedule another check for completed tasks if not idle.
- if (!inner_->CollectCompletedTasks())
+ if (!inner_->CollectCompletedTasks(&completed_tasks))
ScheduleCheckForCompletedTasks();
- DispatchCompletionCallbacks();
+ DispatchCompletionCallbacks(&completed_tasks);
}
-void WorkerPool::DispatchCompletionCallbacks() {
+void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) {
TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks");
- if (completed_tasks_.empty())
+ // Early out when |completed_tasks| is empty to prevent unnecessary
+ // call to DidFinishDispatchingWorkerPoolCompletionCallbacks().
+ if (completed_tasks->empty())
return;
- while (completed_tasks_.size()) {
- scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front();
+ while (!completed_tasks->empty()) {
+ scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front();
+ completed_tasks->pop_front();
task->DidComplete();
+ task->DispatchCompletionCallback();
}
DCHECK(client_);
client_->DidFinishDispatchingWorkerPoolCompletionCallbacks();
}
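
Dispatch drains the collected deque, marks each task complete, runs its completion callback, and only notifies the client when at least one task was dispatched. A small sketch of that loop with a hypothetical Task type and std::shared_ptr in place of scoped_refptr:

#include <deque>
#include <memory>

struct Task {
  bool completed = false;
  void DidComplete() { completed = true; }
  void DispatchCompletionCallback() { /* reply to the caller here */ }
};

bool DispatchCompletionCallbacks(
    std::deque<std::shared_ptr<Task>>* completed_tasks) {
  if (completed_tasks->empty())
    return false;  // early out: do not notify the client needlessly

  while (!completed_tasks->empty()) {
    std::shared_ptr<Task> task = completed_tasks->front();
    completed_tasks->pop_front();
    task->DidComplete();
    task->DispatchCompletionCallback();
  }
  return true;  // caller should notify the client
}

int main() {
  std::deque<std::shared_ptr<Task>> completed = {std::make_shared<Task>()};
  bool notified = DispatchCompletionCallbacks(&completed);
  return (notified && completed.empty()) ? 0 : 1;
}
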
-void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
- // Schedule check for completed tasks if not pending.
- ScheduleCheckForCompletedTasks();
+void WorkerPool::ScheduleTasks(internal::WorkerPoolTaskGraph* task_graph) {
+ TRACE_EVENT0("cc", "WorkerPool::ScheduleTasks");
+
+ // Schedule check for completed tasks if graph is non-empty.
+ if (internal::WorkerPoolTaskDependency::Iterator(task_graph))
+ ScheduleCheckForCompletedTasks();
- inner_->PostTask(task.Pass());
+ inner_->ScheduleTasks(task_graph);
}
} // namespace cc
