| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "cc/base/worker_pool.h" | 5 #include "cc/base/worker_pool.h" |
| 6 | 6 |
| 7 #if defined(OS_ANDROID) | 7 #include <algorithm> |
| 8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) | |
| 9 #include <sys/resource.h> | |
| 10 #endif | |
| 11 | |
| 12 #include <map> | |
| 13 | 8 |
| 14 #include "base/bind.h" | 9 #include "base/bind.h" |
| 15 #include "base/debug/trace_event.h" | 10 #include "base/debug/trace_event.h" |
| 16 #include "base/hash_tables.h" | |
| 17 #include "base/stringprintf.h" | 11 #include "base/stringprintf.h" |
| 12 #include "base/synchronization/condition_variable.h" |
| 18 #include "base/threading/simple_thread.h" | 13 #include "base/threading/simple_thread.h" |
| 19 #include "base/threading/thread_restrictions.h" | 14 #include "base/threading/thread_restrictions.h" |
| 20 #include "cc/base/scoped_ptr_deque.h" | |
| 21 #include "cc/base/scoped_ptr_hash_map.h" | |
| 22 | |
| 23 #if defined(COMPILER_GCC) | |
| 24 namespace BASE_HASH_NAMESPACE { | |
| 25 template <> struct hash<cc::internal::WorkerPoolTask*> { | |
| 26 size_t operator()(cc::internal::WorkerPoolTask* ptr) const { | |
| 27 return hash<size_t>()(reinterpret_cast<size_t>(ptr)); | |
| 28 } | |
| 29 }; | |
| 30 } // namespace BASE_HASH_NAMESPACE | |
| 31 #endif // COMPILER | |
| 32 | 15 |
| 33 namespace cc { | 16 namespace cc { |
| 34 | 17 |
| 18 namespace { |
| 19 |
| 20 class WorkerPoolTaskImpl : public internal::WorkerPoolTask { |
| 21 public: |
| 22 WorkerPoolTaskImpl(const WorkerPool::Callback& task, |
| 23 const base::Closure& reply) |
| 24 : internal::WorkerPoolTask(reply), |
| 25 task_(task) {} |
| 26 |
| 27 virtual void RunOnThread(unsigned thread_index) OVERRIDE { |
| 28 task_.Run(); |
| 29 } |
| 30 |
| 31 private: |
| 32 WorkerPool::Callback task_; |
| 33 }; |
| 34 |
| 35 } // namespace |
| 36 |
| 35 namespace internal { | 37 namespace internal { |
| 36 | 38 |
| 37 WorkerPoolTask::WorkerPoolTask() | 39 WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) { |
| 38 : did_schedule_(false), | |
| 39 did_run_(false), | |
| 40 did_complete_(false) { | |
| 41 } | |
| 42 | |
| 43 WorkerPoolTask::WorkerPoolTask(TaskVector* dependencies) | |
| 44 : did_schedule_(false), | |
| 45 did_run_(false), | |
| 46 did_complete_(false) { | |
| 47 dependencies_.swap(*dependencies); | |
| 48 } | 40 } |
| 49 | 41 |
| 50 WorkerPoolTask::~WorkerPoolTask() { | 42 WorkerPoolTask::~WorkerPoolTask() { |
| 51 DCHECK_EQ(did_schedule_, did_complete_); | |
| 52 DCHECK(!did_run_ || did_schedule_); | |
| 53 DCHECK(!did_run_ || did_complete_); | |
| 54 } | |
| 55 | |
| 56 void WorkerPoolTask::DidSchedule() { | |
| 57 DCHECK(!did_complete_); | |
| 58 did_schedule_ = true; | |
| 59 } | |
| 60 | |
| 61 void WorkerPoolTask::WillRun() { | |
| 62 DCHECK(did_schedule_); | |
| 63 DCHECK(!did_complete_); | |
| 64 DCHECK(!did_run_); | |
| 65 } | |
| 66 | |
| 67 void WorkerPoolTask::DidRun() { | |
| 68 did_run_ = true; | |
| 69 } | 43 } |
| 70 | 44 |
| 71 void WorkerPoolTask::DidComplete() { | 45 void WorkerPoolTask::DidComplete() { |
| 72 DCHECK(did_schedule_); | 46 reply_.Run(); |
| 73 DCHECK(!did_complete_); | |
| 74 did_complete_ = true; | |
| 75 } | |
| 76 | |
| 77 bool WorkerPoolTask::IsReadyToRun() const { | |
| 78 // TODO(reveman): Use counter to improve performance. | |
| 79 for (TaskVector::const_reverse_iterator it = dependencies_.rbegin(); | |
| 80 it != dependencies_.rend(); ++it) { | |
| 81 WorkerPoolTask* dependency = *it; | |
| 82 if (!dependency->HasFinishedRunning()) | |
| 83 return false; | |
| 84 } | |
| 85 return true; | |
| 86 } | |
| 87 | |
| 88 bool WorkerPoolTask::HasFinishedRunning() const { | |
| 89 return did_run_; | |
| 90 } | |
| 91 | |
| 92 bool WorkerPoolTask::HasCompleted() const { | |
| 93 return did_complete_; | |
| 94 } | 47 } |
| 95 | 48 |
| 96 } // namespace internal | 49 } // namespace internal |
| 97 | 50 |
| 98 // Internal to the worker pool. Any data or logic that needs to be | 51 // Internal to the worker pool. Any data or logic that needs to be |
| 99 // shared between threads lives in this class. All members are guarded | 52 // shared between threads lives in this class. All members are guarded |
| 100 // by |lock_|. | 53 // by |lock_|. |
| 101 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { | 54 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
| 102 public: | 55 public: |
| 103 Inner(WorkerPool* worker_pool, | 56 Inner(WorkerPool* worker_pool, |
| 104 size_t num_threads, | 57 size_t num_threads, |
| 105 const std::string& thread_name_prefix); | 58 const std::string& thread_name_prefix); |
| 106 virtual ~Inner(); | 59 virtual ~Inner(); |
| 107 | 60 |
| 108 void Shutdown(); | 61 void Shutdown(); |
| 109 | 62 |
| 110 // Schedule running of |root| task and all its dependencies. Tasks | 63 void PostTask(scoped_ptr<internal::WorkerPoolTask> task); |
| 111 // previously scheduled but no longer needed to run |root| will be | |
| 112 // canceled unless already running. Canceled tasks are moved to | |
| 113 // |completed_tasks_| without being run. The result is that once | |
| 114 // scheduled, a task is guaranteed to end up in the |completed_tasks_| | |
| 115 // queue even if they later get canceled by another call to | |
| 116 // ScheduleTasks(). | |
| 117 void ScheduleTasks(internal::WorkerPoolTask* root); | |
| 118 | 64 |
| 119 // Collect all completed tasks in |completed_tasks|. Returns true if idle. | 65 // Appends all completed tasks to worker pool's completed tasks queue |
| 120 bool CollectCompletedTasks(TaskDeque* completed_tasks); | 66 // and returns true if idle. |
| 67 bool CollectCompletedTasks(); |
| 121 | 68 |
| 122 private: | 69 private: |
| 123 class ScheduledTask { | 70 // Appends all completed tasks to |completed_tasks|. Lock must |
| 124 public: | 71 // already be acquired before calling this function. |
| 125 ScheduledTask(internal::WorkerPoolTask* dependent, unsigned priority) | 72 bool AppendCompletedTasksWithLockAcquired( |
| 126 : priority_(priority) { | 73 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); |
| 127 if (dependent) | |
| 128 dependents_.push_back(dependent); | |
| 129 } | |
| 130 | |
| 131 internal::WorkerPoolTask::TaskVector& dependents() { return dependents_; } | |
| 132 unsigned priority() const { return priority_; } | |
| 133 | |
| 134 private: | |
| 135 internal::WorkerPoolTask::TaskVector dependents_; | |
| 136 unsigned priority_; | |
| 137 }; | |
| 138 typedef internal::WorkerPoolTask* ScheduledTaskMapKey; | |
| 139 typedef ScopedPtrHashMap<ScheduledTaskMapKey, ScheduledTask> | |
| 140 ScheduledTaskMap; | |
| 141 | |
| 142 // This builds a ScheduledTaskMap from a root task. | |
| 143 static unsigned BuildScheduledTaskMapRecursive( | |
| 144 internal::WorkerPoolTask* task, | |
| 145 internal::WorkerPoolTask* dependent, | |
| 146 unsigned priority, | |
| 147 ScheduledTaskMap* scheduled_tasks); | |
| 148 static void BuildScheduledTaskMap( | |
| 149 internal::WorkerPoolTask* root, ScheduledTaskMap* scheduled_tasks); | |
| 150 | |
| 151 // Collect all completed tasks by swapping the contents of | |
| 152 // |completed_tasks| and |completed_tasks_|. Lock must be acquired | |
| 153 // before calling this function. Returns true if idle. | |
| 154 bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks); | |
| 155 | 74 |
| 156 // Schedule an OnIdleOnOriginThread callback if not already pending. | 75 // Schedule an OnIdleOnOriginThread callback if not already pending. |
| 157 // Lock must already be acquired before calling this function. | 76 // Lock must already be acquired before calling this function. |
| 158 void ScheduleOnIdleWithLockAcquired(); | 77 void ScheduleOnIdleWithLockAcquired(); |
| 159 void OnIdleOnOriginThread(); | 78 void OnIdleOnOriginThread(); |
| 160 | 79 |
| 161 // Overridden from base::DelegateSimpleThread: | 80 // Overridden from base::DelegateSimpleThread: |
| 162 virtual void Run() OVERRIDE; | 81 virtual void Run() OVERRIDE; |
| 163 | 82 |
| 164 // Pointer to worker pool. Can only be used on origin thread. | 83 // Pointer to worker pool. Can only be used on origin thread. |
| 165 // Not guarded by |lock_|. | 84 // Not guarded by |lock_|. |
| 166 WorkerPool* worker_pool_on_origin_thread_; | 85 WorkerPool* worker_pool_on_origin_thread_; |
| 167 | 86 |
| 168 // This lock protects all members of this class except | 87 // This lock protects all members of this class except |
| 169 // |worker_pool_on_origin_thread_|. Do not read or modify anything | 88 // |worker_pool_on_origin_thread_|. Do not read or modify anything |
| 170 // without holding this lock. Do not block while holding this lock. | 89 // without holding this lock. Do not block while holding this lock. |
| 171 mutable base::Lock lock_; | 90 mutable base::Lock lock_; |
| 172 | 91 |
| 173 // Condition variable that is waited on by worker threads until new | 92 // Condition variable that is waited on by worker threads until new |
| 174 // tasks are ready to run or shutdown starts. | 93 // tasks are posted or shutdown starts. |
| 175 base::ConditionVariable has_ready_to_run_tasks_cv_; | 94 base::ConditionVariable has_pending_tasks_cv_; |
| 176 | 95 |
| 177 // Target message loop used for posting callbacks. | 96 // Target message loop used for posting callbacks. |
| 178 scoped_refptr<base::MessageLoopProxy> origin_loop_; | 97 scoped_refptr<base::MessageLoopProxy> origin_loop_; |
| 179 | 98 |
| 180 base::WeakPtrFactory<Inner> weak_ptr_factory_; | 99 base::WeakPtrFactory<Inner> weak_ptr_factory_; |
| 181 | 100 |
| 182 const base::Closure on_idle_callback_; | 101 const base::Closure on_idle_callback_; |
| 183 // Set when a OnIdleOnOriginThread() callback is pending. | 102 // Set when a OnIdleOnOriginThread() callback is pending. |
| 184 bool on_idle_pending_; | 103 bool on_idle_pending_; |
| 185 | 104 |
| 186 // Provides each running thread loop with a unique index. First thread | 105 // Provides each running thread loop with a unique index. First thread |
| 187 // loop index is 0. | 106 // loop index is 0. |
| 188 unsigned next_thread_index_; | 107 unsigned next_thread_index_; |
| 189 | 108 |
| 109 // Number of tasks currently running. |
| 110 unsigned running_task_count_; |
| 111 |
| 190 // Set during shutdown. Tells workers to exit when no more tasks | 112 // Set during shutdown. Tells workers to exit when no more tasks |
| 191 // are pending. | 113 // are pending. |
| 192 bool shutdown_; | 114 bool shutdown_; |
| 193 | 115 |
| 194 // The root task that is a dependent of all other tasks. | 116 typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; |
| 195 scoped_refptr<internal::WorkerPoolTask> root_; | 117 TaskDeque pending_tasks_; |
| 196 | |
| 197 // This set contains all pending tasks. | |
| 198 ScheduledTaskMap pending_tasks_; | |
| 199 | |
| 200 // Ordered set of tasks that are ready to run. | |
| 201 // TODO(reveman): priority_queue might be more efficient. | |
| 202 typedef std::map<unsigned, internal::WorkerPoolTask*> TaskMap; | |
| 203 TaskMap ready_to_run_tasks_; | |
| 204 | |
| 205 // This set contains all currently running tasks. | |
| 206 ScheduledTaskMap running_tasks_; | |
| 207 | |
| 208 // Completed tasks not yet collected by origin thread. | |
| 209 TaskDeque completed_tasks_; | 118 TaskDeque completed_tasks_; |
| 210 | 119 |
| 211 ScopedPtrDeque<base::DelegateSimpleThread> workers_; | 120 ScopedPtrDeque<base::DelegateSimpleThread> workers_; |
| 212 | 121 |
| 213 DISALLOW_COPY_AND_ASSIGN(Inner); | 122 DISALLOW_COPY_AND_ASSIGN(Inner); |
| 214 }; | 123 }; |
| 215 | 124 |
| 216 WorkerPool::Inner::Inner(WorkerPool* worker_pool, | 125 WorkerPool::Inner::Inner(WorkerPool* worker_pool, |
| 217 size_t num_threads, | 126 size_t num_threads, |
| 218 const std::string& thread_name_prefix) | 127 const std::string& thread_name_prefix) |
| 219 : worker_pool_on_origin_thread_(worker_pool), | 128 : worker_pool_on_origin_thread_(worker_pool), |
| 220 lock_(), | 129 lock_(), |
| 221 has_ready_to_run_tasks_cv_(&lock_), | 130 has_pending_tasks_cv_(&lock_), |
| 222 origin_loop_(base::MessageLoopProxy::current()), | 131 origin_loop_(base::MessageLoopProxy::current()), |
| 223 weak_ptr_factory_(this), | 132 weak_ptr_factory_(this), |
| 224 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, | 133 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, |
| 225 weak_ptr_factory_.GetWeakPtr())), | 134 weak_ptr_factory_.GetWeakPtr())), |
| 226 on_idle_pending_(false), | 135 on_idle_pending_(false), |
| 227 next_thread_index_(0), | 136 next_thread_index_(0), |
| 137 running_task_count_(0), |
| 228 shutdown_(false) { | 138 shutdown_(false) { |
| 229 base::AutoLock lock(lock_); | 139 base::AutoLock lock(lock_); |
| 230 | 140 |
| 231 while (workers_.size() < num_threads) { | 141 while (workers_.size() < num_threads) { |
| 232 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( | 142 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( |
| 233 new base::DelegateSimpleThread( | 143 new base::DelegateSimpleThread( |
| 234 this, | 144 this, |
| 235 thread_name_prefix + | 145 thread_name_prefix + |
| 236 base::StringPrintf( | 146 base::StringPrintf( |
| 237 "Worker%u", | 147 "Worker%u", |
| 238 static_cast<unsigned>(workers_.size() + 1)).c_str())); | 148 static_cast<unsigned>(workers_.size() + 1)).c_str())); |
| 239 worker->Start(); | 149 worker->Start(); |
| 240 workers_.push_back(worker.Pass()); | 150 workers_.push_back(worker.Pass()); |
| 241 } | 151 } |
| 242 } | 152 } |
| 243 | 153 |
| 244 WorkerPool::Inner::~Inner() { | 154 WorkerPool::Inner::~Inner() { |
| 245 base::AutoLock lock(lock_); | 155 base::AutoLock lock(lock_); |
| 246 | 156 |
| 247 DCHECK(shutdown_); | 157 DCHECK(shutdown_); |
| 248 | 158 |
| 159 // Cancel all pending callbacks. |
| 160 weak_ptr_factory_.InvalidateWeakPtrs(); |
| 161 |
| 249 DCHECK_EQ(0u, pending_tasks_.size()); | 162 DCHECK_EQ(0u, pending_tasks_.size()); |
| 250 DCHECK_EQ(0u, ready_to_run_tasks_.size()); | |
| 251 DCHECK_EQ(0u, running_tasks_.size()); | |
| 252 DCHECK_EQ(0u, completed_tasks_.size()); | 163 DCHECK_EQ(0u, completed_tasks_.size()); |
| 164 DCHECK_EQ(0u, running_task_count_); |
| 253 } | 165 } |
| 254 | 166 |
| 255 void WorkerPool::Inner::Shutdown() { | 167 void WorkerPool::Inner::Shutdown() { |
| 256 { | 168 { |
| 257 base::AutoLock lock(lock_); | 169 base::AutoLock lock(lock_); |
| 258 | 170 |
| 259 DCHECK(!shutdown_); | 171 DCHECK(!shutdown_); |
| 260 shutdown_ = true; | 172 shutdown_ = true; |
| 261 | 173 |
| 262 // Wake up a worker so it knows it should exit. This will cause all workers | 174 // Wake up a worker so it knows it should exit. This will cause all workers |
| 263 // to exit as each will wake up another worker before exiting. | 175 // to exit as each will wake up another worker before exiting. |
| 264 has_ready_to_run_tasks_cv_.Signal(); | 176 has_pending_tasks_cv_.Signal(); |
| 265 } | 177 } |
| 266 | 178 |
| 267 while (workers_.size()) { | 179 while (workers_.size()) { |
| 268 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); | 180 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); |
| 269 // http://crbug.com/240453 - Join() is considered IO and will block this | 181 // http://crbug.com/240453 - Join() is considered IO and will block this |
| 270 // thread. See also http://crbug.com/239423 for further ideas. | 182 // thread. See also http://crbug.com/239423 for further ideas. |
| 271 base::ThreadRestrictions::ScopedAllowIO allow_io; | 183 base::ThreadRestrictions::ScopedAllowIO allow_io; |
| 272 worker->Join(); | 184 worker->Join(); |
| 273 } | 185 } |
| 274 | |
| 275 // Cancel any pending OnIdle callback. | |
| 276 weak_ptr_factory_.InvalidateWeakPtrs(); | |
| 277 } | 186 } |
| 278 | 187 |
| 279 void WorkerPool::Inner::ScheduleTasks(internal::WorkerPoolTask* root) { | 188 void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
| 280 // It is OK to call ScheduleTasks() after shutdown if |root| is NULL. | 189 base::AutoLock lock(lock_); |
| 281 DCHECK(!root || !shutdown_); | |
| 282 | 190 |
| 283 scoped_refptr<internal::WorkerPoolTask> new_root(root); | 191 pending_tasks_.push_back(task.Pass()); |
| 284 | 192 |
| 285 ScheduledTaskMap new_pending_tasks; | 193 // There is more work available, so wake up worker thread. |
| 286 ScheduledTaskMap new_running_tasks; | 194 has_pending_tasks_cv_.Signal(); |
| 287 TaskMap new_ready_to_run_tasks; | |
| 288 | |
| 289 // Build scheduled task map before acquiring |lock_|. | |
| 290 if (root) | |
| 291 BuildScheduledTaskMap(root, &new_pending_tasks); | |
| 292 | |
| 293 { | |
| 294 base::AutoLock lock(lock_); | |
| 295 | |
| 296 // First remove all completed tasks from |new_pending_tasks|. | |
| 297 for (TaskDeque::iterator it = completed_tasks_.begin(); | |
| 298 it != completed_tasks_.end(); ++it) { | |
| 299 internal::WorkerPoolTask* task = *it; | |
| 300 new_pending_tasks.take_and_erase(task); | |
| 301 } | |
| 302 | |
| 303 // Move tasks not present in |new_pending_tasks| to |completed_tasks_|. | |
| 304 for (ScheduledTaskMap::iterator it = pending_tasks_.begin(); | |
| 305 it != pending_tasks_.end(); ++it) { | |
| 306 internal::WorkerPoolTask* task = it->first; | |
| 307 | |
| 308 // Task has completed if not present in |new_pending_tasks|. | |
| 309 if (!new_pending_tasks.contains(task)) | |
| 310 completed_tasks_.push_back(task); | |
| 311 } | |
| 312 | |
| 313 // Build new running task set. | |
| 314 for (ScheduledTaskMap::iterator it = running_tasks_.begin(); | |
| 315 it != running_tasks_.end(); ++it) { | |
| 316 internal::WorkerPoolTask* task = it->first; | |
| 317 // Transfer scheduled task value from |new_pending_tasks| to | |
| 318 // |new_running_tasks| if currently running. Value must be set to | |
| 319 // NULL if |new_pending_tasks| doesn't contain task. This does | |
| 320 // the right in both cases. | |
| 321 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task)); | |
| 322 } | |
| 323 | |
| 324 // Build new "ready to run" tasks queue. | |
| 325 for (ScheduledTaskMap::iterator it = new_pending_tasks.begin(); | |
| 326 it != new_pending_tasks.end(); ++it) { | |
| 327 internal::WorkerPoolTask* task = it->first; | |
| 328 | |
| 329 // Completed tasks should not exist in |new_pending_tasks_|. | |
| 330 DCHECK(!task->HasFinishedRunning()); | |
| 331 | |
| 332 // Call DidSchedule() to indicate that this task has been scheduled. | |
| 333 // Note: This is only for debugging purposes. | |
| 334 task->DidSchedule(); | |
| 335 | |
| 336 DCHECK_EQ(0u, new_ready_to_run_tasks.count(it->second->priority())); | |
| 337 if (task->IsReadyToRun()) | |
| 338 new_ready_to_run_tasks[it->second->priority()] = task; | |
| 339 } | |
| 340 | |
| 341 // Swap root taskand task sets. | |
| 342 // Note: old tasks are intentionally destroyed after releasing |lock_|. | |
| 343 root_.swap(new_root); | |
| 344 pending_tasks_.swap(new_pending_tasks); | |
| 345 running_tasks_.swap(new_running_tasks); | |
| 346 ready_to_run_tasks_.swap(new_ready_to_run_tasks); | |
| 347 | |
| 348 // If there is more work available, wake up worker thread. | |
| 349 if (!ready_to_run_tasks_.empty()) | |
| 350 has_ready_to_run_tasks_cv_.Signal(); | |
| 351 } | |
| 352 } | 195 } |
| 353 | 196 |
| 354 bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) { | 197 bool WorkerPool::Inner::CollectCompletedTasks() { |
| 355 base::AutoLock lock(lock_); | 198 base::AutoLock lock(lock_); |
| 356 | 199 |
| 357 return CollectCompletedTasksWithLockAcquired(completed_tasks); | 200 return AppendCompletedTasksWithLockAcquired( |
| 201 &worker_pool_on_origin_thread_->completed_tasks_); |
| 358 } | 202 } |
| 359 | 203 |
| 360 bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired( | 204 bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired( |
| 361 TaskDeque* completed_tasks) { | 205 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) { |
| 362 lock_.AssertAcquired(); | 206 lock_.AssertAcquired(); |
| 363 | 207 |
| 364 DCHECK_EQ(0u, completed_tasks->size()); | 208 while (completed_tasks_.size()) |
| 365 completed_tasks->swap(completed_tasks_); | 209 completed_tasks->push_back(completed_tasks_.take_front().Pass()); |
| 366 | 210 |
| 367 return running_tasks_.empty() && pending_tasks_.empty(); | 211 return !running_task_count_ && pending_tasks_.empty(); |
| 368 } | 212 } |
| 369 | 213 |
| 370 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { | 214 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
| 371 lock_.AssertAcquired(); | 215 lock_.AssertAcquired(); |
| 372 | 216 |
| 373 if (on_idle_pending_) | 217 if (on_idle_pending_) |
| 374 return; | 218 return; |
| 375 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); | 219 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); |
| 376 on_idle_pending_ = true; | 220 on_idle_pending_ = true; |
| 377 } | 221 } |
| 378 | 222 |
| 379 void WorkerPool::Inner::OnIdleOnOriginThread() { | 223 void WorkerPool::Inner::OnIdleOnOriginThread() { |
| 380 TaskDeque completed_tasks; | |
| 381 | |
| 382 { | 224 { |
| 383 base::AutoLock lock(lock_); | 225 base::AutoLock lock(lock_); |
| 384 | 226 |
| 385 DCHECK(on_idle_pending_); | 227 DCHECK(on_idle_pending_); |
| 386 on_idle_pending_ = false; | 228 on_idle_pending_ = false; |
| 387 | 229 |
| 388 // Early out if no longer idle. | 230 // Early out if no longer idle. |
| 389 if (!running_tasks_.empty() || !pending_tasks_.empty()) | 231 if (running_task_count_ || !pending_tasks_.empty()) |
| 390 return; | 232 return; |
| 391 | 233 |
| 392 CollectCompletedTasksWithLockAcquired(&completed_tasks); | 234 AppendCompletedTasksWithLockAcquired( |
| 235 &worker_pool_on_origin_thread_->completed_tasks_); |
| 393 } | 236 } |
| 394 | 237 |
| 395 worker_pool_on_origin_thread_->OnIdle(&completed_tasks); | 238 worker_pool_on_origin_thread_->OnIdle(); |
| 396 } | 239 } |
| 397 | 240 |
| 398 void WorkerPool::Inner::Run() { | 241 void WorkerPool::Inner::Run() { |
| 399 #if defined(OS_ANDROID) | 242 #if defined(OS_ANDROID) |
| 400 base::PlatformThread::SetThreadPriority( | 243 base::PlatformThread::SetThreadPriority( |
| 401 base::PlatformThread::CurrentHandle(), | 244 base::PlatformThread::CurrentHandle(), |
| 402 base::kThreadPriority_Background); | 245 base::kThreadPriority_Background); |
| 403 #endif | 246 #endif |
| 404 | 247 |
| 405 base::AutoLock lock(lock_); | 248 base::AutoLock lock(lock_); |
| 406 | 249 |
| 407 // Get a unique thread index. | 250 // Get a unique thread index. |
| 408 int thread_index = next_thread_index_++; | 251 int thread_index = next_thread_index_++; |
| 409 | 252 |
| 410 while (true) { | 253 while (true) { |
| 411 if (ready_to_run_tasks_.empty()) { | 254 if (pending_tasks_.empty()) { |
| 412 if (pending_tasks_.empty()) { | 255 // Exit when shutdown is set and no more tasks are pending. |
| 413 // Exit when shutdown is set and no more tasks are pending. | 256 if (shutdown_) |
| 414 if (shutdown_) | 257 break; |
| 415 break; | |
| 416 | 258 |
| 417 // Schedule an idle callback if no tasks are running. | 259 // Schedule an idle callback if requested and not pending. |
| 418 if (running_tasks_.empty()) | 260 if (!running_task_count_) |
| 419 ScheduleOnIdleWithLockAcquired(); | 261 ScheduleOnIdleWithLockAcquired(); |
| 420 } | |
| 421 | 262 |
| 422 // Wait for more tasks. | 263 // Wait for new pending tasks. |
| 423 has_ready_to_run_tasks_cv_.Wait(); | 264 has_pending_tasks_cv_.Wait(); |
| 424 continue; | 265 continue; |
| 425 } | 266 } |
| 426 | 267 |
| 427 // Take top priority task from |ready_to_run_tasks_|. | 268 // Get next task. |
| 428 scoped_refptr<internal::WorkerPoolTask> task( | 269 scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); |
| 429 ready_to_run_tasks_.begin()->second); | |
| 430 ready_to_run_tasks_.erase(ready_to_run_tasks_.begin()); | |
| 431 | 270 |
| 432 // Move task from |pending_tasks_| to |running_tasks_|. | 271 // Increment |running_task_count_| before starting to run task. |
| 433 DCHECK(pending_tasks_.contains(task)); | 272 running_task_count_++; |
| 434 DCHECK(!running_tasks_.contains(task)); | |
| 435 running_tasks_.set(task, pending_tasks_.take_and_erase(task)); | |
| 436 | 273 |
| 437 // There may be more work available, so wake up another worker thread. | 274 // There may be more work available, so wake up another |
| 438 has_ready_to_run_tasks_cv_.Signal(); | 275 // worker thread. |
| 439 | 276 has_pending_tasks_cv_.Signal(); |
| 440 // Call WillRun() before releasing |lock_| and running task. | |
| 441 task->WillRun(); | |
| 442 | 277 |
| 443 { | 278 { |
| 444 base::AutoUnlock unlock(lock_); | 279 base::AutoUnlock unlock(lock_); |
| 445 | 280 |
| 446 task->RunOnThread(thread_index); | 281 task->RunOnThread(thread_index); |
| 447 } | 282 } |
| 448 | 283 |
| 449 // This will mark task as finished running. | 284 completed_tasks_.push_back(task.Pass()); |
| 450 task->DidRun(); | |
| 451 | 285 |
| 452 // Now iterate over all dependents to check if they are ready to run. | 286 // Decrement |running_task_count_| now that we are done running task. |
| 453 scoped_ptr<ScheduledTask> scheduled_task = running_tasks_.take_and_erase( | 287 running_task_count_--; |
| 454 task); | |
| 455 if (scheduled_task) { | |
| 456 typedef internal::WorkerPoolTask::TaskVector TaskVector; | |
| 457 for (TaskVector::iterator it = scheduled_task->dependents().begin(); | |
| 458 it != scheduled_task->dependents().end(); ++it) { | |
| 459 internal::WorkerPoolTask* dependent = *it; | |
| 460 if (!dependent->IsReadyToRun()) | |
| 461 continue; | |
| 462 | |
| 463 // Task is ready. Add it to |ready_to_run_tasks_|. | |
| 464 DCHECK(pending_tasks_.contains(dependent)); | |
| 465 unsigned priority = pending_tasks_.get(dependent)->priority(); | |
| 466 DCHECK(!ready_to_run_tasks_.count(priority) || | |
| 467 ready_to_run_tasks_[priority] == dependent); | |
| 468 ready_to_run_tasks_[priority] = dependent; | |
| 469 } | |
| 470 } | |
| 471 | |
| 472 // Finally add task to |completed_tasks_|. | |
| 473 completed_tasks_.push_back(task); | |
| 474 } | 288 } |
| 475 | 289 |
| 476 // We noticed we should exit. Wake up the next worker so it knows it should | 290 // We noticed we should exit. Wake up the next worker so it knows it should |
| 477 // exit as well (because the Shutdown() code only signals once). | 291 // exit as well (because the Shutdown() code only signals once). |
| 478 has_ready_to_run_tasks_cv_.Signal(); | 292 has_pending_tasks_cv_.Signal(); |
| 479 } | |
| 480 | |
| 481 // BuildScheduledTaskMap() takes a task tree as input and constructs | |
| 482 // a unique set of tasks with edges between dependencies pointing in | |
| 483 // the direction of the dependents. Each task is given a unique priority | |
| 484 // which is currently the same as the DFS traversal order. | |
| 485 // | |
| 486 // Input: Output: | |
| 487 // | |
| 488 // root task4 Task | Priority (lower is better) | |
| 489 // / \ / \ -------+--------------------------- | |
| 490 // task1 task2 task3 task2 root | 4 | |
| 491 // | | | | task1 | 2 | |
| 492 // task3 | task1 | task2 | 3 | |
| 493 // | | \ / task3 | 1 | |
| 494 // task4 task4 root task4 | 0 | |
| 495 // | |
| 496 // The output can be used to efficiently maintain a queue of | |
| 497 // "ready to run" tasks. | |
| 498 | |
| 499 // static | |
| 500 unsigned WorkerPool::Inner::BuildScheduledTaskMapRecursive( | |
| 501 internal::WorkerPoolTask* task, | |
| 502 internal::WorkerPoolTask* dependent, | |
| 503 unsigned priority, | |
| 504 ScheduledTaskMap* scheduled_tasks) { | |
| 505 // Skip sub-tree if task has already completed. | |
| 506 if (task->HasCompleted()) | |
| 507 return priority; | |
| 508 | |
| 509 ScheduledTaskMap::iterator scheduled_it = scheduled_tasks->find(task); | |
| 510 if (scheduled_it != scheduled_tasks->end()) { | |
| 511 DCHECK(dependent); | |
| 512 scheduled_it->second->dependents().push_back(dependent); | |
| 513 return priority; | |
| 514 } | |
| 515 | |
| 516 typedef internal::WorkerPoolTask::TaskVector TaskVector; | |
| 517 for (TaskVector::iterator it = task->dependencies().begin(); | |
| 518 it != task->dependencies().end(); ++it) { | |
| 519 internal::WorkerPoolTask* dependency = *it; | |
| 520 priority = BuildScheduledTaskMapRecursive( | |
| 521 dependency, task, priority, scheduled_tasks); | |
| 522 } | |
| 523 | |
| 524 scheduled_tasks->set(task, | |
| 525 make_scoped_ptr(new ScheduledTask(dependent, | |
| 526 priority))); | |
| 527 | |
| 528 return priority + 1; | |
| 529 } | |
| 530 | |
| 531 // static | |
| 532 void WorkerPool::Inner::BuildScheduledTaskMap( | |
| 533 internal::WorkerPoolTask* root, | |
| 534 ScheduledTaskMap* scheduled_tasks) { | |
| 535 const unsigned kBasePriority = 0u; | |
| 536 DCHECK(root); | |
| 537 BuildScheduledTaskMapRecursive(root, NULL, kBasePriority, scheduled_tasks); | |
| 538 } | 293 } |
| 539 | 294 |
| 540 WorkerPool::WorkerPool(size_t num_threads, | 295 WorkerPool::WorkerPool(size_t num_threads, |
| 541 base::TimeDelta check_for_completed_tasks_delay, | 296 base::TimeDelta check_for_completed_tasks_delay, |
| 542 const std::string& thread_name_prefix) | 297 const std::string& thread_name_prefix) |
| 543 : client_(NULL), | 298 : client_(NULL), |
| 544 origin_loop_(base::MessageLoopProxy::current()), | 299 origin_loop_(base::MessageLoopProxy::current()), |
| 300 weak_ptr_factory_(this), |
| 545 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), | 301 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), |
| 546 check_for_completed_tasks_pending_(false), | 302 check_for_completed_tasks_pending_(false), |
| 547 inner_(make_scoped_ptr(new Inner(this, | 303 inner_(make_scoped_ptr(new Inner(this, |
| 548 num_threads, | 304 num_threads, |
| 549 thread_name_prefix))) { | 305 thread_name_prefix))) { |
| 550 } | 306 } |
| 551 | 307 |
| 552 WorkerPool::~WorkerPool() { | 308 WorkerPool::~WorkerPool() { |
| 309 // Cancel all pending callbacks. |
| 310 weak_ptr_factory_.InvalidateWeakPtrs(); |
| 311 |
| 312 DCHECK_EQ(0u, completed_tasks_.size()); |
| 553 } | 313 } |
| 554 | 314 |
| 555 void WorkerPool::Shutdown() { | 315 void WorkerPool::Shutdown() { |
| 556 inner_->Shutdown(); | 316 inner_->Shutdown(); |
| 557 | 317 inner_->CollectCompletedTasks(); |
| 558 TaskDeque completed_tasks; | 318 DispatchCompletionCallbacks(); |
| 559 inner_->CollectCompletedTasks(&completed_tasks); | |
| 560 DispatchCompletionCallbacks(&completed_tasks); | |
| 561 } | 319 } |
| 562 | 320 |
| 563 void WorkerPool::OnIdle(TaskDeque* completed_tasks) { | 321 void WorkerPool::PostTaskAndReply( |
| 322 const Callback& task, const base::Closure& reply) { |
| 323 PostTask(make_scoped_ptr(new WorkerPoolTaskImpl( |
| 324 task, |
| 325 reply)).PassAs<internal::WorkerPoolTask>()); |
| 326 } |
| 327 |
| 328 void WorkerPool::OnIdle() { |
| 564 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); | 329 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); |
| 565 | 330 |
| 566 DispatchCompletionCallbacks(completed_tasks); | 331 DispatchCompletionCallbacks(); |
| 567 | |
| 568 // Cancel any pending check for completed tasks. | |
| 569 check_for_completed_tasks_callback_.Cancel(); | |
| 570 check_for_completed_tasks_pending_ = false; | |
| 571 } | 332 } |
| 572 | 333 |
| 573 void WorkerPool::ScheduleCheckForCompletedTasks() { | 334 void WorkerPool::ScheduleCheckForCompletedTasks() { |
| 574 if (check_for_completed_tasks_pending_) | 335 if (check_for_completed_tasks_pending_) |
| 575 return; | 336 return; |
| 576 check_for_completed_tasks_callback_.Reset( | |
| 577 base::Bind(&WorkerPool::CheckForCompletedTasks, | |
| 578 base::Unretained(this))); | |
| 579 origin_loop_->PostDelayedTask( | 337 origin_loop_->PostDelayedTask( |
| 580 FROM_HERE, | 338 FROM_HERE, |
| 581 check_for_completed_tasks_callback_.callback(), | 339 base::Bind(&WorkerPool::CheckForCompletedTasks, |
| 340 weak_ptr_factory_.GetWeakPtr()), |
| 582 check_for_completed_tasks_delay_); | 341 check_for_completed_tasks_delay_); |
| 583 check_for_completed_tasks_pending_ = true; | 342 check_for_completed_tasks_pending_ = true; |
| 584 } | 343 } |
| 585 | 344 |
| 586 void WorkerPool::CheckForCompletedTasks() { | 345 void WorkerPool::CheckForCompletedTasks() { |
| 587 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); | 346 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); |
| 588 check_for_completed_tasks_callback_.Cancel(); | 347 DCHECK(check_for_completed_tasks_pending_); |
| 589 check_for_completed_tasks_pending_ = false; | 348 check_for_completed_tasks_pending_ = false; |
| 590 | 349 |
| 591 TaskDeque completed_tasks; | |
| 592 | |
| 593 // Schedule another check for completed tasks if not idle. | 350 // Schedule another check for completed tasks if not idle. |
| 594 if (!inner_->CollectCompletedTasks(&completed_tasks)) | 351 if (!inner_->CollectCompletedTasks()) |
| 595 ScheduleCheckForCompletedTasks(); | 352 ScheduleCheckForCompletedTasks(); |
| 596 | 353 |
| 597 DispatchCompletionCallbacks(&completed_tasks); | 354 DispatchCompletionCallbacks(); |
| 598 } | 355 } |
| 599 | 356 |
| 600 void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) { | 357 void WorkerPool::DispatchCompletionCallbacks() { |
| 601 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); | 358 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); |
| 602 | 359 |
| 603 // Early out when |completed_tasks| is empty to prevent unnecessary | 360 if (completed_tasks_.empty()) |
| 604 // call to DidFinishDispatchingWorkerPoolCompletionCallbacks(). | |
| 605 if (completed_tasks->empty()) | |
| 606 return; | 361 return; |
| 607 | 362 |
| 608 while (!completed_tasks->empty()) { | 363 while (completed_tasks_.size()) { |
| 609 scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front(); | 364 scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); |
| 610 completed_tasks->pop_front(); | |
| 611 task->DidComplete(); | 365 task->DidComplete(); |
| 612 task->DispatchCompletionCallback(); | |
| 613 } | 366 } |
| 614 | 367 |
| 615 DCHECK(client_); | 368 DCHECK(client_); |
| 616 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); | 369 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); |
| 617 } | 370 } |
| 618 | 371 |
| 619 void WorkerPool::ScheduleTasks(internal::WorkerPoolTask* root) { | 372 void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
| 620 TRACE_EVENT0("cc", "WorkerPool::ScheduleTasks"); | 373 // Schedule check for completed tasks if not pending. |
| 374 ScheduleCheckForCompletedTasks(); |
| 621 | 375 |
| 622 // Schedule check for completed tasks. | 376 inner_->PostTask(task.Pass()); |
| 623 if (root) | |
| 624 ScheduleCheckForCompletedTasks(); | |
| 625 | |
| 626 inner_->ScheduleTasks(root); | |
| 627 } | 377 } |
| 628 | 378 |
| 629 } // namespace cc | 379 } // namespace cc |
| OLD | NEW |