Index: cc/base/worker_pool.cc |
diff --git a/cc/base/worker_pool.cc b/cc/base/worker_pool.cc |
index be9abf9b87d33e6c0b3fc2d473864568a84070fd..85f3c9ae03507b758ed1a63d017b932e72e4d650 100644 |
--- a/cc/base/worker_pool.cc |
+++ b/cc/base/worker_pool.cc |
@@ -9,49 +9,44 @@ |
#include <sys/resource.h> |
#endif |
-#include <algorithm> |
+#include <set> |
#include "base/bind.h" |
#include "base/debug/trace_event.h" |
#include "base/stringprintf.h" |
#include "base/synchronization/condition_variable.h" |
#include "base/threading/simple_thread.h" |
+#include "cc/base/scoped_ptr_deque.h" |
namespace cc { |
namespace { |
-class WorkerPoolTaskImpl : public internal::WorkerPoolTask { |
+class WorkerPoolTaskGraphImpl : public internal::WorkerPoolTaskGraph { |
public: |
- WorkerPoolTaskImpl(const WorkerPool::Callback& task, |
- const base::Closure& reply) |
- : internal::WorkerPoolTask(reply), |
- task_(task) {} |
+ WorkerPoolTaskGraphImpl() {} |
- virtual void RunOnThread(unsigned thread_index) OVERRIDE { |
- task_.Run(); |
+ // Overridden from internal::WorkerPoolTaskGraph: |
+ virtual bool HasMoreTasks() OVERRIDE { return false; } |
+ virtual bool HasTask(internal::WorkerPoolTask* task) OVERRIDE { |
+ return false; |
+ } |
+ virtual internal::WorkerPoolTask* TopTask() OVERRIDE { |
+ NOTREACHED(); |
+ return NULL; |
+ } |
+ virtual scoped_refptr<internal::WorkerPoolTask> TakeTask( |
+ internal::WorkerPoolTask* task) OVERRIDE { |
+ NOTREACHED(); |
+ return NULL; |
} |
private: |
- WorkerPool::Callback task_; |
+ virtual ~WorkerPoolTaskGraphImpl() {} |
}; |
} // namespace |
-namespace internal { |
- |
-WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) { |
-} |
- |
-WorkerPoolTask::~WorkerPoolTask() { |
-} |
- |
-void WorkerPoolTask::DidComplete() { |
- reply_.Run(); |
-} |
- |
-} // namespace internal |
- |
// Internal to the worker pool. Any data or logic that needs to be |
// shared between threads lives in this class. All members are guarded |
// by |lock_|. |
@@ -64,17 +59,22 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
void Shutdown(); |
- void PostTask(scoped_ptr<internal::WorkerPoolTask> task); |
+ // Schedule running of tasks in |task_graph|. All tasks previously |
+ // scheduled but not present in |task_graph| will be canceled unless |
+ // already running. Canceled tasks are moved to |completed_tasks_| |
+ // without being run. The result is that once scheduled, a task is |
+ // guaranteed to end up in the |completed_tasks_| queue even if they |
+ // later get canceled by another call to ScheduleTasks(). |
+ void ScheduleTasks(scoped_ptr<internal::WorkerPoolTaskGraph> task_graph); |
- // Appends all completed tasks to worker pool's completed tasks queue |
- // and returns true if idle. |
- bool CollectCompletedTasks(); |
+ // Collect all completed tasks in |completed_tasks|. Returns true if idle. |
+ bool CollectCompletedTasks(TaskDeque* completed_tasks); |
private: |
- // Appends all completed tasks to |completed_tasks|. Lock must |
- // already be acquired before calling this function. |
- bool AppendCompletedTasksWithLockAcquired( |
- ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); |
+ // Collect all completed tasks by swapping the contents of |
+ // |completed_tasks| and |completed_tasks_|. Lock must be acquired |
+ // before calling this function. Returns true if idle. |
+ bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks); |
// Schedule an OnIdleOnOriginThread callback if not already pending. |
// Lock must already be acquired before calling this function. |
@@ -110,15 +110,18 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
// loop index is 0. |
unsigned next_thread_index_; |
- // Number of tasks currently running. |
- unsigned running_task_count_; |
- |
// Set during shutdown. Tells workers to exit when no more tasks |
// are pending. |
bool shutdown_; |
- typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; |
- TaskDeque pending_tasks_; |
+ // The task graph. Provides tasks in order of priority. |
+ scoped_ptr<internal::WorkerPoolTaskGraph> task_graph_; |
+ |
+ // This set contains all currently running tasks. |
+ typedef std::set<internal::WorkerPoolTask*> TaskSet; |
+ TaskSet running_tasks_; |
+ |
+ // Completed tasks not yet collected by origin thread. |
TaskDeque completed_tasks_; |
ScopedPtrDeque<base::DelegateSimpleThread> workers_; |
@@ -138,8 +141,8 @@ WorkerPool::Inner::Inner(WorkerPool* worker_pool, |
weak_ptr_factory_.GetWeakPtr())), |
on_idle_pending_(false), |
next_thread_index_(0), |
- running_task_count_(0), |
- shutdown_(false) { |
+ shutdown_(false), |
+ task_graph_(new WorkerPoolTaskGraphImpl) { |
base::AutoLock lock(lock_); |
while (workers_.size() < num_threads) { |
@@ -160,12 +163,9 @@ WorkerPool::Inner::~Inner() { |
DCHECK(shutdown_); |
- // Cancel all pending callbacks. |
- weak_ptr_factory_.InvalidateWeakPtrs(); |
- |
- DCHECK_EQ(0u, pending_tasks_.size()); |
+ DCHECK(!task_graph_->HasMoreTasks()); |
+ DCHECK_EQ(0u, running_tasks_.size()); |
DCHECK_EQ(0u, completed_tasks_.size()); |
- DCHECK_EQ(0u, running_task_count_); |
} |
void WorkerPool::Inner::Shutdown() { |
@@ -184,32 +184,62 @@ void WorkerPool::Inner::Shutdown() { |
scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); |
worker->Join(); |
} |
+ |
+ // Cancel any pending OnIdle callback. |
+ weak_ptr_factory_.InvalidateWeakPtrs(); |
} |
-void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
+void WorkerPool::Inner::ScheduleTasks( |
+ scoped_ptr<internal::WorkerPoolTaskGraph> task_graph) { |
base::AutoLock lock(lock_); |
- pending_tasks_.push_back(task.Pass()); |
+ // Move tasks not present in new graph to |completed_tasks_|. |
+ while (task_graph_->HasMoreTasks()) { |
+ scoped_refptr<internal::WorkerPoolTask> task = task_graph_->TakeTask( |
+ task_graph_->TopTask()); |
+ |
+ // Task has not completed if present in new graph. |
+ if (task_graph->HasTask(task)) |
+ continue; |
+ |
+ completed_tasks_.push_back(task); |
+ } |
+ |
+ // Take any running tasks from new graph. |
+ for (TaskSet::iterator it = running_tasks_.begin(); |
+ it != running_tasks_.end(); ++it) { |
+ if (task_graph->HasTask(*it)) |
+ task_graph->TakeTask(*it); |
+ } |
+ |
+ // And take any completed tasks from new graph. |
+ for (TaskDeque::iterator it = completed_tasks_.begin(); |
+ it != completed_tasks_.end(); ++it) { |
+ if (task_graph->HasTask(*it)) |
+ task_graph->TakeTask(*it); |
+ } |
+ |
+ // Finally switch to the new graph. |
+ task_graph_ = task_graph.Pass(); |
// There is more work available, so wake up worker thread. |
has_pending_tasks_cv_.Signal(); |
} |
-bool WorkerPool::Inner::CollectCompletedTasks() { |
+bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) { |
base::AutoLock lock(lock_); |
- return AppendCompletedTasksWithLockAcquired( |
- &worker_pool_on_origin_thread_->completed_tasks_); |
+ return CollectCompletedTasksWithLockAcquired(completed_tasks); |
} |
-bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired( |
- ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) { |
+bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired( |
+ TaskDeque* completed_tasks) { |
lock_.AssertAcquired(); |
- while (completed_tasks_.size()) |
- completed_tasks->push_back(completed_tasks_.take_front().Pass()); |
+ DCHECK_EQ(0u, completed_tasks->size()); |
+ completed_tasks->swap(completed_tasks_); |
- return !running_task_count_ && pending_tasks_.empty(); |
+ return running_tasks_.empty() && !task_graph_->HasMoreTasks(); |
} |
void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
@@ -222,6 +252,8 @@ void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
} |
void WorkerPool::Inner::OnIdleOnOriginThread() { |
+ TaskDeque completed_tasks; |
+ |
{ |
base::AutoLock lock(lock_); |
@@ -229,14 +261,13 @@ void WorkerPool::Inner::OnIdleOnOriginThread() { |
on_idle_pending_ = false; |
// Early out if no longer idle. |
- if (running_task_count_ || !pending_tasks_.empty()) |
+ if (!running_tasks_.empty() || task_graph_->HasMoreTasks()) |
return; |
- AppendCompletedTasksWithLockAcquired( |
- &worker_pool_on_origin_thread_->completed_tasks_); |
+ CollectCompletedTasksWithLockAcquired(&completed_tasks); |
} |
- worker_pool_on_origin_thread_->OnIdle(); |
+ worker_pool_on_origin_thread_->OnIdle(&completed_tasks); |
} |
void WorkerPool::Inner::Run() { |
@@ -252,28 +283,28 @@ void WorkerPool::Inner::Run() { |
int thread_index = next_thread_index_++; |
while (true) { |
- if (pending_tasks_.empty()) { |
+ if (!task_graph_->HasMoreTasks()) { |
// Exit when shutdown is set and no more tasks are pending. |
if (shutdown_) |
vmpstr
2013/05/06 23:08:48
Now that task_graph_ contains all work that needs
reveman
2013/05/07 01:33:54
Yes, we could cancel all pending tasks at shutdown
|
break; |
- // Schedule an idle callback if requested and not pending. |
- if (!running_task_count_) |
+ // Schedule an idle callback if not tasks are running. |
vmpstr
2013/05/06 23:08:48
nit: not -> no
reveman
2013/05/07 01:33:54
Done.
|
+ if (running_tasks_.empty()) |
ScheduleOnIdleWithLockAcquired(); |
- // Wait for new pending tasks. |
+ // Wait for more tasks. |
has_pending_tasks_cv_.Wait(); |
continue; |
} |
- // Get next task. |
- scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); |
+ // Take the highest priority task from the task graph. |
+ scoped_refptr<internal::WorkerPoolTask> task = task_graph_->TakeTask( |
+ task_graph_->TopTask()); |
- // Increment |running_task_count_| before starting to run task. |
- running_task_count_++; |
+ // Insert task in |running_tasks_| before starting to run it. |
+ running_tasks_.insert(task); |
- // There may be more work available, so wake up another |
- // worker thread. |
+ // There may be more work available, so wake up another worker thread. |
has_pending_tasks_cv_.Signal(); |
{ |
@@ -282,10 +313,11 @@ void WorkerPool::Inner::Run() { |
task->RunOnThread(thread_index); |
} |
- completed_tasks_.push_back(task.Pass()); |
+ // Remove task from |running_tasks_| now that we are done running it. |
+ running_tasks_.erase(task); |
- // Decrement |running_task_count_| now that we are done running task. |
- running_task_count_--; |
+ // Finally add task to |completed_tasks_|. |
+ completed_tasks_.push_back(task); |
} |
// We noticed we should exit. Wake up the next worker so it knows it should |
@@ -299,7 +331,6 @@ WorkerPool::WorkerPool(WorkerPoolClient* client, |
const std::string& thread_name_prefix) |
: client_(client), |
origin_loop_(base::MessageLoopProxy::current()), |
- weak_ptr_factory_(this), |
check_for_completed_tasks_delay_(check_for_completed_tasks_delay), |
check_for_completed_tasks_pending_(false), |
inner_(make_scoped_ptr(new Inner(this, |
@@ -308,38 +339,35 @@ WorkerPool::WorkerPool(WorkerPoolClient* client, |
} |
WorkerPool::~WorkerPool() { |
- // Cancel all pending callbacks. |
- weak_ptr_factory_.InvalidateWeakPtrs(); |
- |
- DCHECK_EQ(0u, completed_tasks_.size()); |
} |
void WorkerPool::Shutdown() { |
inner_->Shutdown(); |
- inner_->CollectCompletedTasks(); |
- DispatchCompletionCallbacks(); |
-} |
-void WorkerPool::PostTaskAndReply( |
- const Callback& task, const base::Closure& reply) { |
- PostTask(make_scoped_ptr(new WorkerPoolTaskImpl( |
- task, |
- reply)).PassAs<internal::WorkerPoolTask>()); |
+ TaskDeque completed_tasks; |
+ inner_->CollectCompletedTasks(&completed_tasks); |
+ DispatchCompletionCallbacks(&completed_tasks); |
} |
-void WorkerPool::OnIdle() { |
+void WorkerPool::OnIdle(TaskDeque* completed_tasks) { |
TRACE_EVENT0("cc", "WorkerPool::OnIdle"); |
- DispatchCompletionCallbacks(); |
+ DispatchCompletionCallbacks(completed_tasks); |
+ |
+ // Cancel any pending check for completed tasks. |
+ check_for_completed_tasks_callback_.Cancel(); |
+ check_for_completed_tasks_pending_ = false; |
} |
void WorkerPool::ScheduleCheckForCompletedTasks() { |
if (check_for_completed_tasks_pending_) |
return; |
+ check_for_completed_tasks_callback_.Reset( |
+ base::Bind(&WorkerPool::CheckForCompletedTasks, |
+ base::Unretained(this))); |
origin_loop_->PostDelayedTask( |
FROM_HERE, |
- base::Bind(&WorkerPool::CheckForCompletedTasks, |
- weak_ptr_factory_.GetWeakPtr()), |
+ check_for_completed_tasks_callback_.callback(), |
check_for_completed_tasks_delay_); |
check_for_completed_tasks_pending_ = true; |
} |
@@ -349,32 +377,37 @@ void WorkerPool::CheckForCompletedTasks() { |
DCHECK(check_for_completed_tasks_pending_); |
check_for_completed_tasks_pending_ = false; |
+ TaskDeque completed_tasks; |
+ |
// Schedule another check for completed tasks if not idle. |
- if (!inner_->CollectCompletedTasks()) |
+ if (!inner_->CollectCompletedTasks(&completed_tasks)) |
ScheduleCheckForCompletedTasks(); |
- DispatchCompletionCallbacks(); |
+ DispatchCompletionCallbacks(&completed_tasks); |
} |
-void WorkerPool::DispatchCompletionCallbacks() { |
+void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) { |
TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); |
- if (completed_tasks_.empty()) |
+ // Early out when |completed_tasks| is empty to prevent unnecessary |
+ // call to DidFinishDispatchingWorkerPoolCompletionCallbacks(). |
+ if (completed_tasks->empty()) |
return; |
- while (completed_tasks_.size()) { |
- scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); |
- task->DidComplete(); |
+ while (completed_tasks->size()) { |
vmpstr
2013/05/06 23:08:48
nit: consider !empty
reveman
2013/05/07 01:33:54
Done.
|
+ scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front(); |
+ completed_tasks->pop_front(); |
+ task->DispatchCompletionCallback(); |
} |
client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); |
} |
-void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
- // Schedule check for completed tasks if not pending. |
- ScheduleCheckForCompletedTasks(); |
- |
- inner_->PostTask(task.Pass()); |
+void WorkerPool::ScheduleTasks( |
+ scoped_ptr<internal::WorkerPoolTaskGraph> task_graph) { |
+ if (task_graph->HasMoreTasks()) |
+ ScheduleCheckForCompletedTasks(); |
+ inner_->ScheduleTasks(task_graph.Pass()); |
} |
} // namespace cc |