Chromium Code Reviews
| Index: cc/base/worker_pool.cc |
| diff --git a/cc/base/worker_pool.cc b/cc/base/worker_pool.cc |
| index cacfe9e0c018eeaa98b91cd8568f5c300842af06..5b8217e32baa47db09761d69db60e0764204ba78 100644 |
| --- a/cc/base/worker_pool.cc |
| +++ b/cc/base/worker_pool.cc |
| @@ -9,46 +9,88 @@ |
| #include <sys/resource.h> |
| #endif |
| -#include <algorithm> |
| +#include <map> |
| #include "base/bind.h" |
| #include "base/debug/trace_event.h" |
| +#include "base/hash_tables.h" |
| #include "base/stringprintf.h" |
| -#include "base/synchronization/condition_variable.h" |
| #include "base/threading/simple_thread.h" |
| #include "base/threading/thread_restrictions.h" |
| +#include "cc/base/scoped_ptr_deque.h" |
| +#include "cc/base/scoped_ptr_hash_map.h" |
| + |
| +#if defined(COMPILER_GCC) |
|
vmpstr
2013/05/24 17:01:37
Does this part just work with clang?
reveman
2013/05/24 18:53:48
yes, you'll find the same throughout the chromium codebase
|
| +namespace BASE_HASH_NAMESPACE { |
| +template <> struct hash<cc::internal::WorkerPoolTask*> { |
| + size_t operator()(cc::internal::WorkerPoolTask* ptr) const { |
| + return hash<size_t>()(reinterpret_cast<size_t>(ptr)); |
| + } |
| +}; |
| +} // namespace BASE_HASH_NAMESPACE |
| +#endif  // COMPILER_GCC |
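For context on the question above: clang defines __GNUC__, so Chromium's build_config.h also defines COMPILER_GCC for clang builds, which is why the same guard works for both toolchains. The specialization itself is needed because the hash map introduced further down keys on raw WorkerPoolTask pointers, and the pre-C++11 GNU hash containers ship no default hash for pointer types. A standalone sketch of the same pattern (the Node type and variable names are hypothetical, not part of this CL):

    // Without a hash<> specialization, a GNU hash container keyed on a raw
    // pointer does not compile on pre-C++11 toolchains.
    #include <cstddef>
    #include <ext/hash_map>

    struct Node {};

    namespace __gnu_cxx {
    template <> struct hash<Node*> {
      size_t operator()(Node* ptr) const {
        // Hash the pointer value itself, exactly as done for WorkerPoolTask*.
        return hash<size_t>()(reinterpret_cast<size_t>(ptr));
      }
    };
    }  // namespace __gnu_cxx

    int main() {
      __gnu_cxx::hash_map<Node*, int> visit_counts;
      Node n;
      visit_counts[&n] = 1;  // Resolves the hash<Node*> defined above.
      return 0;
    }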
| namespace cc { |
| -namespace { |
| - |
| -class WorkerPoolTaskImpl : public internal::WorkerPoolTask { |
| - public: |
| - WorkerPoolTaskImpl(const WorkerPool::Callback& task, |
| - const base::Closure& reply) |
| - : internal::WorkerPoolTask(reply), |
| - task_(task) {} |
| +namespace internal { |
| - virtual void RunOnThread(unsigned thread_index) OVERRIDE { |
| - task_.Run(); |
| - } |
| +WorkerPoolTask::WorkerPoolTask() |
| + : did_schedule_(false), |
| + did_run_(false), |
| + did_complete_(false) { |
| +} |
| - private: |
| - WorkerPool::Callback task_; |
| -}; |
| +WorkerPoolTask::WorkerPoolTask(TaskVector* dependencies) |
| + : did_schedule_(false), |
| + did_run_(false), |
| + did_complete_(false) { |
| + dependencies_.swap(*dependencies); |
| +} |
| -} // namespace |
| +WorkerPoolTask::~WorkerPoolTask() { |
| + DCHECK_EQ(did_schedule_, did_complete_); |
| + DCHECK(!did_run_ || did_schedule_); |
| + DCHECK(!did_run_ || did_complete_); |
| +} |
| -namespace internal { |
| +void WorkerPoolTask::DidSchedule() { |
| + DCHECK(!did_complete_); |
| + did_schedule_ = true; |
| +} |
| -WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) { |
| +void WorkerPoolTask::WillRun() { |
| + DCHECK(did_schedule_); |
| + DCHECK(!did_complete_); |
| + DCHECK(!did_run_); |
| } |
| -WorkerPoolTask::~WorkerPoolTask() { |
| +void WorkerPoolTask::DidRun() { |
| + did_run_ = true; |
| } |
| void WorkerPoolTask::DidComplete() { |
| - reply_.Run(); |
| + DCHECK(did_schedule_); |
| + DCHECK(!did_complete_); |
| + did_complete_ = true; |
| +} |
| + |
| +bool WorkerPoolTask::IsReadyToRun() const { |
| + // TODO(reveman): Use counter to improve performance. |
| + for (TaskVector::const_reverse_iterator it = dependencies_.rbegin(); |
| + it != dependencies_.rend(); ++it) { |
| + WorkerPoolTask* dependency = *it; |
| + if (!dependency->HasFinishedRunning()) |
| + return false; |
| + } |
| + return true; |
| +} |
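The TODO above presumably refers to keeping a per-task count of unfinished dependencies instead of rescanning |dependencies_| on every call. A rough standalone sketch of that idea (CountingTask and its member names are hypothetical, not part of this CL):

    #include <cstddef>

    #include "base/logging.h"

    // Each task remembers how many of its dependencies have not finished.
    // The scheduler decrements the count as dependencies finish, so the
    // readiness test becomes O(1).
    class CountingTask {
     public:
      explicit CountingTask(size_t num_dependencies)
          : num_unfinished_dependencies_(num_dependencies) {}

      void OnDependencyFinished() {
        DCHECK_GT(num_unfinished_dependencies_, 0u);
        --num_unfinished_dependencies_;
      }

      bool IsReadyToRun() const { return num_unfinished_dependencies_ == 0u; }

     private:
      size_t num_unfinished_dependencies_;
    };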
| + |
| +bool WorkerPoolTask::HasFinishedRunning() const { |
| + return did_run_; |
| +} |
| + |
| +bool WorkerPoolTask::HasCompleted() const { |
| + return did_complete_; |
| } |
| } // namespace internal |
| @@ -65,17 +107,51 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
| void Shutdown(); |
| - void PostTask(scoped_ptr<internal::WorkerPoolTask> task); |
| + // Schedule running of |root| task and all its dependencies. Tasks |
| + // previously scheduled but no longer needed to run |root| will be |
| + // canceled unless already running. Canceled tasks are moved to |
| + // |completed_tasks_| without being run. The result is that once |
| + // scheduled, a task is guaranteed to end up in the |completed_tasks_| |
| + queue even if it later gets canceled by another call to |
| + // ScheduleTasks(). |
| + void ScheduleTasks(internal::WorkerPoolTask* root); |
| - // Appends all completed tasks to worker pool's completed tasks queue |
| - // and returns true if idle. |
| - bool CollectCompletedTasks(); |
| + // Collect all completed tasks in |completed_tasks|. Returns true if idle. |
| + bool CollectCompletedTasks(TaskDeque* completed_tasks); |
| private: |
| - // Appends all completed tasks to |completed_tasks|. Lock must |
| - // already be acquired before calling this function. |
| - bool AppendCompletedTasksWithLockAcquired( |
| - ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); |
| + class ScheduledTask { |
| + public: |
| + ScheduledTask(internal::WorkerPoolTask* dependent, unsigned priority) |
| + : priority_(priority) { |
| + if (dependent) |
| + dependents_.push_back(dependent); |
| + } |
| + |
| + internal::WorkerPoolTask::TaskVector& dependents() { return dependents_; } |
| + unsigned priority() const { return priority_; } |
| + |
| + private: |
| + internal::WorkerPoolTask::TaskVector dependents_; |
| + unsigned priority_; |
| + }; |
| + typedef internal::WorkerPoolTask* ScheduledTaskMapKey; |
| + typedef ScopedPtrHashMap<ScheduledTaskMapKey, ScheduledTask> |
| + ScheduledTaskMap; |
| + |
| + // This builds a ScheduledTaskMap from a root task. |
| + static unsigned BuildScheduledTaskMapRecursive( |
| + internal::WorkerPoolTask* task, |
| + internal::WorkerPoolTask* dependent, |
| + unsigned priority, |
| + ScheduledTaskMap* scheduled_tasks); |
| + static void BuildScheduledTaskMap( |
| + internal::WorkerPoolTask* root, ScheduledTaskMap* scheduled_tasks); |
| + |
| + // Collect all completed tasks by swapping the contents of |
| + // |completed_tasks| and |completed_tasks_|. Lock must be acquired |
| + // before calling this function. Returns true if idle. |
| + bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks); |
| // Schedule an OnIdleOnOriginThread callback if not already pending. |
| // Lock must already be acquired before calling this function. |
| @@ -95,8 +171,8 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
| mutable base::Lock lock_; |
| // Condition variable that is waited on by worker threads until new |
| - // tasks are posted or shutdown starts. |
| - base::ConditionVariable has_pending_tasks_cv_; |
| + // tasks are ready to run or shutdown starts. |
| + base::ConditionVariable has_ready_to_run_tasks_cv_; |
| // Target message loop used for posting callbacks. |
| scoped_refptr<base::MessageLoopProxy> origin_loop_; |
| @@ -111,15 +187,25 @@ class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
| // loop index is 0. |
| unsigned next_thread_index_; |
| - // Number of tasks currently running. |
| - unsigned running_task_count_; |
| - |
| // Set during shutdown. Tells workers to exit when no more tasks |
| // are pending. |
| bool shutdown_; |
| - typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; |
| - TaskDeque pending_tasks_; |
| + // The root task that is a dependent of all other tasks. |
| + scoped_refptr<internal::WorkerPoolTask> root_; |
| + |
| + // This set contains all pending tasks. |
| + ScheduledTaskMap pending_tasks_; |
| + |
| + // Ordered set of tasks that are ready to run. |
| + // TODO(reveman): priority_queue might be more efficient. |
| + typedef std::map<unsigned, internal::WorkerPoolTask*> TaskMap; |
| + TaskMap ready_to_run_tasks_; |
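Because std::map keeps its keys sorted, |ready_to_run_tasks_.begin()| always yields the ready task with the smallest priority value, i.e. the one discovered earliest in the dependency walk below. A standalone illustration using string placeholders for tasks (not part of this CL):

    #include <cassert>
    #include <map>
    #include <string>

    int main() {
      // Priorities as assigned by the dependency walk: leaves get the
      // lowest values, the root the highest.
      std::map<unsigned, std::string> ready_to_run;
      ready_to_run[3] = "root";
      ready_to_run[0] = "leaf";
      ready_to_run[2] = "b";

      // Workers always pop begin(), so among the ready tasks the lowest
      // priority value wins.
      assert(ready_to_run.begin()->first == 0u);
      assert(ready_to_run.begin()->second == "leaf");
      return 0;
    }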
| + |
| + // This set contains all currently running tasks. |
| + ScheduledTaskMap running_tasks_; |
| + |
| + // Completed tasks not yet collected by origin thread. |
| TaskDeque completed_tasks_; |
| ScopedPtrDeque<base::DelegateSimpleThread> workers_; |
| @@ -132,25 +218,24 @@ WorkerPool::Inner::Inner(WorkerPool* worker_pool, |
| const std::string& thread_name_prefix) |
| : worker_pool_on_origin_thread_(worker_pool), |
| lock_(), |
| - has_pending_tasks_cv_(&lock_), |
| + has_ready_to_run_tasks_cv_(&lock_), |
| origin_loop_(base::MessageLoopProxy::current()), |
| weak_ptr_factory_(this), |
| on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, |
| weak_ptr_factory_.GetWeakPtr())), |
| on_idle_pending_(false), |
| next_thread_index_(0), |
| - running_task_count_(0), |
| shutdown_(false) { |
| base::AutoLock lock(lock_); |
| while (workers_.size() < num_threads) { |
| scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( |
| new base::DelegateSimpleThread( |
| - this, |
| - thread_name_prefix + |
| - base::StringPrintf( |
| - "Worker%u", |
| - static_cast<unsigned>(workers_.size() + 1)).c_str())); |
| + this, |
| + thread_name_prefix + |
| + base::StringPrintf( |
| + "Worker%u", |
| + static_cast<unsigned>(workers_.size() + 1)).c_str())); |
| worker->Start(); |
| workers_.push_back(worker.Pass()); |
| } |
| @@ -161,12 +246,10 @@ WorkerPool::Inner::~Inner() { |
| DCHECK(shutdown_); |
| - // Cancel all pending callbacks. |
| - weak_ptr_factory_.InvalidateWeakPtrs(); |
| - |
| DCHECK_EQ(0u, pending_tasks_.size()); |
| + DCHECK_EQ(0u, ready_to_run_tasks_.size()); |
| + DCHECK_EQ(0u, running_tasks_.size()); |
| DCHECK_EQ(0u, completed_tasks_.size()); |
| - DCHECK_EQ(0u, running_task_count_); |
| } |
| void WorkerPool::Inner::Shutdown() { |
| @@ -178,7 +261,7 @@ void WorkerPool::Inner::Shutdown() { |
| // Wake up a worker so it knows it should exit. This will cause all workers |
| // to exit as each will wake up another worker before exiting. |
| - has_pending_tasks_cv_.Signal(); |
| + has_ready_to_run_tasks_cv_.Signal(); |
| } |
| while (workers_.size()) { |
| @@ -188,32 +271,93 @@ void WorkerPool::Inner::Shutdown() { |
| base::ThreadRestrictions::ScopedAllowIO allow_io; |
| worker->Join(); |
| } |
| + |
| + // Cancel any pending OnIdle callback. |
| + weak_ptr_factory_.InvalidateWeakPtrs(); |
| } |
| -void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
| - base::AutoLock lock(lock_); |
| +void WorkerPool::Inner::ScheduleTasks(internal::WorkerPoolTask* root) { |
| + // It is OK to call ScheduleTasks() after shutdown if |root| is NULL. |
| + DCHECK(!root || !shutdown_); |
| + |
| + ScheduledTaskMap new_pending_tasks; |
| + if (root) |
| + BuildScheduledTaskMap(root, &new_pending_tasks); |
| + |
| + scoped_refptr<internal::WorkerPoolTask> new_root(root); |
| + |
| + { |
| + base::AutoLock lock(lock_); |
| - pending_tasks_.push_back(task.Pass()); |
| + // First remove all completed tasks from |new_pending_tasks|. |
| + for (TaskDeque::iterator it = completed_tasks_.begin(); |
| + it != completed_tasks_.end(); ++it) { |
| + internal::WorkerPoolTask* task = *it; |
| + new_pending_tasks.take_and_erase(task); |
| + } |
| + |
| + // Move tasks not present in |new_pending_tasks| to |completed_tasks_|. |
| + for (ScheduledTaskMap::iterator it = pending_tasks_.begin(); |
| + it != pending_tasks_.end(); ++it) { |
| + internal::WorkerPoolTask* task = it->first; |
| + |
| + // Task has completed if not present in |new_pending_tasks|. |
| + if (!new_pending_tasks.contains(task)) |
| + completed_tasks_.push_back(task); |
| + } |
| + |
| + // Build new running task set. |
| + ScheduledTaskMap new_running_tasks; |
| + for (ScheduledTaskMap::iterator it = running_tasks_.begin(); |
| + it != running_tasks_.end(); ++it) { |
| + internal::WorkerPoolTask* task = it->first; |
| + new_running_tasks.set(task, new_pending_tasks.take_and_erase(task)); |
|
vmpstr
2013/05/24 17:01:37
Are currently running tasks guaranteed to be in the |new_pending_tasks| map?
reveman
2013/05/24 18:53:48
No, but we need to set the ScheduledTask value to whatever take_and_erase() returns, which is NULL when the task is not in the new set.
|
| + } |
| + |
| + // Swap root and task sets. |
| + // Note: old tasks are intentionally destroyed after releasing |lock_|. |
| + root_.swap(new_root); |
| + pending_tasks_.swap(new_pending_tasks); |
| + running_tasks_.swap(new_running_tasks); |
| + |
| + // Rebuild |ready_to_run_tasks_| before waking up workers. |
| + ready_to_run_tasks_.clear(); |
| + for (ScheduledTaskMap::iterator it = pending_tasks_.begin(); |
| + it != pending_tasks_.end(); ++it) { |
| + internal::WorkerPoolTask* task = it->first; |
| + |
| + // Completed tasks should not exist in |pending_tasks_|. |
| + DCHECK(!task->HasFinishedRunning()); |
| + |
| + // Call DidSchedule() to indicate that this task has been scheduled. |
| + // Note: This is only for debugging purposes. |
| + task->DidSchedule(); |
| - // There is more work available, so wake up worker thread. |
| - has_pending_tasks_cv_.Signal(); |
| + DCHECK_EQ(0u, ready_to_run_tasks_.count(it->second->priority())); |
| + if (task->IsReadyToRun()) |
| + ready_to_run_tasks_[it->second->priority()] = task; |
| + } |
| + |
| + // If there is more work available, wake up worker thread. |
| + if (!ready_to_run_tasks_.empty()) |
| + has_ready_to_run_tasks_cv_.Signal(); |
| + } |
| } |
| -bool WorkerPool::Inner::CollectCompletedTasks() { |
| +bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) { |
| base::AutoLock lock(lock_); |
| - return AppendCompletedTasksWithLockAcquired( |
| - &worker_pool_on_origin_thread_->completed_tasks_); |
| + return CollectCompletedTasksWithLockAcquired(completed_tasks); |
| } |
| -bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired( |
| - ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) { |
| +bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired( |
| + TaskDeque* completed_tasks) { |
| lock_.AssertAcquired(); |
| - while (completed_tasks_.size()) |
| - completed_tasks->push_back(completed_tasks_.take_front().Pass()); |
| + DCHECK_EQ(0u, completed_tasks->size()); |
| + completed_tasks->swap(completed_tasks_); |
| - return !running_task_count_ && pending_tasks_.empty(); |
| + return running_tasks_.empty() && pending_tasks_.empty(); |
| } |
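The swap keeps the critical section constant-time: the caller passes an empty deque, receives everything collected so far, and the completion callbacks then run without |lock_| held. A minimal standalone sketch of this swap-under-lock pattern (the CompletedQueue class and plain int payload are illustrative only):

    #include <deque>

    #include "base/synchronization/lock.h"

    class CompletedQueue {
     public:
      void Push(int value) {
        base::AutoLock lock(lock_);
        completed_.push_back(value);
      }

      // Swap-under-lock: |out| must be empty; afterwards it owns everything
      // collected so far and the internal deque is empty again. The lock is
      // held only for the O(1) swap.
      void Collect(std::deque<int>* out) {
        base::AutoLock lock(lock_);
        out->swap(completed_);
      }

     private:
      base::Lock lock_;
      std::deque<int> completed_;
    };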
| void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
| @@ -226,6 +370,8 @@ void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
| } |
| void WorkerPool::Inner::OnIdleOnOriginThread() { |
| + TaskDeque completed_tasks; |
| + |
| { |
| base::AutoLock lock(lock_); |
| @@ -233,14 +379,13 @@ void WorkerPool::Inner::OnIdleOnOriginThread() { |
| on_idle_pending_ = false; |
| // Early out if no longer idle. |
| - if (running_task_count_ || !pending_tasks_.empty()) |
| + if (!running_tasks_.empty() || !pending_tasks_.empty()) |
| return; |
| - AppendCompletedTasksWithLockAcquired( |
| - &worker_pool_on_origin_thread_->completed_tasks_); |
| + CollectCompletedTasksWithLockAcquired(&completed_tasks); |
| } |
| - worker_pool_on_origin_thread_->OnIdle(); |
| + worker_pool_on_origin_thread_->OnIdle(&completed_tasks); |
| } |
| void WorkerPool::Inner::Run() { |
| @@ -256,29 +401,37 @@ void WorkerPool::Inner::Run() { |
| int thread_index = next_thread_index_++; |
| while (true) { |
| - if (pending_tasks_.empty()) { |
| - // Exit when shutdown is set and no more tasks are pending. |
| - if (shutdown_) |
| - break; |
| - |
| - // Schedule an idle callback if requested and not pending. |
| - if (!running_task_count_) |
| - ScheduleOnIdleWithLockAcquired(); |
| - |
| - // Wait for new pending tasks. |
| - has_pending_tasks_cv_.Wait(); |
| + if (ready_to_run_tasks_.empty()) { |
| + if (pending_tasks_.empty()) { |
| + // Exit when shutdown is set and no more tasks are pending. |
| + if (shutdown_) |
| + break; |
| + |
| + // Schedule an idle callback if no tasks are running. |
| + if (running_tasks_.empty()) |
| + ScheduleOnIdleWithLockAcquired(); |
| + } |
| + |
| + // Wait for more tasks. |
| + has_ready_to_run_tasks_cv_.Wait(); |
| continue; |
| } |
| - // Get next task. |
| - scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); |
| + // Take top priority task from |ready_to_run_tasks_|. |
| + scoped_refptr<internal::WorkerPoolTask> task( |
| + ready_to_run_tasks_.begin()->second); |
| + ready_to_run_tasks_.erase(ready_to_run_tasks_.begin()); |
| + |
| + // Move task from |pending_tasks_| to |running_tasks_|. |
| + DCHECK(pending_tasks_.contains(task)); |
| + DCHECK(!running_tasks_.contains(task)); |
| + running_tasks_.set(task, pending_tasks_.take_and_erase(task)); |
| - // Increment |running_task_count_| before starting to run task. |
| - running_task_count_++; |
| + // There may be more work available, so wake up another worker thread. |
| + has_ready_to_run_tasks_cv_.Signal(); |
| - // There may be more work available, so wake up another |
| - // worker thread. |
| - has_pending_tasks_cv_.Signal(); |
| + // Call WillRun() before releasing |lock_| and running task. |
| + task->WillRun(); |
| { |
| base::AutoUnlock unlock(lock_); |
| @@ -286,15 +439,76 @@ void WorkerPool::Inner::Run() { |
| task->RunOnThread(thread_index); |
| } |
| - completed_tasks_.push_back(task.Pass()); |
| + // This will mark task as finished running. |
| + task->DidRun(); |
| + |
| + // Now iterate over all dependents to check if they are ready to run. |
| + scoped_ptr<ScheduledTask> scheduled_task = running_tasks_.take_and_erase( |
| + task); |
| + if (scheduled_task) { |
| + typedef internal::WorkerPoolTask::TaskVector TaskVector; |
| + for (TaskVector::iterator it = scheduled_task->dependents().begin(); |
| + it != scheduled_task->dependents().end(); ++it) { |
| + internal::WorkerPoolTask* dependent = *it; |
| + if (!dependent->IsReadyToRun()) |
| + continue; |
| + |
| + // Task is ready. Add it to |ready_to_run_tasks_|. |
| + DCHECK(pending_tasks_.contains(dependent)); |
| + unsigned priority = pending_tasks_.get(dependent)->priority(); |
| + DCHECK(!ready_to_run_tasks_.count(priority) || |
| + ready_to_run_tasks_[priority] == dependent); |
| + ready_to_run_tasks_[priority] = dependent; |
| + } |
| + } |
| - // Decrement |running_task_count_| now that we are done running task. |
| - running_task_count_--; |
| + // Finally add task to |completed_tasks_|. |
| + completed_tasks_.push_back(task); |
| } |
| // We noticed we should exit. Wake up the next worker so it knows it should |
| // exit as well (because the Shutdown() code only signals once). |
| - has_pending_tasks_cv_.Signal(); |
| + has_ready_to_run_tasks_cv_.Signal(); |
| +} |
| + |
| +// static |
| +unsigned WorkerPool::Inner::BuildScheduledTaskMapRecursive( |
|
vmpstr
2013/05/24 17:01:37
Can you add a few comments in this function to just explain what's going on?
reveman
2013/05/24 18:53:48
Added some comments and an example graph to latest patch set.
|
| + internal::WorkerPoolTask* task, |
| + internal::WorkerPoolTask* dependent, |
| + unsigned priority, |
| + ScheduledTaskMap* scheduled_tasks) { |
| + if (task->HasCompleted()) |
| + return priority; |
| + |
| + ScheduledTaskMap::iterator scheduled_it = scheduled_tasks->find(task); |
| + if (scheduled_it != scheduled_tasks->end()) { |
| + DCHECK(dependent); |
| + scheduled_it->second->dependents().push_back(dependent); |
| + return priority; |
| + } |
| + |
| + typedef internal::WorkerPoolTask::TaskVector TaskVector; |
| + for (TaskVector::iterator it = task->dependencies().begin(); |
| + it != task->dependencies().end(); ++it) { |
| + internal::WorkerPoolTask* dependency = *it; |
| + priority = BuildScheduledTaskMapRecursive( |
| + dependency, task, priority, scheduled_tasks); |
| + } |
| + |
| + scheduled_tasks->set(task, |
| + make_scoped_ptr(new ScheduledTask(dependent, |
| + priority))); |
| + |
| + return priority + 1; |
| +} |
| + |
| +// static |
| +void WorkerPool::Inner::BuildScheduledTaskMap( |
| + internal::WorkerPoolTask* root, |
| + ScheduledTaskMap* scheduled_tasks) { |
| + const unsigned kBasePriority = 0u; |
| + DCHECK(root); |
| + BuildScheduledTaskMapRecursive(root, NULL, kBasePriority, scheduled_tasks); |
| } |
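The recursion above is a post-order walk of the dependency graph: every dependency receives a priority before any task that depends on it, tasks reached through more than one dependent are visited once and only gain an extra entry in dependents(), and completed tasks are skipped. A standalone sketch of just the priority numbering on a small diamond graph (the Node type and AssignPriorities() are hypothetical; dependents and completion handling are omitted):

    #include <cstdio>
    #include <map>
    #include <vector>

    struct Node {
      std::vector<Node*> dependencies;
    };

    // Post-order numbering: returns the next unused priority value.
    unsigned AssignPriorities(Node* node,
                              unsigned priority,
                              std::map<Node*, unsigned>* priorities) {
      if (priorities->count(node))
        return priority;  // Already visited through another dependent.
      for (size_t i = 0; i < node->dependencies.size(); ++i)
        priority = AssignPriorities(node->dependencies[i], priority, priorities);
      (*priorities)[node] = priority;
      return priority + 1;
    }

    int main() {
      //        root
      //       /    \
      //      a      b
      //       \    /
      //        leaf
      Node leaf, a, b, root;
      a.dependencies.push_back(&leaf);
      b.dependencies.push_back(&leaf);
      root.dependencies.push_back(&a);
      root.dependencies.push_back(&b);

      std::map<Node*, unsigned> priorities;
      AssignPriorities(&root, 0u, &priorities);
      // Prints: leaf=0 a=1 b=2 root=3 -- leaf becomes ready first, root last.
      std::printf("leaf=%u a=%u b=%u root=%u\n",
                  priorities[&leaf], priorities[&a],
                  priorities[&b], priorities[&root]);
      return 0;
    }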
| WorkerPool::WorkerPool(size_t num_threads, |
| @@ -302,7 +516,6 @@ WorkerPool::WorkerPool(size_t num_threads, |
| const std::string& thread_name_prefix) |
| : client_(NULL), |
| origin_loop_(base::MessageLoopProxy::current()), |
| - weak_ptr_factory_(this), |
| check_for_completed_tasks_delay_(check_for_completed_tasks_delay), |
| check_for_completed_tasks_pending_(false), |
| inner_(make_scoped_ptr(new Inner(this, |
| @@ -311,74 +524,80 @@ WorkerPool::WorkerPool(size_t num_threads, |
| } |
| WorkerPool::~WorkerPool() { |
| - // Cancel all pending callbacks. |
| - weak_ptr_factory_.InvalidateWeakPtrs(); |
| - |
| - DCHECK_EQ(0u, completed_tasks_.size()); |
| } |
| void WorkerPool::Shutdown() { |
| inner_->Shutdown(); |
| - inner_->CollectCompletedTasks(); |
| - DispatchCompletionCallbacks(); |
| -} |
| -void WorkerPool::PostTaskAndReply( |
| - const Callback& task, const base::Closure& reply) { |
| - PostTask(make_scoped_ptr(new WorkerPoolTaskImpl( |
| - task, |
| - reply)).PassAs<internal::WorkerPoolTask>()); |
| + TaskDeque completed_tasks; |
| + inner_->CollectCompletedTasks(&completed_tasks); |
| + DispatchCompletionCallbacks(&completed_tasks); |
| } |
| -void WorkerPool::OnIdle() { |
| +void WorkerPool::OnIdle(TaskDeque* completed_tasks) { |
| TRACE_EVENT0("cc", "WorkerPool::OnIdle"); |
| - DispatchCompletionCallbacks(); |
| + DispatchCompletionCallbacks(completed_tasks); |
| + |
| + // Cancel any pending check for completed tasks. |
| + check_for_completed_tasks_callback_.Cancel(); |
| + check_for_completed_tasks_pending_ = false; |
| } |
| void WorkerPool::ScheduleCheckForCompletedTasks() { |
| if (check_for_completed_tasks_pending_) |
| return; |
| + check_for_completed_tasks_callback_.Reset( |
| + base::Bind(&WorkerPool::CheckForCompletedTasks, |
| + base::Unretained(this))); |
| origin_loop_->PostDelayedTask( |
| FROM_HERE, |
| - base::Bind(&WorkerPool::CheckForCompletedTasks, |
| - weak_ptr_factory_.GetWeakPtr()), |
| + check_for_completed_tasks_callback_.callback(), |
| check_for_completed_tasks_delay_); |
| check_for_completed_tasks_pending_ = true; |
| } |
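The WeakPtr-bound callback is replaced by one that can be canceled explicitly from OnIdle(). Assuming |check_for_completed_tasks_callback_| is a base::CancelableClosure member added in the header (the .h side of this change is not shown here), the pattern looks roughly like this standalone sketch, using 2013-era include paths and a hypothetical Poller class:

    #include "base/bind.h"
    #include "base/cancelable_callback.h"
    #include "base/location.h"
    #include "base/message_loop_proxy.h"
    #include "base/time.h"

    class Poller {
     public:
      // Binds a fresh callback and posts it; a later Cancel() turns the
      // already posted copy into a no-op when the message loop runs it.
      void SchedulePoll(base::TimeDelta delay) {
        poll_callback_.Reset(
            base::Bind(&Poller::Poll, base::Unretained(this)));
        base::MessageLoopProxy::current()->PostDelayedTask(
            FROM_HERE, poll_callback_.callback(), delay);
      }

      void CancelPoll() { poll_callback_.Cancel(); }

     private:
      void Poll() {}

      base::CancelableClosure poll_callback_;
    };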
| void WorkerPool::CheckForCompletedTasks() { |
| TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); |
| - DCHECK(check_for_completed_tasks_pending_); |
| + check_for_completed_tasks_callback_.Cancel(); |
| check_for_completed_tasks_pending_ = false; |
| + TaskDeque completed_tasks; |
| + |
| // Schedule another check for completed tasks if not idle. |
| - if (!inner_->CollectCompletedTasks()) |
| + if (!inner_->CollectCompletedTasks(&completed_tasks)) |
| ScheduleCheckForCompletedTasks(); |
| - DispatchCompletionCallbacks(); |
| + DispatchCompletionCallbacks(&completed_tasks); |
| } |
| -void WorkerPool::DispatchCompletionCallbacks() { |
| +void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) { |
| TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); |
| - if (completed_tasks_.empty()) |
| + // Early out when |completed_tasks| is empty to prevent unnecessary |
| + // call to DidFinishDispatchingWorkerPoolCompletionCallbacks(). |
| + if (completed_tasks->empty()) |
| return; |
| - while (completed_tasks_.size()) { |
| - scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); |
| + while (!completed_tasks->empty()) { |
| + scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front(); |
| + completed_tasks->pop_front(); |
| task->DidComplete(); |
| + task->DispatchCompletionCallback(); |
| } |
| DCHECK(client_); |
| client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); |
| } |
| -void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
| - // Schedule check for completed tasks if not pending. |
| - ScheduleCheckForCompletedTasks(); |
| +void WorkerPool::ScheduleTasks(internal::WorkerPoolTask* root) { |
| + TRACE_EVENT0("cc", "WorkerPool::ScheduleTasks"); |
| + |
| + // Schedule check for completed tasks. |
| + if (root) |
| + ScheduleCheckForCompletedTasks(); |
| - inner_->PostTask(task.Pass()); |
| + inner_->ScheduleTasks(root); |
| } |
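With PostTaskAndReply() gone, callers describe work as a graph of ref-counted tasks and hand only the root to ScheduleTasks(). The usage sketch below is hedged: MyTask, the exact set of pure virtuals, and TaskVector holding scoped_refptr are assumptions, since the worker_pool.h half of the change is not shown in this diff.

    #include "base/compiler_specific.h"
    #include "base/memory/ref_counted.h"
    #include "cc/base/worker_pool.h"

    class MyTask : public cc::internal::WorkerPoolTask {
     public:
      MyTask() {}
      explicit MyTask(TaskVector* dependencies)
          : cc::internal::WorkerPoolTask(dependencies) {}

      // Runs on a worker thread once all dependencies have finished running.
      virtual void RunOnThread(unsigned thread_index) OVERRIDE {}
      // Runs on the origin thread when the completed task is dispatched.
      virtual void DispatchCompletionCallback() OVERRIDE {}
    };

    void ScheduleDiamond(cc::WorkerPool* pool) {
      // |leaf| is a shared dependency of |a| and |b|; |root| depends on both.
      scoped_refptr<cc::internal::WorkerPoolTask> leaf(new MyTask);

      cc::internal::WorkerPoolTask::TaskVector a_deps;
      a_deps.push_back(leaf);
      scoped_refptr<cc::internal::WorkerPoolTask> a(new MyTask(&a_deps));

      cc::internal::WorkerPoolTask::TaskVector b_deps;
      b_deps.push_back(leaf);
      scoped_refptr<cc::internal::WorkerPoolTask> b(new MyTask(&b_deps));

      cc::internal::WorkerPoolTask::TaskVector root_deps;
      root_deps.push_back(a);
      root_deps.push_back(b);
      scoped_refptr<cc::internal::WorkerPoolTask> root(new MyTask(&root_deps));

      // Only the root is scheduled; a later ScheduleTasks() call with a
      // different root cancels any of these tasks that have not started.
      pool->ScheduleTasks(root.get());
    }

Under this scheme the pool runs leaf first, then a and b possibly in parallel, and root last, after which all four tasks are dispatched through the completed-task queue.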
| } // namespace cc |