Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "cc/base/worker_pool.h" | 5 #include "cc/base/worker_pool.h" |
| 6 | 6 |
| 7 #if defined(OS_ANDROID) | 7 #if defined(OS_ANDROID) |
| 8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) | 8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) |
| 9 #include <sys/resource.h> | 9 #include <sys/resource.h> |
| 10 #endif | 10 #endif |
| 11 | 11 |
| 12 #include <algorithm> | 12 #include <map> |
| 13 | 13 |
| 14 #include "base/bind.h" | 14 #include "base/bind.h" |
| 15 #include "base/debug/trace_event.h" | 15 #include "base/debug/trace_event.h" |
| 16 #include "base/hash_tables.h" | |
| 16 #include "base/stringprintf.h" | 17 #include "base/stringprintf.h" |
| 17 #include "base/synchronization/condition_variable.h" | |
| 18 #include "base/threading/simple_thread.h" | 18 #include "base/threading/simple_thread.h" |
| 19 #include "base/threading/thread_restrictions.h" | 19 #include "base/threading/thread_restrictions.h" |
| 20 #include "cc/base/scoped_ptr_deque.h" | |
| 21 #include "cc/base/scoped_ptr_hash_map.h" | |
| 22 | |
| 23 #if defined(COMPILER_GCC) | |
|
vmpstr
2013/05/24 17:01:37
Does this part just work with clang?
reveman
2013/05/24 18:53:48
yes, you'll find the same throughout the chromium
| |
| 24 namespace BASE_HASH_NAMESPACE { | |
| 25 template <> struct hash<cc::internal::WorkerPoolTask*> { | |
| 26 size_t operator()(cc::internal::WorkerPoolTask* ptr) const { | |
| 27 return hash<size_t>()(reinterpret_cast<size_t>(ptr)); | |
| 28 } | |
| 29 }; | |
| 30 } // namespace BASE_HASH_NAMESPACE | |
| 31 #endif // COMPILER | |
| 20 | 32 |
| 21 namespace cc { | 33 namespace cc { |
| 22 | 34 |
| 23 namespace { | |
| 24 | |
| 25 class WorkerPoolTaskImpl : public internal::WorkerPoolTask { | |
| 26 public: | |
| 27 WorkerPoolTaskImpl(const WorkerPool::Callback& task, | |
| 28 const base::Closure& reply) | |
| 29 : internal::WorkerPoolTask(reply), | |
| 30 task_(task) {} | |
| 31 | |
| 32 virtual void RunOnThread(unsigned thread_index) OVERRIDE { | |
| 33 task_.Run(); | |
| 34 } | |
| 35 | |
| 36 private: | |
| 37 WorkerPool::Callback task_; | |
| 38 }; | |
| 39 | |
| 40 } // namespace | |
| 41 | |
| 42 namespace internal { | 35 namespace internal { |
| 43 | 36 |
| 44 WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) { | 37 WorkerPoolTask::WorkerPoolTask() |
| 38 : did_schedule_(false), | |
| 39 did_run_(false), | |
| 40 did_complete_(false) { | |
| 41 } | |
| 42 | |
| 43 WorkerPoolTask::WorkerPoolTask(TaskVector* dependencies) | |
| 44 : did_schedule_(false), | |
| 45 did_run_(false), | |
| 46 did_complete_(false) { | |
| 47 dependencies_.swap(*dependencies); | |
| 45 } | 48 } |
| 46 | 49 |
| 47 WorkerPoolTask::~WorkerPoolTask() { | 50 WorkerPoolTask::~WorkerPoolTask() { |
| 51 DCHECK_EQ(did_schedule_, did_complete_); | |
| 52 DCHECK(!did_run_ || did_schedule_); | |
| 53 DCHECK(!did_run_ || did_complete_); | |
| 54 } | |
| 55 | |
| 56 void WorkerPoolTask::DidSchedule() { | |
| 57 DCHECK(!did_complete_); | |
| 58 did_schedule_ = true; | |
| 59 } | |
| 60 | |
| 61 void WorkerPoolTask::WillRun() { | |
| 62 DCHECK(did_schedule_); | |
| 63 DCHECK(!did_complete_); | |
| 64 DCHECK(!did_run_); | |
| 65 } | |
| 66 | |
| 67 void WorkerPoolTask::DidRun() { | |
| 68 did_run_ = true; | |
| 48 } | 69 } |
| 49 | 70 |
| 50 void WorkerPoolTask::DidComplete() { | 71 void WorkerPoolTask::DidComplete() { |
| 51 reply_.Run(); | 72 DCHECK(did_schedule_); |
| 73 DCHECK(!did_complete_); | |
| 74 did_complete_ = true; | |
| 75 } | |
| 76 | |
| 77 bool WorkerPoolTask::IsReadyToRun() const { | |
| 78 // TODO(reveman): Use counter to improve performance. | |
| 79 for (TaskVector::const_reverse_iterator it = dependencies_.rbegin(); | |
| 80 it != dependencies_.rend(); ++it) { | |
| 81 WorkerPoolTask* dependency = *it; | |
| 82 if (!dependency->HasFinishedRunning()) | |
| 83 return false; | |
| 84 } | |
| 85 return true; | |
| 86 } | |
| 87 | |
| 88 bool WorkerPoolTask::HasFinishedRunning() const { | |
| 89 return did_run_; | |
| 90 } | |
| 91 | |
| 92 bool WorkerPoolTask::HasCompleted() const { | |
| 93 return did_complete_; | |
| 52 } | 94 } |
| 53 | 95 |
| 54 } // namespace internal | 96 } // namespace internal |
| 55 | 97 |
| 56 // Internal to the worker pool. Any data or logic that needs to be | 98 // Internal to the worker pool. Any data or logic that needs to be |
| 57 // shared between threads lives in this class. All members are guarded | 99 // shared between threads lives in this class. All members are guarded |
| 58 // by |lock_|. | 100 // by |lock_|. |
| 59 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { | 101 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
| 60 public: | 102 public: |
| 61 Inner(WorkerPool* worker_pool, | 103 Inner(WorkerPool* worker_pool, |
| 62 size_t num_threads, | 104 size_t num_threads, |
| 63 const std::string& thread_name_prefix); | 105 const std::string& thread_name_prefix); |
| 64 virtual ~Inner(); | 106 virtual ~Inner(); |
| 65 | 107 |
| 66 void Shutdown(); | 108 void Shutdown(); |
| 67 | 109 |
| 68 void PostTask(scoped_ptr<internal::WorkerPoolTask> task); | 110 // Schedule running of |root| task and all its dependencies. Tasks |
| 111 // previously scheduled but no longer needed to run |root| will be | |
| 112 // canceled unless already running. Canceled tasks are moved to | |
| 113 // |completed_tasks_| without being run. The result is that once | |
| 114 // scheduled, a task is guaranteed to end up in the |completed_tasks_| | |
| 115 // queue even if they later get canceled by another call to | |
| 116 // ScheduleTasks(). | |
| 117 void ScheduleTasks(internal::WorkerPoolTask* root); | |
| 69 | 118 |
| 70 // Appends all completed tasks to worker pool's completed tasks queue | 119 // Collect all completed tasks in |completed_tasks|. Returns true if idle. |
| 71 // and returns true if idle. | 120 bool CollectCompletedTasks(TaskDeque* completed_tasks); |
| 72 bool CollectCompletedTasks(); | |
| 73 | 121 |
| 74 private: | 122 private: |
| 75 // Appends all completed tasks to |completed_tasks|. Lock must | 123 class ScheduledTask { |
| 76 // already be acquired before calling this function. | 124 public: |
| 77 bool AppendCompletedTasksWithLockAcquired( | 125 ScheduledTask(internal::WorkerPoolTask* dependent, unsigned priority) |
| 78 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); | 126 : priority_(priority) { |
| 127 if (dependent) | |
| 128 dependents_.push_back(dependent); | |
| 129 } | |
| 130 | |
| 131 internal::WorkerPoolTask::TaskVector& dependents() { return dependents_; } | |
| 132 unsigned priority() const { return priority_; } | |
| 133 | |
| 134 private: | |
| 135 internal::WorkerPoolTask::TaskVector dependents_; | |
| 136 unsigned priority_; | |
| 137 }; | |
| 138 typedef internal::WorkerPoolTask* ScheduledTaskMapKey; | |
| 139 typedef ScopedPtrHashMap<ScheduledTaskMapKey, ScheduledTask> | |
| 140 ScheduledTaskMap; | |
| 141 | |
| 142 // This builds a ScheduledTaskMap from a root task. | |
| 143 static unsigned BuildScheduledTaskMapRecursive( | |
| 144 internal::WorkerPoolTask* task, | |
| 145 internal::WorkerPoolTask* dependent, | |
| 146 unsigned priority, | |
| 147 ScheduledTaskMap* scheduled_tasks); | |
| 148 static void BuildScheduledTaskMap( | |
| 149 internal::WorkerPoolTask* root, ScheduledTaskMap* scheduled_tasks); | |
| 150 | |
| 151 // Collect all completed tasks by swapping the contents of | |
| 152 // |completed_tasks| and |completed_tasks_|. Lock must be acquired | |
| 153 // before calling this function. Returns true if idle. | |
| 154 bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks); | |
| 79 | 155 |
| 80 // Schedule an OnIdleOnOriginThread callback if not already pending. | 156 // Schedule an OnIdleOnOriginThread callback if not already pending. |
| 81 // Lock must already be acquired before calling this function. | 157 // Lock must already be acquired before calling this function. |
| 82 void ScheduleOnIdleWithLockAcquired(); | 158 void ScheduleOnIdleWithLockAcquired(); |
| 83 void OnIdleOnOriginThread(); | 159 void OnIdleOnOriginThread(); |
| 84 | 160 |
| 85 // Overridden from base::DelegateSimpleThread: | 161 // Overridden from base::DelegateSimpleThread: |
| 86 virtual void Run() OVERRIDE; | 162 virtual void Run() OVERRIDE; |
| 87 | 163 |
| 88 // Pointer to worker pool. Can only be used on origin thread. | 164 // Pointer to worker pool. Can only be used on origin thread. |
| 89 // Not guarded by |lock_|. | 165 // Not guarded by |lock_|. |
| 90 WorkerPool* worker_pool_on_origin_thread_; | 166 WorkerPool* worker_pool_on_origin_thread_; |
| 91 | 167 |
| 92 // This lock protects all members of this class except | 168 // This lock protects all members of this class except |
| 93 // |worker_pool_on_origin_thread_|. Do not read or modify anything | 169 // |worker_pool_on_origin_thread_|. Do not read or modify anything |
| 94 // without holding this lock. Do not block while holding this lock. | 170 // without holding this lock. Do not block while holding this lock. |
| 95 mutable base::Lock lock_; | 171 mutable base::Lock lock_; |
| 96 | 172 |
| 97 // Condition variable that is waited on by worker threads until new | 173 // Condition variable that is waited on by worker threads until new |
| 98 // tasks are posted or shutdown starts. | 174 // tasks are ready to run or shutdown starts. |
| 99 base::ConditionVariable has_pending_tasks_cv_; | 175 base::ConditionVariable has_ready_to_run_tasks_cv_; |
| 100 | 176 |
| 101 // Target message loop used for posting callbacks. | 177 // Target message loop used for posting callbacks. |
| 102 scoped_refptr<base::MessageLoopProxy> origin_loop_; | 178 scoped_refptr<base::MessageLoopProxy> origin_loop_; |
| 103 | 179 |
| 104 base::WeakPtrFactory<Inner> weak_ptr_factory_; | 180 base::WeakPtrFactory<Inner> weak_ptr_factory_; |
| 105 | 181 |
| 106 const base::Closure on_idle_callback_; | 182 const base::Closure on_idle_callback_; |
| 107 // Set when a OnIdleOnOriginThread() callback is pending. | 183 // Set when a OnIdleOnOriginThread() callback is pending. |
| 108 bool on_idle_pending_; | 184 bool on_idle_pending_; |
| 109 | 185 |
| 110 // Provides each running thread loop with a unique index. First thread | 186 // Provides each running thread loop with a unique index. First thread |
| 111 // loop index is 0. | 187 // loop index is 0. |
| 112 unsigned next_thread_index_; | 188 unsigned next_thread_index_; |
| 113 | 189 |
| 114 // Number of tasks currently running. | |
| 115 unsigned running_task_count_; | |
| 116 | |
| 117 // Set during shutdown. Tells workers to exit when no more tasks | 190 // Set during shutdown. Tells workers to exit when no more tasks |
| 118 // are pending. | 191 // are pending. |
| 119 bool shutdown_; | 192 bool shutdown_; |
| 120 | 193 |
| 121 typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; | 194 // The root task that is a dependent of all other tasks. |
| 122 TaskDeque pending_tasks_; | 195 scoped_refptr<internal::WorkerPoolTask> root_; |
| 196 | |
| 197 // This set contains all pending tasks. | |
| 198 ScheduledTaskMap pending_tasks_; | |
| 199 | |
| 200 // Ordered set of tasks that are ready to run. | |
| 201 // TODO(reveman): priority_queue might be more efficient. | |
| 202 typedef std::map<unsigned, internal::WorkerPoolTask*> TaskMap; | |
| 203 TaskMap ready_to_run_tasks_; | |
| 204 | |
| 205 // This set contains all currently running tasks. | |
| 206 ScheduledTaskMap running_tasks_; | |
| 207 | |
| 208 // Completed tasks not yet collected by origin thread. | |
| 123 TaskDeque completed_tasks_; | 209 TaskDeque completed_tasks_; |
| 124 | 210 |
| 125 ScopedPtrDeque<base::DelegateSimpleThread> workers_; | 211 ScopedPtrDeque<base::DelegateSimpleThread> workers_; |
| 126 | 212 |
| 127 DISALLOW_COPY_AND_ASSIGN(Inner); | 213 DISALLOW_COPY_AND_ASSIGN(Inner); |
| 128 }; | 214 }; |
| 129 | 215 |
| 130 WorkerPool::Inner::Inner(WorkerPool* worker_pool, | 216 WorkerPool::Inner::Inner(WorkerPool* worker_pool, |
| 131 size_t num_threads, | 217 size_t num_threads, |
| 132 const std::string& thread_name_prefix) | 218 const std::string& thread_name_prefix) |
| 133 : worker_pool_on_origin_thread_(worker_pool), | 219 : worker_pool_on_origin_thread_(worker_pool), |
| 134 lock_(), | 220 lock_(), |
| 135 has_pending_tasks_cv_(&lock_), | 221 has_ready_to_run_tasks_cv_(&lock_), |
| 136 origin_loop_(base::MessageLoopProxy::current()), | 222 origin_loop_(base::MessageLoopProxy::current()), |
| 137 weak_ptr_factory_(this), | 223 weak_ptr_factory_(this), |
| 138 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, | 224 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, |
| 139 weak_ptr_factory_.GetWeakPtr())), | 225 weak_ptr_factory_.GetWeakPtr())), |
| 140 on_idle_pending_(false), | 226 on_idle_pending_(false), |
| 141 next_thread_index_(0), | 227 next_thread_index_(0), |
| 142 running_task_count_(0), | |
| 143 shutdown_(false) { | 228 shutdown_(false) { |
| 144 base::AutoLock lock(lock_); | 229 base::AutoLock lock(lock_); |
| 145 | 230 |
| 146 while (workers_.size() < num_threads) { | 231 while (workers_.size() < num_threads) { |
| 147 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( | 232 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( |
| 148 new base::DelegateSimpleThread( | 233 new base::DelegateSimpleThread( |
| 149 this, | 234 this, |
| 150 thread_name_prefix + | 235 thread_name_prefix + |
| 151 base::StringPrintf( | 236 base::StringPrintf( |
| 152 "Worker%u", | 237 "Worker%u", |
| 153 static_cast<unsigned>(workers_.size() + 1)).c_str())); | 238 static_cast<unsigned>(workers_.size() + 1)).c_str())); |
| 154 worker->Start(); | 239 worker->Start(); |
| 155 workers_.push_back(worker.Pass()); | 240 workers_.push_back(worker.Pass()); |
| 156 } | 241 } |
| 157 } | 242 } |
| 158 | 243 |
| 159 WorkerPool::Inner::~Inner() { | 244 WorkerPool::Inner::~Inner() { |
| 160 base::AutoLock lock(lock_); | 245 base::AutoLock lock(lock_); |
| 161 | 246 |
| 162 DCHECK(shutdown_); | 247 DCHECK(shutdown_); |
| 163 | 248 |
| 164 // Cancel all pending callbacks. | |
| 165 weak_ptr_factory_.InvalidateWeakPtrs(); | |
| 166 | |
| 167 DCHECK_EQ(0u, pending_tasks_.size()); | 249 DCHECK_EQ(0u, pending_tasks_.size()); |
| 250 DCHECK_EQ(0u, ready_to_run_tasks_.size()); | |
| 251 DCHECK_EQ(0u, running_tasks_.size()); | |
| 168 DCHECK_EQ(0u, completed_tasks_.size()); | 252 DCHECK_EQ(0u, completed_tasks_.size()); |
| 169 DCHECK_EQ(0u, running_task_count_); | |
| 170 } | 253 } |
| 171 | 254 |
| 172 void WorkerPool::Inner::Shutdown() { | 255 void WorkerPool::Inner::Shutdown() { |
| 173 { | 256 { |
| 174 base::AutoLock lock(lock_); | 257 base::AutoLock lock(lock_); |
| 175 | 258 |
| 176 DCHECK(!shutdown_); | 259 DCHECK(!shutdown_); |
| 177 shutdown_ = true; | 260 shutdown_ = true; |
| 178 | 261 |
| 179 // Wake up a worker so it knows it should exit. This will cause all workers | 262 // Wake up a worker so it knows it should exit. This will cause all workers |
| 180 // to exit as each will wake up another worker before exiting. | 263 // to exit as each will wake up another worker before exiting. |
| 181 has_pending_tasks_cv_.Signal(); | 264 has_ready_to_run_tasks_cv_.Signal(); |
| 182 } | 265 } |
| 183 | 266 |
| 184 while (workers_.size()) { | 267 while (workers_.size()) { |
| 185 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); | 268 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); |
| 186 // http://crbug.com/240453 - Join() is considered IO and will block this | 269 // http://crbug.com/240453 - Join() is considered IO and will block this |
| 187 // thread. See also http://crbug.com/239423 for further ideas. | 270 // thread. See also http://crbug.com/239423 for further ideas. |
| 188 base::ThreadRestrictions::ScopedAllowIO allow_io; | 271 base::ThreadRestrictions::ScopedAllowIO allow_io; |
| 189 worker->Join(); | 272 worker->Join(); |
| 190 } | 273 } |
| 274 | |
| 275 // Cancel any pending OnIdle callback. | |
| 276 weak_ptr_factory_.InvalidateWeakPtrs(); | |
| 191 } | 277 } |
| 192 | 278 |
| 193 void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { | 279 void WorkerPool::Inner::ScheduleTasks(internal::WorkerPoolTask* root) { |
| 280 // It is OK to call ScheduleTasks() after shutdown if |root| is NULL. | |
| 281 DCHECK(!root || !shutdown_); | |
| 282 | |
| 283 ScheduledTaskMap new_pending_tasks; | |
| 284 if (root) | |
| 285 BuildScheduledTaskMap(root, &new_pending_tasks); | |
| 286 | |
| 287 scoped_refptr<internal::WorkerPoolTask> new_root(root); | |
| 288 | |
| 289 { | |
| 290 base::AutoLock lock(lock_); | |
| 291 | |
| 292 // First remove all completed tasks from |new_pending_tasks|. | |
| 293 for (TaskDeque::iterator it = completed_tasks_.begin(); | |
| 294 it != completed_tasks_.end(); ++it) { | |
| 295 internal::WorkerPoolTask* task = *it; | |
| 296 new_pending_tasks.take_and_erase(task); | |
| 297 } | |
| 298 | |
| 299 // Move tasks not present in |new_pending_tasks| to |completed_tasks_|. | |
| 300 for (ScheduledTaskMap::iterator it = pending_tasks_.begin(); | |
| 301 it != pending_tasks_.end(); ++it) { | |
| 302 internal::WorkerPoolTask* task = it->first; | |
| 303 | |
| 304 // Task has completed if not present in |new_pending_tasks|. | |
| 305 if (!new_pending_tasks.contains(task)) | |
| 306 completed_tasks_.push_back(task); | |
| 307 } | |
| 308 | |
| 309 // Build new running task set. | |
| 310 ScheduledTaskMap new_running_tasks; | |
| 311 for (ScheduledTaskMap::iterator it = running_tasks_.begin(); | |
| 312 it != running_tasks_.end(); ++it) { | |
| 313 internal::WorkerPoolTask* task = it->first; | |
| 314 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task)); | |
|
vmpstr
2013/05/24 17:01:37
Are currently running tasks guaranteed to be in th
reveman
2013/05/24 18:53:48
No, but we need to set the ScheduledTask value to
| |
| 315 } | |
| 316 | |
| 317 // Swap root and task sets. | |
| 318 // Note: old tasks are intentionally destroyed after releasing |lock_|. | |
| 319 root_.swap(new_root); | |
| 320 pending_tasks_.swap(new_pending_tasks); | |
| 321 running_tasks_.swap(new_running_tasks); | |
| 322 | |
| 323 // Rebuild |ready_to_run_tasks_| before letting. | |
| 324 ready_to_run_tasks_.clear(); | |
| 325 for (ScheduledTaskMap::iterator it = pending_tasks_.begin(); | |
| 326 it != pending_tasks_.end(); ++it) { | |
| 327 internal::WorkerPoolTask* task = it->first; | |
| 328 | |
| 329 // Completed tasks should not exist in |pending_tasks_|. | |
| 330 DCHECK(!task->HasFinishedRunning()); | |
| 331 | |
| 332 // Call DidSchedule() to indicate that this task has been scheduled. | |
| 333 // Note: This is only for debugging purposes. | |
| 334 task->DidSchedule(); | |
| 335 | |
| 336 DCHECK_EQ(0u, ready_to_run_tasks_.count(it->second->priority())); | |
| 337 if (task->IsReadyToRun()) | |
| 338 ready_to_run_tasks_[it->second->priority()] = task; | |
| 339 } | |
| 340 | |
| 341 // If there is more work available, wake up worker thread. | |
| 342 if (!ready_to_run_tasks_.empty()) | |
| 343 has_ready_to_run_tasks_cv_.Signal(); | |
| 344 } | |
| 345 } | |
| 346 | |
| 347 bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) { | |
| 194 base::AutoLock lock(lock_); | 348 base::AutoLock lock(lock_); |
| 195 | 349 |
| 196 pending_tasks_.push_back(task.Pass()); | 350 return CollectCompletedTasksWithLockAcquired(completed_tasks); |
| 197 | |
| 198 // There is more work available, so wake up worker thread. | |
| 199 has_pending_tasks_cv_.Signal(); | |
| 200 } | 351 } |
| 201 | 352 |
| 202 bool WorkerPool::Inner::CollectCompletedTasks() { | 353 bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired( |
| 203 base::AutoLock lock(lock_); | 354 TaskDeque* completed_tasks) { |
| 204 | |
| 205 return AppendCompletedTasksWithLockAcquired( | |
| 206 &worker_pool_on_origin_thread_->completed_tasks_); | |
| 207 } | |
| 208 | |
| 209 bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired( | |
| 210 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) { | |
| 211 lock_.AssertAcquired(); | 355 lock_.AssertAcquired(); |
| 212 | 356 |
| 213 while (completed_tasks_.size()) | 357 DCHECK_EQ(0u, completed_tasks->size()); |
| 214 completed_tasks->push_back(completed_tasks_.take_front().Pass()); | 358 completed_tasks->swap(completed_tasks_); |
| 215 | 359 |
| 216 return !running_task_count_ && pending_tasks_.empty(); | 360 return running_tasks_.empty() && pending_tasks_.empty(); |
| 217 } | 361 } |
| 218 | 362 |
| 219 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { | 363 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
| 220 lock_.AssertAcquired(); | 364 lock_.AssertAcquired(); |
| 221 | 365 |
| 222 if (on_idle_pending_) | 366 if (on_idle_pending_) |
| 223 return; | 367 return; |
| 224 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); | 368 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); |
| 225 on_idle_pending_ = true; | 369 on_idle_pending_ = true; |
| 226 } | 370 } |
| 227 | 371 |
| 228 void WorkerPool::Inner::OnIdleOnOriginThread() { | 372 void WorkerPool::Inner::OnIdleOnOriginThread() { |
| 373 TaskDeque completed_tasks; | |
| 374 | |
| 229 { | 375 { |
| 230 base::AutoLock lock(lock_); | 376 base::AutoLock lock(lock_); |
| 231 | 377 |
| 232 DCHECK(on_idle_pending_); | 378 DCHECK(on_idle_pending_); |
| 233 on_idle_pending_ = false; | 379 on_idle_pending_ = false; |
| 234 | 380 |
| 235 // Early out if no longer idle. | 381 // Early out if no longer idle. |
| 236 if (running_task_count_ || !pending_tasks_.empty()) | 382 if (!running_tasks_.empty() || !pending_tasks_.empty()) |
| 237 return; | 383 return; |
| 238 | 384 |
| 239 AppendCompletedTasksWithLockAcquired( | 385 CollectCompletedTasksWithLockAcquired(&completed_tasks); |
| 240 &worker_pool_on_origin_thread_->completed_tasks_); | |
| 241 } | 386 } |
| 242 | 387 |
| 243 worker_pool_on_origin_thread_->OnIdle(); | 388 worker_pool_on_origin_thread_->OnIdle(&completed_tasks); |
| 244 } | 389 } |
| 245 | 390 |
| 246 void WorkerPool::Inner::Run() { | 391 void WorkerPool::Inner::Run() { |
| 247 #if defined(OS_ANDROID) | 392 #if defined(OS_ANDROID) |
| 248 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) | 393 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) |
| 249 int nice_value = 10; // Idle priority. | 394 int nice_value = 10; // Idle priority. |
| 250 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value); | 395 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value); |
| 251 #endif | 396 #endif |
| 252 | 397 |
| 253 base::AutoLock lock(lock_); | 398 base::AutoLock lock(lock_); |
| 254 | 399 |
| 255 // Get a unique thread index. | 400 // Get a unique thread index. |
| 256 int thread_index = next_thread_index_++; | 401 int thread_index = next_thread_index_++; |
| 257 | 402 |
| 258 while (true) { | 403 while (true) { |
| 259 if (pending_tasks_.empty()) { | 404 if (ready_to_run_tasks_.empty()) { |
| 260 // Exit when shutdown is set and no more tasks are pending. | 405 if (pending_tasks_.empty()) { |
| 261 if (shutdown_) | 406 // Exit when shutdown is set and no more tasks are pending. |
| 262 break; | 407 if (shutdown_) |
| 408 break; | |
| 263 | 409 |
| 264 // Schedule an idle callback if requested and not pending. | 410 // Schedule an idle callback if no tasks are running. |
| 265 if (!running_task_count_) | 411 if (running_tasks_.empty()) |
| 266 ScheduleOnIdleWithLockAcquired(); | 412 ScheduleOnIdleWithLockAcquired(); |
| 413 } | |
| 267 | 414 |
| 268 // Wait for new pending tasks. | 415 // Wait for more tasks. |
| 269 has_pending_tasks_cv_.Wait(); | 416 has_ready_to_run_tasks_cv_.Wait(); |
| 270 continue; | 417 continue; |
| 271 } | 418 } |
| 272 | 419 |
| 273 // Get next task. | 420 // Take top priority task from |ready_to_run_tasks_|. |
| 274 scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); | 421 scoped_refptr<internal::WorkerPoolTask> task( |
| 422 ready_to_run_tasks_.begin()->second); | |
| 423 ready_to_run_tasks_.erase(ready_to_run_tasks_.begin()); | |
| 275 | 424 |
| 276 // Increment |running_task_count_| before starting to run task. | 425 // Move task from |pending_tasks_| to |running_tasks_|. |
| 277 running_task_count_++; | 426 DCHECK(pending_tasks_.contains(task)); |
| 427 DCHECK(!running_tasks_.contains(task)); | |
| 428 running_tasks_.set(task, pending_tasks_.take_and_erase(task)); | |
| 278 | 429 |
| 279 // There may be more work available, so wake up another | 430 // There may be more work available, so wake up another worker thread. |
| 280 // worker thread. | 431 has_ready_to_run_tasks_cv_.Signal(); |
| 281 has_pending_tasks_cv_.Signal(); | 432 |
| 433 // Call WillRun() before releasing |lock_| and running task. | |
| 434 task->WillRun(); | |
| 282 | 435 |
| 283 { | 436 { |
| 284 base::AutoUnlock unlock(lock_); | 437 base::AutoUnlock unlock(lock_); |
| 285 | 438 |
| 286 task->RunOnThread(thread_index); | 439 task->RunOnThread(thread_index); |
| 287 } | 440 } |
| 288 | 441 |
| 289 completed_tasks_.push_back(task.Pass()); | 442 // This will mark task as finished running. |
| 443 task->DidRun(); | |
| 290 | 444 |
| 291 // Decrement |running_task_count_| now that we are done running task. | 445 // Now iterate over all dependents to check if they are ready to run. |
| 292 running_task_count_--; | 446 scoped_ptr<ScheduledTask> scheduled_task = running_tasks_.take_and_erase( |
| 447 task); | |
| 448 if (scheduled_task) { | |
| 449 typedef internal::WorkerPoolTask::TaskVector TaskVector; | |
| 450 for (TaskVector::iterator it = scheduled_task->dependents().begin(); | |
| 451 it != scheduled_task->dependents().end(); ++it) { | |
| 452 internal::WorkerPoolTask* dependent = *it; | |
| 453 if (!dependent->IsReadyToRun()) | |
| 454 continue; | |
| 455 | |
| 456 // Task is ready. Add it to |ready_to_run_tasks_|. | |
| 457 DCHECK(pending_tasks_.contains(dependent)); | |
| 458 unsigned priority = pending_tasks_.get(dependent)->priority(); | |
| 459 DCHECK(!ready_to_run_tasks_.count(priority) || | |
| 460 ready_to_run_tasks_[priority] == dependent); | |
| 461 ready_to_run_tasks_[priority] = dependent; | |
| 462 } | |
| 463 } | |
| 464 | |
| 465 // Finally add task to |completed_tasks_|. | |
| 466 completed_tasks_.push_back(task); | |
| 293 } | 467 } |
| 294 | 468 |
| 295 // We noticed we should exit. Wake up the next worker so it knows it should | 469 // We noticed we should exit. Wake up the next worker so it knows it should |
| 296 // exit as well (because the Shutdown() code only signals once). | 470 // exit as well (because the Shutdown() code only signals once). |
| 297 has_pending_tasks_cv_.Signal(); | 471 has_ready_to_run_tasks_cv_.Signal(); |
| 472 } | |
| 473 | |
| 474 // static | |
| 475 unsigned WorkerPool::Inner::BuildScheduledTaskMapRecursive( | |
|
vmpstr
2013/05/24 17:01:37
Can you add a few comments in this function to jus
reveman
2013/05/24 18:53:48
Added some comments and an example graph to latest
| |
| 476 internal::WorkerPoolTask* task, | |
| 477 internal::WorkerPoolTask* dependent, | |
| 478 unsigned priority, | |
| 479 ScheduledTaskMap* scheduled_tasks) { | |
| 480 if (task->HasCompleted()) | |
| 481 return priority; | |
| 482 | |
| 483 ScheduledTaskMap::iterator scheduled_it = scheduled_tasks->find(task); | |
| 484 if (scheduled_it != scheduled_tasks->end()) { | |
| 485 DCHECK(dependent); | |
| 486 scheduled_it->second->dependents().push_back(dependent); | |
| 487 return priority; | |
| 488 } | |
| 489 | |
| 490 typedef internal::WorkerPoolTask::TaskVector TaskVector; | |
| 491 for (TaskVector::iterator it = task->dependencies().begin(); | |
| 492 it != task->dependencies().end(); ++it) { | |
| 493 internal::WorkerPoolTask* dependency = *it; | |
| 494 priority = BuildScheduledTaskMapRecursive( | |
| 495 dependency, task, priority, scheduled_tasks); | |
| 496 } | |
| 497 | |
| 498 scheduled_tasks->set(task, | |
| 499 make_scoped_ptr(new ScheduledTask(dependent, | |
| 500 priority))); | |
| 501 | |
| 502 return priority + 1; | |
| 503 } | |
| 504 | |
| 505 // static | |
| 506 void WorkerPool::Inner::BuildScheduledTaskMap( | |
| 507 internal::WorkerPoolTask* root, | |
| 508 ScheduledTaskMap* scheduled_tasks) { | |
| 509 const unsigned kBasePriority = 0u; | |
| 510 DCHECK(root); | |
| 511 BuildScheduledTaskMapRecursive(root, NULL, kBasePriority, scheduled_tasks); | |
| 298 } | 512 } |
| 299 | 513 |
| 300 WorkerPool::WorkerPool(size_t num_threads, | 514 WorkerPool::WorkerPool(size_t num_threads, |
| 301 base::TimeDelta check_for_completed_tasks_delay, | 515 base::TimeDelta check_for_completed_tasks_delay, |
| 302 const std::string& thread_name_prefix) | 516 const std::string& thread_name_prefix) |
| 303 : client_(NULL), | 517 : client_(NULL), |
| 304 origin_loop_(base::MessageLoopProxy::current()), | 518 origin_loop_(base::MessageLoopProxy::current()), |
| 305 weak_ptr_factory_(this), | |
| 306 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), | 519 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), |
| 307 check_for_completed_tasks_pending_(false), | 520 check_for_completed_tasks_pending_(false), |
| 308 inner_(make_scoped_ptr(new Inner(this, | 521 inner_(make_scoped_ptr(new Inner(this, |
| 309 num_threads, | 522 num_threads, |
| 310 thread_name_prefix))) { | 523 thread_name_prefix))) { |
| 311 } | 524 } |
| 312 | 525 |
| 313 WorkerPool::~WorkerPool() { | 526 WorkerPool::~WorkerPool() { |
| 314 // Cancel all pending callbacks. | |
| 315 weak_ptr_factory_.InvalidateWeakPtrs(); | |
| 316 | |
| 317 DCHECK_EQ(0u, completed_tasks_.size()); | |
| 318 } | 527 } |
| 319 | 528 |
| 320 void WorkerPool::Shutdown() { | 529 void WorkerPool::Shutdown() { |
| 321 inner_->Shutdown(); | 530 inner_->Shutdown(); |
| 322 inner_->CollectCompletedTasks(); | 531 |
| 323 DispatchCompletionCallbacks(); | 532 TaskDeque completed_tasks; |
| 533 inner_->CollectCompletedTasks(&completed_tasks); | |
| 534 DispatchCompletionCallbacks(&completed_tasks); | |
| 324 } | 535 } |
| 325 | 536 |
| 326 void WorkerPool::PostTaskAndReply( | 537 void WorkerPool::OnIdle(TaskDeque* completed_tasks) { |
| 327 const Callback& task, const base::Closure& reply) { | |
| 328 PostTask(make_scoped_ptr(new WorkerPoolTaskImpl( | |
| 329 task, | |
| 330 reply)).PassAs<internal::WorkerPoolTask>()); | |
| 331 } | |
| 332 | |
| 333 void WorkerPool::OnIdle() { | |
| 334 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); | 538 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); |
| 335 | 539 |
| 336 DispatchCompletionCallbacks(); | 540 DispatchCompletionCallbacks(completed_tasks); |
| 541 | |
| 542 // Cancel any pending check for completed tasks. | |
| 543 check_for_completed_tasks_callback_.Cancel(); | |
| 544 check_for_completed_tasks_pending_ = false; | |
| 337 } | 545 } |
| 338 | 546 |
| 339 void WorkerPool::ScheduleCheckForCompletedTasks() { | 547 void WorkerPool::ScheduleCheckForCompletedTasks() { |
| 340 if (check_for_completed_tasks_pending_) | 548 if (check_for_completed_tasks_pending_) |
| 341 return; | 549 return; |
| 550 check_for_completed_tasks_callback_.Reset( | |
| 551 base::Bind(&WorkerPool::CheckForCompletedTasks, | |
| 552 base::Unretained(this))); | |
| 342 origin_loop_->PostDelayedTask( | 553 origin_loop_->PostDelayedTask( |
| 343 FROM_HERE, | 554 FROM_HERE, |
| 344 base::Bind(&WorkerPool::CheckForCompletedTasks, | 555 check_for_completed_tasks_callback_.callback(), |
| 345 weak_ptr_factory_.GetWeakPtr()), | |
| 346 check_for_completed_tasks_delay_); | 556 check_for_completed_tasks_delay_); |
| 347 check_for_completed_tasks_pending_ = true; | 557 check_for_completed_tasks_pending_ = true; |
| 348 } | 558 } |
| 349 | 559 |
| 350 void WorkerPool::CheckForCompletedTasks() { | 560 void WorkerPool::CheckForCompletedTasks() { |
| 351 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); | 561 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); |
| 352 DCHECK(check_for_completed_tasks_pending_); | 562 check_for_completed_tasks_callback_.Cancel(); |
| 353 check_for_completed_tasks_pending_ = false; | 563 check_for_completed_tasks_pending_ = false; |
| 354 | 564 |
| 565 TaskDeque completed_tasks; | |
| 566 | |
| 355 // Schedule another check for completed tasks if not idle. | 567 // Schedule another check for completed tasks if not idle. |
| 356 if (!inner_->CollectCompletedTasks()) | 568 if (!inner_->CollectCompletedTasks(&completed_tasks)) |
| 357 ScheduleCheckForCompletedTasks(); | 569 ScheduleCheckForCompletedTasks(); |
| 358 | 570 |
| 359 DispatchCompletionCallbacks(); | 571 DispatchCompletionCallbacks(&completed_tasks); |
| 360 } | 572 } |
| 361 | 573 |
| 362 void WorkerPool::DispatchCompletionCallbacks() { | 574 void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) { |
| 363 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); | 575 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); |
| 364 | 576 |
| 365 if (completed_tasks_.empty()) | 577 // Early out when |completed_tasks| is empty to prevent unnecessary |
| 578 // call to DidFinishDispatchingWorkerPoolCompletionCallbacks(). | |
| 579 if (completed_tasks->empty()) | |
| 366 return; | 580 return; |
| 367 | 581 |
| 368 while (completed_tasks_.size()) { | 582 while (!completed_tasks->empty()) { |
| 369 scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); | 583 scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front(); |
| 584 completed_tasks->pop_front(); | |
| 370 task->DidComplete(); | 585 task->DidComplete(); |
| 586 task->DispatchCompletionCallback(); | |
| 371 } | 587 } |
| 372 | 588 |
| 373 DCHECK(client_); | 589 DCHECK(client_); |
| 374 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); | 590 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); |
| 375 } | 591 } |
| 376 | 592 |
| 377 void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { | 593 void WorkerPool::ScheduleTasks(internal::WorkerPoolTask* root) { |
| 378 // Schedule check for completed tasks if not pending. | 594 TRACE_EVENT0("cc", "WorkerPool::ScheduleTasks"); |
| 379 ScheduleCheckForCompletedTasks(); | |
| 380 | 595 |
| 381 inner_->PostTask(task.Pass()); | 596 // Schedule check for completed tasks. |
| 597 if (root) | |
| 598 ScheduleCheckForCompletedTasks(); | |
| 599 | |
| 600 inner_->ScheduleTasks(root); | |
| 382 } | 601 } |
| 383 | 602 |
| 384 } // namespace cc | 603 } // namespace cc |
| OLD | NEW |