Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "cc/base/worker_pool.h" | 5 #include "cc/base/worker_pool.h" |
| 6 | 6 |
| 7 #if defined(OS_ANDROID) | 7 #if defined(OS_ANDROID) |
| 8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) | 8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) |
| 9 #include <sys/resource.h> | 9 #include <sys/resource.h> |
| 10 #endif | 10 #endif |
| 11 | 11 |
| 12 #include <algorithm> | 12 #include <set> |
| 13 | 13 |
| 14 #include "base/bind.h" | 14 #include "base/bind.h" |
| 15 #include "base/debug/trace_event.h" | 15 #include "base/debug/trace_event.h" |
| 16 #include "base/stringprintf.h" | 16 #include "base/stringprintf.h" |
| 17 #include "base/synchronization/condition_variable.h" | 17 #include "base/synchronization/condition_variable.h" |
| 18 #include "base/threading/simple_thread.h" | 18 #include "base/threading/simple_thread.h" |
| 19 #include "cc/base/scoped_ptr_deque.h" | |
| 19 | 20 |
| 20 namespace cc { | 21 namespace cc { |
| 21 | 22 |
| 22 namespace { | 23 namespace { |
| 23 | 24 |
| 24 class WorkerPoolTaskImpl : public internal::WorkerPoolTask { | 25 class WorkerPoolTaskGraphImpl : public internal::WorkerPoolTaskGraph { |
| 25 public: | 26 public: |
| 26 WorkerPoolTaskImpl(const WorkerPool::Callback& task, | 27 WorkerPoolTaskGraphImpl() {} |
| 27 const base::Closure& reply) | |
| 28 : internal::WorkerPoolTask(reply), | |
| 29 task_(task) {} | |
| 30 | 28 |
| 31 virtual void RunOnThread(unsigned thread_index) OVERRIDE { | 29 // Overridden from internal::WorkerPoolTaskGraph: |
| 32 task_.Run(); | 30 virtual bool HasMoreTasks() OVERRIDE { return false; } |
| 31 virtual bool HasTask(internal::WorkerPoolTask* task) OVERRIDE { | |
| 32 return false; | |
| 33 } | |
| 34 virtual internal::WorkerPoolTask* TopTask() OVERRIDE { | |
| 35 NOTREACHED(); | |
| 36 return NULL; | |
| 37 } | |
| 38 virtual scoped_refptr<internal::WorkerPoolTask> TakeTask( | |
| 39 internal::WorkerPoolTask* task) OVERRIDE { | |
| 40 NOTREACHED(); | |
| 41 return NULL; | |
| 33 } | 42 } |
| 34 | 43 |
| 35 private: | 44 private: |
| 36 WorkerPool::Callback task_; | 45 virtual ~WorkerPoolTaskGraphImpl() {} |
| 37 }; | 46 }; |
| 38 | 47 |
| 39 } // namespace | 48 } // namespace |
| 40 | 49 |
| 41 namespace internal { | |
| 42 | |
| 43 WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) { | |
| 44 } | |
| 45 | |
| 46 WorkerPoolTask::~WorkerPoolTask() { | |
| 47 } | |
| 48 | |
| 49 void WorkerPoolTask::DidComplete() { | |
| 50 reply_.Run(); | |
| 51 } | |
| 52 | |
| 53 } // namespace internal | |
| 54 | |
| 55 // Internal to the worker pool. Any data or logic that needs to be | 50 // Internal to the worker pool. Any data or logic that needs to be |
| 56 // shared between threads lives in this class. All members are guarded | 51 // shared between threads lives in this class. All members are guarded |
| 57 // by |lock_|. | 52 // by |lock_|. |
| 58 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { | 53 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
| 59 public: | 54 public: |
| 60 Inner(WorkerPool* worker_pool, | 55 Inner(WorkerPool* worker_pool, |
| 61 size_t num_threads, | 56 size_t num_threads, |
| 62 const std::string& thread_name_prefix); | 57 const std::string& thread_name_prefix); |
| 63 virtual ~Inner(); | 58 virtual ~Inner(); |
| 64 | 59 |
| 65 void Shutdown(); | 60 void Shutdown(); |
| 66 | 61 |
| 67 void PostTask(scoped_ptr<internal::WorkerPoolTask> task); | 62 // Schedule running of tasks in |task_graph|. All tasks previously |
| 63 // scheduled but not present in |task_graph| will be canceled unless | |
| 64 // already running. Canceled tasks are moved to |completed_tasks_| | |
| 65 // without being run. The result is that once scheduled, a task is | |
| 66 // guaranteed to end up in the |completed_tasks_| queue even if they | |
| 67 // later get canceled by another call to ScheduleTasks(). | |
| 68 void ScheduleTasks(scoped_ptr<internal::WorkerPoolTaskGraph> task_graph); | |
| 68 | 69 |
| 69 // Appends all completed tasks to worker pool's completed tasks queue | 70 // Collect all completed tasks in |completed_tasks|. Returns true if idle. |
| 70 // and returns true if idle. | 71 bool CollectCompletedTasks(TaskDeque* completed_tasks); |
| 71 bool CollectCompletedTasks(); | |
| 72 | 72 |
| 73 private: | 73 private: |
| 74 // Appends all completed tasks to |completed_tasks|. Lock must | 74 // Collect all completed tasks by swapping the contents of |
| 75 // already be acquired before calling this function. | 75 // |completed_tasks| and |completed_tasks_|. Lock must be acquired |
| 76 bool AppendCompletedTasksWithLockAcquired( | 76 // before calling this function. Returns true if idle. |
| 77 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); | 77 bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks); |
| 78 | 78 |
| 79 // Schedule an OnIdleOnOriginThread callback if not already pending. | 79 // Schedule an OnIdleOnOriginThread callback if not already pending. |
| 80 // Lock must already be acquired before calling this function. | 80 // Lock must already be acquired before calling this function. |
| 81 void ScheduleOnIdleWithLockAcquired(); | 81 void ScheduleOnIdleWithLockAcquired(); |
| 82 void OnIdleOnOriginThread(); | 82 void OnIdleOnOriginThread(); |
| 83 | 83 |
| 84 // Overridden from base::DelegateSimpleThread: | 84 // Overridden from base::DelegateSimpleThread: |
| 85 virtual void Run() OVERRIDE; | 85 virtual void Run() OVERRIDE; |
| 86 | 86 |
| 87 // Pointer to worker pool. Can only be used on origin thread. | 87 // Pointer to worker pool. Can only be used on origin thread. |
| (...skipping 15 matching lines...) Expand all Loading... | |
| 103 base::WeakPtrFactory<Inner> weak_ptr_factory_; | 103 base::WeakPtrFactory<Inner> weak_ptr_factory_; |
| 104 | 104 |
| 105 const base::Closure on_idle_callback_; | 105 const base::Closure on_idle_callback_; |
| 106 // Set when a OnIdleOnOriginThread() callback is pending. | 106 // Set when a OnIdleOnOriginThread() callback is pending. |
| 107 bool on_idle_pending_; | 107 bool on_idle_pending_; |
| 108 | 108 |
| 109 // Provides each running thread loop with a unique index. First thread | 109 // Provides each running thread loop with a unique index. First thread |
| 110 // loop index is 0. | 110 // loop index is 0. |
| 111 unsigned next_thread_index_; | 111 unsigned next_thread_index_; |
| 112 | 112 |
| 113 // Number of tasks currently running. | |
| 114 unsigned running_task_count_; | |
| 115 | |
| 116 // Set during shutdown. Tells workers to exit when no more tasks | 113 // Set during shutdown. Tells workers to exit when no more tasks |
| 117 // are pending. | 114 // are pending. |
| 118 bool shutdown_; | 115 bool shutdown_; |
| 119 | 116 |
| 120 typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; | 117 // The task graph. Provides tasks in order of priority. |
| 121 TaskDeque pending_tasks_; | 118 scoped_ptr<internal::WorkerPoolTaskGraph> task_graph_; |
| 119 | |
| 120 // This set contains all currently running tasks. | |
| 121 typedef std::set<internal::WorkerPoolTask*> TaskSet; | |
| 122 TaskSet running_tasks_; | |
| 123 | |
| 124 // Completed tasks not yet collected by origin thread. | |
| 122 TaskDeque completed_tasks_; | 125 TaskDeque completed_tasks_; |
| 123 | 126 |
| 124 ScopedPtrDeque<base::DelegateSimpleThread> workers_; | 127 ScopedPtrDeque<base::DelegateSimpleThread> workers_; |
| 125 | 128 |
| 126 DISALLOW_COPY_AND_ASSIGN(Inner); | 129 DISALLOW_COPY_AND_ASSIGN(Inner); |
| 127 }; | 130 }; |
| 128 | 131 |
| 129 WorkerPool::Inner::Inner(WorkerPool* worker_pool, | 132 WorkerPool::Inner::Inner(WorkerPool* worker_pool, |
| 130 size_t num_threads, | 133 size_t num_threads, |
| 131 const std::string& thread_name_prefix) | 134 const std::string& thread_name_prefix) |
| 132 : worker_pool_on_origin_thread_(worker_pool), | 135 : worker_pool_on_origin_thread_(worker_pool), |
| 133 lock_(), | 136 lock_(), |
| 134 has_pending_tasks_cv_(&lock_), | 137 has_pending_tasks_cv_(&lock_), |
| 135 origin_loop_(base::MessageLoopProxy::current()), | 138 origin_loop_(base::MessageLoopProxy::current()), |
| 136 weak_ptr_factory_(this), | 139 weak_ptr_factory_(this), |
| 137 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, | 140 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, |
| 138 weak_ptr_factory_.GetWeakPtr())), | 141 weak_ptr_factory_.GetWeakPtr())), |
| 139 on_idle_pending_(false), | 142 on_idle_pending_(false), |
| 140 next_thread_index_(0), | 143 next_thread_index_(0), |
| 141 running_task_count_(0), | 144 shutdown_(false), |
| 142 shutdown_(false) { | 145 task_graph_(new WorkerPoolTaskGraphImpl) { |
| 143 base::AutoLock lock(lock_); | 146 base::AutoLock lock(lock_); |
| 144 | 147 |
| 145 while (workers_.size() < num_threads) { | 148 while (workers_.size() < num_threads) { |
| 146 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( | 149 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( |
| 147 new base::DelegateSimpleThread( | 150 new base::DelegateSimpleThread( |
| 148 this, | 151 this, |
| 149 thread_name_prefix + | 152 thread_name_prefix + |
| 150 base::StringPrintf( | 153 base::StringPrintf( |
| 151 "Worker%u", | 154 "Worker%u", |
| 152 static_cast<unsigned>(workers_.size() + 1)).c_str())); | 155 static_cast<unsigned>(workers_.size() + 1)).c_str())); |
| 153 worker->Start(); | 156 worker->Start(); |
| 154 workers_.push_back(worker.Pass()); | 157 workers_.push_back(worker.Pass()); |
| 155 } | 158 } |
| 156 } | 159 } |
| 157 | 160 |
| 158 WorkerPool::Inner::~Inner() { | 161 WorkerPool::Inner::~Inner() { |
| 159 base::AutoLock lock(lock_); | 162 base::AutoLock lock(lock_); |
| 160 | 163 |
| 161 DCHECK(shutdown_); | 164 DCHECK(shutdown_); |
| 162 | 165 |
| 163 // Cancel all pending callbacks. | 166 DCHECK(!task_graph_->HasMoreTasks()); |
| 164 weak_ptr_factory_.InvalidateWeakPtrs(); | 167 DCHECK_EQ(0u, running_tasks_.size()); |
| 165 | |
| 166 DCHECK_EQ(0u, pending_tasks_.size()); | |
| 167 DCHECK_EQ(0u, completed_tasks_.size()); | 168 DCHECK_EQ(0u, completed_tasks_.size()); |
| 168 DCHECK_EQ(0u, running_task_count_); | |
| 169 } | 169 } |
| 170 | 170 |
| 171 void WorkerPool::Inner::Shutdown() { | 171 void WorkerPool::Inner::Shutdown() { |
| 172 { | 172 { |
| 173 base::AutoLock lock(lock_); | 173 base::AutoLock lock(lock_); |
| 174 | 174 |
| 175 DCHECK(!shutdown_); | 175 DCHECK(!shutdown_); |
| 176 shutdown_ = true; | 176 shutdown_ = true; |
| 177 | 177 |
| 178 // Wake up a worker so it knows it should exit. This will cause all workers | 178 // Wake up a worker so it knows it should exit. This will cause all workers |
| 179 // to exit as each will wake up another worker before exiting. | 179 // to exit as each will wake up another worker before exiting. |
| 180 has_pending_tasks_cv_.Signal(); | 180 has_pending_tasks_cv_.Signal(); |
| 181 } | 181 } |
| 182 | 182 |
| 183 while (workers_.size()) { | 183 while (workers_.size()) { |
| 184 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); | 184 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); |
| 185 worker->Join(); | 185 worker->Join(); |
| 186 } | 186 } |
| 187 | |
| 188 // Cancel any pending OnIdle callback. | |
| 189 weak_ptr_factory_.InvalidateWeakPtrs(); | |
| 187 } | 190 } |
| 188 | 191 |
| 189 void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { | 192 void WorkerPool::Inner::ScheduleTasks( |
| 193 scoped_ptr<internal::WorkerPoolTaskGraph> task_graph) { | |
| 190 base::AutoLock lock(lock_); | 194 base::AutoLock lock(lock_); |
| 191 | 195 |
| 192 pending_tasks_.push_back(task.Pass()); | 196 // Move tasks not present in new graph to |completed_tasks_|. |
| 197 while (task_graph_->HasMoreTasks()) { | |
| 198 scoped_refptr<internal::WorkerPoolTask> task = task_graph_->TakeTask( | |
| 199 task_graph_->TopTask()); | |
| 200 | |
| 201 // Task has not completed if present in new graph. | |
| 202 if (task_graph->HasTask(task)) | |
| 203 continue; | |
| 204 | |
| 205 completed_tasks_.push_back(task); | |
| 206 } | |
| 207 | |
| 208 // Take any running tasks from new graph. | |
| 209 for (TaskSet::iterator it = running_tasks_.begin(); | |
| 210 it != running_tasks_.end(); ++it) { | |
| 211 if (task_graph->HasTask(*it)) | |
| 212 task_graph->TakeTask(*it); | |
| 213 } | |
| 214 | |
| 215 // And take any completed tasks from new graph. | |
| 216 for (TaskDeque::iterator it = completed_tasks_.begin(); | |
| 217 it != completed_tasks_.end(); ++it) { | |
| 218 if (task_graph->HasTask(*it)) | |
| 219 task_graph->TakeTask(*it); | |
| 220 } | |
| 221 | |
| 222 // Finally switch to the new graph. | |
| 223 task_graph_ = task_graph.Pass(); | |
| 193 | 224 |
| 194 // There is more work available, so wake up worker thread. | 225 // There is more work available, so wake up worker thread. |
| 195 has_pending_tasks_cv_.Signal(); | 226 has_pending_tasks_cv_.Signal(); |
| 196 } | 227 } |
| 197 | 228 |
| 198 bool WorkerPool::Inner::CollectCompletedTasks() { | 229 bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) { |
| 199 base::AutoLock lock(lock_); | 230 base::AutoLock lock(lock_); |
| 200 | 231 |
| 201 return AppendCompletedTasksWithLockAcquired( | 232 return CollectCompletedTasksWithLockAcquired(completed_tasks); |
| 202 &worker_pool_on_origin_thread_->completed_tasks_); | |
| 203 } | 233 } |
| 204 | 234 |
| 205 bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired( | 235 bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired( |
| 206 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) { | 236 TaskDeque* completed_tasks) { |
| 207 lock_.AssertAcquired(); | 237 lock_.AssertAcquired(); |
| 208 | 238 |
| 209 while (completed_tasks_.size()) | 239 DCHECK_EQ(0u, completed_tasks->size()); |
| 210 completed_tasks->push_back(completed_tasks_.take_front().Pass()); | 240 completed_tasks->swap(completed_tasks_); |
| 211 | 241 |
| 212 return !running_task_count_ && pending_tasks_.empty(); | 242 return running_tasks_.empty() && !task_graph_->HasMoreTasks(); |
| 213 } | 243 } |
| 214 | 244 |
| 215 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { | 245 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
| 216 lock_.AssertAcquired(); | 246 lock_.AssertAcquired(); |
| 217 | 247 |
| 218 if (on_idle_pending_) | 248 if (on_idle_pending_) |
| 219 return; | 249 return; |
| 220 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); | 250 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); |
| 221 on_idle_pending_ = true; | 251 on_idle_pending_ = true; |
| 222 } | 252 } |
| 223 | 253 |
| 224 void WorkerPool::Inner::OnIdleOnOriginThread() { | 254 void WorkerPool::Inner::OnIdleOnOriginThread() { |
| 255 TaskDeque completed_tasks; | |
| 256 | |
| 225 { | 257 { |
| 226 base::AutoLock lock(lock_); | 258 base::AutoLock lock(lock_); |
| 227 | 259 |
| 228 DCHECK(on_idle_pending_); | 260 DCHECK(on_idle_pending_); |
| 229 on_idle_pending_ = false; | 261 on_idle_pending_ = false; |
| 230 | 262 |
| 231 // Early out if no longer idle. | 263 // Early out if no longer idle. |
| 232 if (running_task_count_ || !pending_tasks_.empty()) | 264 if (!running_tasks_.empty() || task_graph_->HasMoreTasks()) |
| 233 return; | 265 return; |
| 234 | 266 |
| 235 AppendCompletedTasksWithLockAcquired( | 267 CollectCompletedTasksWithLockAcquired(&completed_tasks); |
| 236 &worker_pool_on_origin_thread_->completed_tasks_); | |
| 237 } | 268 } |
| 238 | 269 |
| 239 worker_pool_on_origin_thread_->OnIdle(); | 270 worker_pool_on_origin_thread_->OnIdle(&completed_tasks); |
| 240 } | 271 } |
| 241 | 272 |
| 242 void WorkerPool::Inner::Run() { | 273 void WorkerPool::Inner::Run() { |
| 243 #if defined(OS_ANDROID) | 274 #if defined(OS_ANDROID) |
| 244 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) | 275 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) |
| 245 int nice_value = 10; // Idle priority. | 276 int nice_value = 10; // Idle priority. |
| 246 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value); | 277 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value); |
| 247 #endif | 278 #endif |
| 248 | 279 |
| 249 base::AutoLock lock(lock_); | 280 base::AutoLock lock(lock_); |
| 250 | 281 |
| 251 // Get a unique thread index. | 282 // Get a unique thread index. |
| 252 int thread_index = next_thread_index_++; | 283 int thread_index = next_thread_index_++; |
| 253 | 284 |
| 254 while (true) { | 285 while (true) { |
| 255 if (pending_tasks_.empty()) { | 286 if (!task_graph_->HasMoreTasks()) { |
| 256 // Exit when shutdown is set and no more tasks are pending. | 287 // Exit when shutdown is set and no more tasks are pending. |
| 257 if (shutdown_) | 288 if (shutdown_) |
|
vmpstr
2013/05/06 23:08:48
Now that task_graph_ contains all work that needs
reveman
2013/05/07 01:33:54
Yes, we could cancel all pending tasks at shutdown
| |
| 258 break; | 289 break; |
| 259 | 290 |
| 260 // Schedule an idle callback if requested and not pending. | 291 // Schedule an idle callback if not tasks are running. |
|
vmpstr
2013/05/06 23:08:48
nit: not -> no
reveman
2013/05/07 01:33:54
Done.
| |
| 261 if (!running_task_count_) | 292 if (running_tasks_.empty()) |
| 262 ScheduleOnIdleWithLockAcquired(); | 293 ScheduleOnIdleWithLockAcquired(); |
| 263 | 294 |
| 264 // Wait for new pending tasks. | 295 // Wait for more tasks. |
| 265 has_pending_tasks_cv_.Wait(); | 296 has_pending_tasks_cv_.Wait(); |
| 266 continue; | 297 continue; |
| 267 } | 298 } |
| 268 | 299 |
| 269 // Get next task. | 300 // Take the highest priority task from the task graph. |
| 270 scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); | 301 scoped_refptr<internal::WorkerPoolTask> task = task_graph_->TakeTask( |
| 302 task_graph_->TopTask()); | |
| 271 | 303 |
| 272 // Increment |running_task_count_| before starting to run task. | 304 // Insert task in |running_tasks_| before starting to run it. |
| 273 running_task_count_++; | 305 running_tasks_.insert(task); |
| 274 | 306 |
| 275 // There may be more work available, so wake up another | 307 // There may be more work available, so wake up another worker thread. |
| 276 // worker thread. | |
| 277 has_pending_tasks_cv_.Signal(); | 308 has_pending_tasks_cv_.Signal(); |
| 278 | 309 |
| 279 { | 310 { |
| 280 base::AutoUnlock unlock(lock_); | 311 base::AutoUnlock unlock(lock_); |
| 281 | 312 |
| 282 task->RunOnThread(thread_index); | 313 task->RunOnThread(thread_index); |
| 283 } | 314 } |
| 284 | 315 |
| 285 completed_tasks_.push_back(task.Pass()); | 316 // Remove task from |running_tasks_| now that we are done running it. |
| 317 running_tasks_.erase(task); | |
| 286 | 318 |
| 287 // Decrement |running_task_count_| now that we are done running task. | 319 // Finally add task to |completed_tasks_|. |
| 288 running_task_count_--; | 320 completed_tasks_.push_back(task); |
| 289 } | 321 } |
| 290 | 322 |
| 291 // We noticed we should exit. Wake up the next worker so it knows it should | 323 // We noticed we should exit. Wake up the next worker so it knows it should |
| 292 // exit as well (because the Shutdown() code only signals once). | 324 // exit as well (because the Shutdown() code only signals once). |
| 293 has_pending_tasks_cv_.Signal(); | 325 has_pending_tasks_cv_.Signal(); |
| 294 } | 326 } |
| 295 | 327 |
| 296 WorkerPool::WorkerPool(WorkerPoolClient* client, | 328 WorkerPool::WorkerPool(WorkerPoolClient* client, |
| 297 size_t num_threads, | 329 size_t num_threads, |
| 298 base::TimeDelta check_for_completed_tasks_delay, | 330 base::TimeDelta check_for_completed_tasks_delay, |
| 299 const std::string& thread_name_prefix) | 331 const std::string& thread_name_prefix) |
| 300 : client_(client), | 332 : client_(client), |
| 301 origin_loop_(base::MessageLoopProxy::current()), | 333 origin_loop_(base::MessageLoopProxy::current()), |
| 302 weak_ptr_factory_(this), | |
| 303 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), | 334 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), |
| 304 check_for_completed_tasks_pending_(false), | 335 check_for_completed_tasks_pending_(false), |
| 305 inner_(make_scoped_ptr(new Inner(this, | 336 inner_(make_scoped_ptr(new Inner(this, |
| 306 num_threads, | 337 num_threads, |
| 307 thread_name_prefix))) { | 338 thread_name_prefix))) { |
| 308 } | 339 } |
| 309 | 340 |
| 310 WorkerPool::~WorkerPool() { | 341 WorkerPool::~WorkerPool() { |
| 311 // Cancel all pending callbacks. | |
| 312 weak_ptr_factory_.InvalidateWeakPtrs(); | |
| 313 | |
| 314 DCHECK_EQ(0u, completed_tasks_.size()); | |
| 315 } | 342 } |
| 316 | 343 |
| 317 void WorkerPool::Shutdown() { | 344 void WorkerPool::Shutdown() { |
| 318 inner_->Shutdown(); | 345 inner_->Shutdown(); |
| 319 inner_->CollectCompletedTasks(); | 346 |
| 320 DispatchCompletionCallbacks(); | 347 TaskDeque completed_tasks; |
| 348 inner_->CollectCompletedTasks(&completed_tasks); | |
| 349 DispatchCompletionCallbacks(&completed_tasks); | |
| 321 } | 350 } |
| 322 | 351 |
| 323 void WorkerPool::PostTaskAndReply( | 352 void WorkerPool::OnIdle(TaskDeque* completed_tasks) { |
| 324 const Callback& task, const base::Closure& reply) { | |
| 325 PostTask(make_scoped_ptr(new WorkerPoolTaskImpl( | |
| 326 task, | |
| 327 reply)).PassAs<internal::WorkerPoolTask>()); | |
| 328 } | |
| 329 | |
| 330 void WorkerPool::OnIdle() { | |
| 331 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); | 353 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); |
| 332 | 354 |
| 333 DispatchCompletionCallbacks(); | 355 DispatchCompletionCallbacks(completed_tasks); |
| 356 | |
| 357 // Cancel any pending check for completed tasks. | |
| 358 check_for_completed_tasks_callback_.Cancel(); | |
| 359 check_for_completed_tasks_pending_ = false; | |
| 334 } | 360 } |
| 335 | 361 |
| 336 void WorkerPool::ScheduleCheckForCompletedTasks() { | 362 void WorkerPool::ScheduleCheckForCompletedTasks() { |
| 337 if (check_for_completed_tasks_pending_) | 363 if (check_for_completed_tasks_pending_) |
| 338 return; | 364 return; |
| 365 check_for_completed_tasks_callback_.Reset( | |
| 366 base::Bind(&WorkerPool::CheckForCompletedTasks, | |
| 367 base::Unretained(this))); | |
| 339 origin_loop_->PostDelayedTask( | 368 origin_loop_->PostDelayedTask( |
| 340 FROM_HERE, | 369 FROM_HERE, |
| 341 base::Bind(&WorkerPool::CheckForCompletedTasks, | 370 check_for_completed_tasks_callback_.callback(), |
| 342 weak_ptr_factory_.GetWeakPtr()), | |
| 343 check_for_completed_tasks_delay_); | 371 check_for_completed_tasks_delay_); |
| 344 check_for_completed_tasks_pending_ = true; | 372 check_for_completed_tasks_pending_ = true; |
| 345 } | 373 } |
| 346 | 374 |
| 347 void WorkerPool::CheckForCompletedTasks() { | 375 void WorkerPool::CheckForCompletedTasks() { |
| 348 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); | 376 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); |
| 349 DCHECK(check_for_completed_tasks_pending_); | 377 DCHECK(check_for_completed_tasks_pending_); |
| 350 check_for_completed_tasks_pending_ = false; | 378 check_for_completed_tasks_pending_ = false; |
| 351 | 379 |
| 380 TaskDeque completed_tasks; | |
| 381 | |
| 352 // Schedule another check for completed tasks if not idle. | 382 // Schedule another check for completed tasks if not idle. |
| 353 if (!inner_->CollectCompletedTasks()) | 383 if (!inner_->CollectCompletedTasks(&completed_tasks)) |
| 354 ScheduleCheckForCompletedTasks(); | 384 ScheduleCheckForCompletedTasks(); |
| 355 | 385 |
| 356 DispatchCompletionCallbacks(); | 386 DispatchCompletionCallbacks(&completed_tasks); |
| 357 } | 387 } |
| 358 | 388 |
| 359 void WorkerPool::DispatchCompletionCallbacks() { | 389 void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) { |
| 360 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); | 390 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); |
| 361 | 391 |
| 362 if (completed_tasks_.empty()) | 392 // Early out when |completed_tasks| is empty to prevent unnecessary |
| 393 // call to DidFinishDispatchingWorkerPoolCompletionCallbacks(). | |
| 394 if (completed_tasks->empty()) | |
| 363 return; | 395 return; |
| 364 | 396 |
| 365 while (completed_tasks_.size()) { | 397 while (completed_tasks->size()) { |
|
vmpstr
2013/05/06 23:08:48
nit: consider !empty
reveman
2013/05/07 01:33:54
Done.
| |
| 366 scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); | 398 scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front(); |
| 367 task->DidComplete(); | 399 completed_tasks->pop_front(); |
| 400 task->DispatchCompletionCallback(); | |
| 368 } | 401 } |
| 369 | 402 |
| 370 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); | 403 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); |
| 371 } | 404 } |
| 372 | 405 |
| 373 void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { | 406 void WorkerPool::ScheduleTasks( |
| 374 // Schedule check for completed tasks if not pending. | 407 scoped_ptr<internal::WorkerPoolTaskGraph> task_graph) { |
| 375 ScheduleCheckForCompletedTasks(); | 408 if (task_graph->HasMoreTasks()) |
| 376 | 409 ScheduleCheckForCompletedTasks(); |
| 377 inner_->PostTask(task.Pass()); | 410 inner_->ScheduleTasks(task_graph.Pass()); |
| 378 } | 411 } |
| 379 | 412 |
| 380 } // namespace cc | 413 } // namespace cc |
| OLD | NEW |