Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2366)

Unified Diff: cc/base/worker_pool.cc

Issue 14689004: Re-land: cc: Cancel and re-prioritize worker pool tasks. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: move alignment check to RP Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: cc/base/worker_pool.cc
diff --git a/cc/base/worker_pool.cc b/cc/base/worker_pool.cc
index be9abf9b87d33e6c0b3fc2d473864568a84070fd..85f3c9ae03507b758ed1a63d017b932e72e4d650 100644
--- a/cc/base/worker_pool.cc
+++ b/cc/base/worker_pool.cc
@@ -9,49 +9,44 @@
#include <sys/resource.h>
#endif
-#include <algorithm>
+#include <set>
#include "base/bind.h"
#include "base/debug/trace_event.h"
#include "base/stringprintf.h"
#include "base/synchronization/condition_variable.h"
#include "base/threading/simple_thread.h"
+#include "cc/base/scoped_ptr_deque.h"
namespace cc {
namespace {
-class WorkerPoolTaskImpl : public internal::WorkerPoolTask {
+class WorkerPoolTaskGraphImpl : public internal::WorkerPoolTaskGraph {
public:
- WorkerPoolTaskImpl(const WorkerPool::Callback& task,
- const base::Closure& reply)
- : internal::WorkerPoolTask(reply),
- task_(task) {}
+ WorkerPoolTaskGraphImpl() {}
- virtual void RunOnThread(unsigned thread_index) OVERRIDE {
- task_.Run();
+ // Overridden from internal::WorkerPoolTaskGraph:
+ virtual bool HasMoreTasks() OVERRIDE { return false; }
+ virtual bool HasTask(internal::WorkerPoolTask* task) OVERRIDE {
+ return false;
+ }
+ virtual internal::WorkerPoolTask* TopTask() OVERRIDE {
+ NOTREACHED();
+ return NULL;
+ }
+ virtual scoped_refptr<internal::WorkerPoolTask> TakeTask(
+ internal::WorkerPoolTask* task) OVERRIDE {
+ NOTREACHED();
+ return NULL;
}
private:
- WorkerPool::Callback task_;
+ virtual ~WorkerPoolTaskGraphImpl() {}
};
} // namespace
-namespace internal {
-
-WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) {
-}
-
-WorkerPoolTask::~WorkerPoolTask() {
-}
-
-void WorkerPoolTask::DidComplete() {
- reply_.Run();
-}
-
-} // namespace internal
-
// Internal to the worker pool. Any data or logic that needs to be
// shared between threads lives in this class. All members are guarded
// by |lock_|.
@@ -64,17 +59,22 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
void Shutdown();
- void PostTask(scoped_ptr<internal::WorkerPoolTask> task);
+ // Schedule running of tasks in |task_graph|. All tasks previously
+ // scheduled but not present in |task_graph| will be canceled unless
+ // already running. Canceled tasks are moved to |completed_tasks_|
+ // without being run. The result is that once scheduled, a task is
+ // guaranteed to end up in the |completed_tasks_| queue even if they
+ // later get canceled by another call to ScheduleTasks().
+ void ScheduleTasks(scoped_ptr<internal::WorkerPoolTaskGraph> task_graph);
- // Appends all completed tasks to worker pool's completed tasks queue
- // and returns true if idle.
- bool CollectCompletedTasks();
+ // Collect all completed tasks in |completed_tasks|. Returns true if idle.
+ bool CollectCompletedTasks(TaskDeque* completed_tasks);
private:
- // Appends all completed tasks to |completed_tasks|. Lock must
- // already be acquired before calling this function.
- bool AppendCompletedTasksWithLockAcquired(
- ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks);
+ // Collect all completed tasks by swapping the contents of
+ // |completed_tasks| and |completed_tasks_|. Lock must be acquired
+ // before calling this function. Returns true if idle.
+ bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks);
// Schedule an OnIdleOnOriginThread callback if not already pending.
// Lock must already be acquired before calling this function.
@@ -110,15 +110,18 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
// loop index is 0.
unsigned next_thread_index_;
- // Number of tasks currently running.
- unsigned running_task_count_;
-
// Set during shutdown. Tells workers to exit when no more tasks
// are pending.
bool shutdown_;
- typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque;
- TaskDeque pending_tasks_;
+ // The task graph. Provides tasks in order of priority.
+ scoped_ptr<internal::WorkerPoolTaskGraph> task_graph_;
+
+ // This set contains all currently running tasks.
+ typedef std::set<internal::WorkerPoolTask*> TaskSet;
+ TaskSet running_tasks_;
+
+ // Completed tasks not yet collected by origin thread.
TaskDeque completed_tasks_;
ScopedPtrDeque<base::DelegateSimpleThread> workers_;
@@ -138,8 +141,8 @@ WorkerPool::Inner::Inner(WorkerPool* worker_pool,
weak_ptr_factory_.GetWeakPtr())),
on_idle_pending_(false),
next_thread_index_(0),
- running_task_count_(0),
- shutdown_(false) {
+ shutdown_(false),
+ task_graph_(new WorkerPoolTaskGraphImpl) {
base::AutoLock lock(lock_);
while (workers_.size() < num_threads) {
@@ -160,12 +163,9 @@ WorkerPool::Inner::~Inner() {
DCHECK(shutdown_);
- // Cancel all pending callbacks.
- weak_ptr_factory_.InvalidateWeakPtrs();
-
- DCHECK_EQ(0u, pending_tasks_.size());
+ DCHECK(!task_graph_->HasMoreTasks());
+ DCHECK_EQ(0u, running_tasks_.size());
DCHECK_EQ(0u, completed_tasks_.size());
- DCHECK_EQ(0u, running_task_count_);
}
void WorkerPool::Inner::Shutdown() {
@@ -184,32 +184,62 @@ void WorkerPool::Inner::Shutdown() {
scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
worker->Join();
}
+
+ // Cancel any pending OnIdle callback.
+ weak_ptr_factory_.InvalidateWeakPtrs();
}
-void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
+void WorkerPool::Inner::ScheduleTasks(
+ scoped_ptr<internal::WorkerPoolTaskGraph> task_graph) {
base::AutoLock lock(lock_);
- pending_tasks_.push_back(task.Pass());
+ // Move tasks not present in new graph to |completed_tasks_|.
+ while (task_graph_->HasMoreTasks()) {
+ scoped_refptr<internal::WorkerPoolTask> task = task_graph_->TakeTask(
+ task_graph_->TopTask());
+
+ // Task has not completed if present in new graph.
+ if (task_graph->HasTask(task))
+ continue;
+
+ completed_tasks_.push_back(task);
+ }
+
+ // Take any running tasks from new graph.
+ for (TaskSet::iterator it = running_tasks_.begin();
+ it != running_tasks_.end(); ++it) {
+ if (task_graph->HasTask(*it))
+ task_graph->TakeTask(*it);
+ }
+
+ // And take any completed tasks from new graph.
+ for (TaskDeque::iterator it = completed_tasks_.begin();
+ it != completed_tasks_.end(); ++it) {
+ if (task_graph->HasTask(*it))
+ task_graph->TakeTask(*it);
+ }
+
+ // Finally switch to the new graph.
+ task_graph_ = task_graph.Pass();
// There is more work available, so wake up worker thread.
has_pending_tasks_cv_.Signal();
}
-bool WorkerPool::Inner::CollectCompletedTasks() {
+bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) {
base::AutoLock lock(lock_);
- return AppendCompletedTasksWithLockAcquired(
- &worker_pool_on_origin_thread_->completed_tasks_);
+ return CollectCompletedTasksWithLockAcquired(completed_tasks);
}
-bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired(
- ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) {
+bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired(
+ TaskDeque* completed_tasks) {
lock_.AssertAcquired();
- while (completed_tasks_.size())
- completed_tasks->push_back(completed_tasks_.take_front().Pass());
+ DCHECK_EQ(0u, completed_tasks->size());
+ completed_tasks->swap(completed_tasks_);
- return !running_task_count_ && pending_tasks_.empty();
+ return running_tasks_.empty() && !task_graph_->HasMoreTasks();
}
void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
@@ -222,6 +252,8 @@ void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
}
void WorkerPool::Inner::OnIdleOnOriginThread() {
+ TaskDeque completed_tasks;
+
{
base::AutoLock lock(lock_);
@@ -229,14 +261,13 @@ void WorkerPool::Inner::OnIdleOnOriginThread() {
on_idle_pending_ = false;
// Early out if no longer idle.
- if (running_task_count_ || !pending_tasks_.empty())
+ if (!running_tasks_.empty() || task_graph_->HasMoreTasks())
return;
- AppendCompletedTasksWithLockAcquired(
- &worker_pool_on_origin_thread_->completed_tasks_);
+ CollectCompletedTasksWithLockAcquired(&completed_tasks);
}
- worker_pool_on_origin_thread_->OnIdle();
+ worker_pool_on_origin_thread_->OnIdle(&completed_tasks);
}
void WorkerPool::Inner::Run() {
@@ -252,28 +283,28 @@ void WorkerPool::Inner::Run() {
int thread_index = next_thread_index_++;
while (true) {
- if (pending_tasks_.empty()) {
+ if (!task_graph_->HasMoreTasks()) {
// Exit when shutdown is set and no more tasks are pending.
if (shutdown_)
vmpstr 2013/05/06 23:08:48 Now that task_graph_ contains all work that needs
reveman 2013/05/07 01:33:54 Yes, we could cancel all pending tasks at shutdown
break;
- // Schedule an idle callback if requested and not pending.
- if (!running_task_count_)
+ // Schedule an idle callback if not tasks are running.
vmpstr 2013/05/06 23:08:48 nit: not -> no
reveman 2013/05/07 01:33:54 Done.
+ if (running_tasks_.empty())
ScheduleOnIdleWithLockAcquired();
- // Wait for new pending tasks.
+ // Wait for more tasks.
has_pending_tasks_cv_.Wait();
continue;
}
- // Get next task.
- scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front();
+ // Take the highest priority task from the task graph.
+ scoped_refptr<internal::WorkerPoolTask> task = task_graph_->TakeTask(
+ task_graph_->TopTask());
- // Increment |running_task_count_| before starting to run task.
- running_task_count_++;
+ // Insert task in |running_tasks_| before starting to run it.
+ running_tasks_.insert(task);
- // There may be more work available, so wake up another
- // worker thread.
+ // There may be more work available, so wake up another worker thread.
has_pending_tasks_cv_.Signal();
{
@@ -282,10 +313,11 @@ void WorkerPool::Inner::Run() {
task->RunOnThread(thread_index);
}
- completed_tasks_.push_back(task.Pass());
+ // Remove task from |running_tasks_| now that we are done running it.
+ running_tasks_.erase(task);
- // Decrement |running_task_count_| now that we are done running task.
- running_task_count_--;
+ // Finally add task to |completed_tasks_|.
+ completed_tasks_.push_back(task);
}
// We noticed we should exit. Wake up the next worker so it knows it should
@@ -299,7 +331,6 @@ WorkerPool::WorkerPool(WorkerPoolClient* client,
const std::string& thread_name_prefix)
: client_(client),
origin_loop_(base::MessageLoopProxy::current()),
- weak_ptr_factory_(this),
check_for_completed_tasks_delay_(check_for_completed_tasks_delay),
check_for_completed_tasks_pending_(false),
inner_(make_scoped_ptr(new Inner(this,
@@ -308,38 +339,35 @@ WorkerPool::WorkerPool(WorkerPoolClient* client,
}
WorkerPool::~WorkerPool() {
- // Cancel all pending callbacks.
- weak_ptr_factory_.InvalidateWeakPtrs();
-
- DCHECK_EQ(0u, completed_tasks_.size());
}
void WorkerPool::Shutdown() {
inner_->Shutdown();
- inner_->CollectCompletedTasks();
- DispatchCompletionCallbacks();
-}
-void WorkerPool::PostTaskAndReply(
- const Callback& task, const base::Closure& reply) {
- PostTask(make_scoped_ptr(new WorkerPoolTaskImpl(
- task,
- reply)).PassAs<internal::WorkerPoolTask>());
+ TaskDeque completed_tasks;
+ inner_->CollectCompletedTasks(&completed_tasks);
+ DispatchCompletionCallbacks(&completed_tasks);
}
-void WorkerPool::OnIdle() {
+void WorkerPool::OnIdle(TaskDeque* completed_tasks) {
TRACE_EVENT0("cc", "WorkerPool::OnIdle");
- DispatchCompletionCallbacks();
+ DispatchCompletionCallbacks(completed_tasks);
+
+ // Cancel any pending check for completed tasks.
+ check_for_completed_tasks_callback_.Cancel();
+ check_for_completed_tasks_pending_ = false;
}
void WorkerPool::ScheduleCheckForCompletedTasks() {
if (check_for_completed_tasks_pending_)
return;
+ check_for_completed_tasks_callback_.Reset(
+ base::Bind(&WorkerPool::CheckForCompletedTasks,
+ base::Unretained(this)));
origin_loop_->PostDelayedTask(
FROM_HERE,
- base::Bind(&WorkerPool::CheckForCompletedTasks,
- weak_ptr_factory_.GetWeakPtr()),
+ check_for_completed_tasks_callback_.callback(),
check_for_completed_tasks_delay_);
check_for_completed_tasks_pending_ = true;
}
@@ -349,32 +377,37 @@ void WorkerPool::CheckForCompletedTasks() {
DCHECK(check_for_completed_tasks_pending_);
check_for_completed_tasks_pending_ = false;
+ TaskDeque completed_tasks;
+
// Schedule another check for completed tasks if not idle.
- if (!inner_->CollectCompletedTasks())
+ if (!inner_->CollectCompletedTasks(&completed_tasks))
ScheduleCheckForCompletedTasks();
- DispatchCompletionCallbacks();
+ DispatchCompletionCallbacks(&completed_tasks);
}
-void WorkerPool::DispatchCompletionCallbacks() {
+void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) {
TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks");
- if (completed_tasks_.empty())
+ // Early out when |completed_tasks| is empty to prevent unnecessary
+ // call to DidFinishDispatchingWorkerPoolCompletionCallbacks().
+ if (completed_tasks->empty())
return;
- while (completed_tasks_.size()) {
- scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front();
- task->DidComplete();
+ while (completed_tasks->size()) {
vmpstr 2013/05/06 23:08:48 nit: consider !empty
reveman 2013/05/07 01:33:54 Done.
+ scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front();
+ completed_tasks->pop_front();
+ task->DispatchCompletionCallback();
}
client_->DidFinishDispatchingWorkerPoolCompletionCallbacks();
}
-void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
- // Schedule check for completed tasks if not pending.
- ScheduleCheckForCompletedTasks();
-
- inner_->PostTask(task.Pass());
+void WorkerPool::ScheduleTasks(
+ scoped_ptr<internal::WorkerPoolTaskGraph> task_graph) {
+ if (task_graph->HasMoreTasks())
+ ScheduleCheckForCompletedTasks();
+ inner_->ScheduleTasks(task_graph.Pass());
}
} // namespace cc

Powered by Google App Engine
This is Rietveld 408576698