Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(111)

Side by Side Diff: cc/base/worker_pool.cc

Issue 14689004: Re-land: cc: Cancel and re-prioritize worker pool tasks. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: keep image cache check Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "cc/base/worker_pool.h" 5 #include "cc/base/worker_pool.h"
6 6
7 #if defined(OS_ANDROID) 7 #if defined(OS_ANDROID)
8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) 8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549)
9 #include <sys/resource.h> 9 #include <sys/resource.h>
10 #endif 10 #endif
11 11
12 #include <algorithm> 12 #include <map>
13 13
14 #include "base/bind.h" 14 #include "base/bind.h"
15 #include "base/debug/trace_event.h" 15 #include "base/debug/trace_event.h"
16 #include "base/hash_tables.h"
16 #include "base/stringprintf.h" 17 #include "base/stringprintf.h"
17 #include "base/synchronization/condition_variable.h"
18 #include "base/threading/simple_thread.h" 18 #include "base/threading/simple_thread.h"
19 #include "base/threading/thread_restrictions.h" 19 #include "base/threading/thread_restrictions.h"
20 #include "cc/base/scoped_ptr_deque.h"
21 #include "cc/base/scoped_ptr_hash_map.h"
22
23 #if defined(COMPILER_GCC)
vmpstr 2013/05/24 17:01:37 Does this part just work with clang?
reveman 2013/05/24 18:53:48 yes, you'll find the same throughout the chromium
24 namespace BASE_HASH_NAMESPACE {
25 template <> struct hash<cc::internal::WorkerPoolTask*> {
26 size_t operator()(cc::internal::WorkerPoolTask* ptr) const {
27 return hash<size_t>()(reinterpret_cast<size_t>(ptr));
28 }
29 };
30 } // namespace BASE_HASH_NAMESPACE
31 #endif // COMPILER
20 32
21 namespace cc { 33 namespace cc {
22 34
23 namespace {
24
25 class WorkerPoolTaskImpl : public internal::WorkerPoolTask {
26 public:
27 WorkerPoolTaskImpl(const WorkerPool::Callback& task,
28 const base::Closure& reply)
29 : internal::WorkerPoolTask(reply),
30 task_(task) {}
31
32 virtual void RunOnThread(unsigned thread_index) OVERRIDE {
33 task_.Run();
34 }
35
36 private:
37 WorkerPool::Callback task_;
38 };
39
40 } // namespace
41
42 namespace internal { 35 namespace internal {
43 36
44 WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) { 37 WorkerPoolTask::WorkerPoolTask()
38 : did_schedule_(false),
39 did_run_(false),
40 did_complete_(false) {
41 }
42
43 WorkerPoolTask::WorkerPoolTask(TaskVector* dependencies)
44 : did_schedule_(false),
45 did_run_(false),
46 did_complete_(false) {
47 dependencies_.swap(*dependencies);
45 } 48 }
46 49
47 WorkerPoolTask::~WorkerPoolTask() { 50 WorkerPoolTask::~WorkerPoolTask() {
51 DCHECK_EQ(did_schedule_, did_complete_);
52 DCHECK(!did_run_ || did_schedule_);
53 DCHECK(!did_run_ || did_complete_);
54 }
55
56 void WorkerPoolTask::DidSchedule() {
57 DCHECK(!did_complete_);
58 did_schedule_ = true;
59 }
60
61 void WorkerPoolTask::WillRun() {
62 DCHECK(did_schedule_);
63 DCHECK(!did_complete_);
64 DCHECK(!did_run_);
65 }
66
67 void WorkerPoolTask::DidRun() {
68 did_run_ = true;
48 } 69 }
49 70
50 void WorkerPoolTask::DidComplete() { 71 void WorkerPoolTask::DidComplete() {
51 reply_.Run(); 72 DCHECK(did_schedule_);
73 DCHECK(!did_complete_);
74 did_complete_ = true;
75 }
76
77 bool WorkerPoolTask::IsReadyToRun() const {
78 // TODO(reveman): Use counter to improve performance.
79 for (TaskVector::const_reverse_iterator it = dependencies_.rbegin();
80 it != dependencies_.rend(); ++it) {
81 WorkerPoolTask* dependency = *it;
82 if (!dependency->HasFinishedRunning())
83 return false;
84 }
85 return true;
86 }
87
88 bool WorkerPoolTask::HasFinishedRunning() const {
89 return did_run_;
90 }
91
92 bool WorkerPoolTask::HasCompleted() const {
93 return did_complete_;
52 } 94 }
53 95
54 } // namespace internal 96 } // namespace internal
55 97
56 // Internal to the worker pool. Any data or logic that needs to be 98 // Internal to the worker pool. Any data or logic that needs to be
57 // shared between threads lives in this class. All members are guarded 99 // shared between threads lives in this class. All members are guarded
58 // by |lock_|. 100 // by |lock_|.
59 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { 101 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
60 public: 102 public:
61 Inner(WorkerPool* worker_pool, 103 Inner(WorkerPool* worker_pool,
62 size_t num_threads, 104 size_t num_threads,
63 const std::string& thread_name_prefix); 105 const std::string& thread_name_prefix);
64 virtual ~Inner(); 106 virtual ~Inner();
65 107
66 void Shutdown(); 108 void Shutdown();
67 109
68 void PostTask(scoped_ptr<internal::WorkerPoolTask> task); 110 // Schedule running of |root| task and all its dependencies. Tasks
111 // previously scheduled but no longer needed to run |root| will be
112 // canceled unless already running. Canceled tasks are moved to
113 // |completed_tasks_| without being run. The result is that once
114 // scheduled, a task is guaranteed to end up in the |completed_tasks_|
115 // queue even if they later get canceled by another call to
116 // ScheduleTasks().
117 void ScheduleTasks(internal::WorkerPoolTask* root);
69 118
70 // Appends all completed tasks to worker pool's completed tasks queue 119 // Collect all completed tasks in |completed_tasks|. Returns true if idle.
71 // and returns true if idle. 120 bool CollectCompletedTasks(TaskDeque* completed_tasks);
72 bool CollectCompletedTasks();
73 121
74 private: 122 private:
75 // Appends all completed tasks to |completed_tasks|. Lock must 123 class ScheduledTask {
76 // already be acquired before calling this function. 124 public:
77 bool AppendCompletedTasksWithLockAcquired( 125 ScheduledTask(internal::WorkerPoolTask* dependent, unsigned priority)
78 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); 126 : priority_(priority) {
127 if (dependent)
128 dependents_.push_back(dependent);
129 }
130
131 internal::WorkerPoolTask::TaskVector& dependents() { return dependents_; }
132 unsigned priority() const { return priority_; }
133
134 private:
135 internal::WorkerPoolTask::TaskVector dependents_;
136 unsigned priority_;
137 };
138 typedef internal::WorkerPoolTask* ScheduledTaskMapKey;
139 typedef ScopedPtrHashMap<ScheduledTaskMapKey, ScheduledTask>
140 ScheduledTaskMap;
141
142 // This builds a ScheduledTaskMap from a root task.
143 static unsigned BuildScheduledTaskMapRecursive(
144 internal::WorkerPoolTask* task,
145 internal::WorkerPoolTask* dependent,
146 unsigned priority,
147 ScheduledTaskMap* scheduled_tasks);
148 static void BuildScheduledTaskMap(
149 internal::WorkerPoolTask* root, ScheduledTaskMap* scheduled_tasks);
150
151 // Collect all completed tasks by swapping the contents of
152 // |completed_tasks| and |completed_tasks_|. Lock must be acquired
153 // before calling this function. Returns true if idle.
154 bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks);
79 155
80 // Schedule an OnIdleOnOriginThread callback if not already pending. 156 // Schedule an OnIdleOnOriginThread callback if not already pending.
81 // Lock must already be acquired before calling this function. 157 // Lock must already be acquired before calling this function.
82 void ScheduleOnIdleWithLockAcquired(); 158 void ScheduleOnIdleWithLockAcquired();
83 void OnIdleOnOriginThread(); 159 void OnIdleOnOriginThread();
84 160
85 // Overridden from base::DelegateSimpleThread: 161 // Overridden from base::DelegateSimpleThread:
86 virtual void Run() OVERRIDE; 162 virtual void Run() OVERRIDE;
87 163
88 // Pointer to worker pool. Can only be used on origin thread. 164 // Pointer to worker pool. Can only be used on origin thread.
89 // Not guarded by |lock_|. 165 // Not guarded by |lock_|.
90 WorkerPool* worker_pool_on_origin_thread_; 166 WorkerPool* worker_pool_on_origin_thread_;
91 167
92 // This lock protects all members of this class except 168 // This lock protects all members of this class except
93 // |worker_pool_on_origin_thread_|. Do not read or modify anything 169 // |worker_pool_on_origin_thread_|. Do not read or modify anything
94 // without holding this lock. Do not block while holding this lock. 170 // without holding this lock. Do not block while holding this lock.
95 mutable base::Lock lock_; 171 mutable base::Lock lock_;
96 172
97 // Condition variable that is waited on by worker threads until new 173 // Condition variable that is waited on by worker threads until new
98 // tasks are posted or shutdown starts. 174 // tasks are ready to run or shutdown starts.
99 base::ConditionVariable has_pending_tasks_cv_; 175 base::ConditionVariable has_ready_to_run_tasks_cv_;
100 176
101 // Target message loop used for posting callbacks. 177 // Target message loop used for posting callbacks.
102 scoped_refptr<base::MessageLoopProxy> origin_loop_; 178 scoped_refptr<base::MessageLoopProxy> origin_loop_;
103 179
104 base::WeakPtrFactory<Inner> weak_ptr_factory_; 180 base::WeakPtrFactory<Inner> weak_ptr_factory_;
105 181
106 const base::Closure on_idle_callback_; 182 const base::Closure on_idle_callback_;
107 // Set when a OnIdleOnOriginThread() callback is pending. 183 // Set when a OnIdleOnOriginThread() callback is pending.
108 bool on_idle_pending_; 184 bool on_idle_pending_;
109 185
110 // Provides each running thread loop with a unique index. First thread 186 // Provides each running thread loop with a unique index. First thread
111 // loop index is 0. 187 // loop index is 0.
112 unsigned next_thread_index_; 188 unsigned next_thread_index_;
113 189
114 // Number of tasks currently running.
115 unsigned running_task_count_;
116
117 // Set during shutdown. Tells workers to exit when no more tasks 190 // Set during shutdown. Tells workers to exit when no more tasks
118 // are pending. 191 // are pending.
119 bool shutdown_; 192 bool shutdown_;
120 193
121 typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; 194 // The root task that is a dependent of all other tasks.
122 TaskDeque pending_tasks_; 195 scoped_refptr<internal::WorkerPoolTask> root_;
196
197 // This set contains all pending tasks.
198 ScheduledTaskMap pending_tasks_;
199
200 // Ordered set of tasks that are ready to run.
201 // TODO(reveman): priority_queue might be more efficient.
202 typedef std::map<unsigned, internal::WorkerPoolTask*> TaskMap;
203 TaskMap ready_to_run_tasks_;
204
205 // This set contains all currently running tasks.
206 ScheduledTaskMap running_tasks_;
207
208 // Completed tasks not yet collected by origin thread.
123 TaskDeque completed_tasks_; 209 TaskDeque completed_tasks_;
124 210
125 ScopedPtrDeque<base::DelegateSimpleThread> workers_; 211 ScopedPtrDeque<base::DelegateSimpleThread> workers_;
126 212
127 DISALLOW_COPY_AND_ASSIGN(Inner); 213 DISALLOW_COPY_AND_ASSIGN(Inner);
128 }; 214 };
129 215
130 WorkerPool::Inner::Inner(WorkerPool* worker_pool, 216 WorkerPool::Inner::Inner(WorkerPool* worker_pool,
131 size_t num_threads, 217 size_t num_threads,
132 const std::string& thread_name_prefix) 218 const std::string& thread_name_prefix)
133 : worker_pool_on_origin_thread_(worker_pool), 219 : worker_pool_on_origin_thread_(worker_pool),
134 lock_(), 220 lock_(),
135 has_pending_tasks_cv_(&lock_), 221 has_ready_to_run_tasks_cv_(&lock_),
136 origin_loop_(base::MessageLoopProxy::current()), 222 origin_loop_(base::MessageLoopProxy::current()),
137 weak_ptr_factory_(this), 223 weak_ptr_factory_(this),
138 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, 224 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread,
139 weak_ptr_factory_.GetWeakPtr())), 225 weak_ptr_factory_.GetWeakPtr())),
140 on_idle_pending_(false), 226 on_idle_pending_(false),
141 next_thread_index_(0), 227 next_thread_index_(0),
142 running_task_count_(0),
143 shutdown_(false) { 228 shutdown_(false) {
144 base::AutoLock lock(lock_); 229 base::AutoLock lock(lock_);
145 230
146 while (workers_.size() < num_threads) { 231 while (workers_.size() < num_threads) {
147 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( 232 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
148 new base::DelegateSimpleThread( 233 new base::DelegateSimpleThread(
149 this, 234 this,
150 thread_name_prefix + 235 thread_name_prefix +
151 base::StringPrintf( 236 base::StringPrintf(
152 "Worker%u", 237 "Worker%u",
153 static_cast<unsigned>(workers_.size() + 1)).c_str())); 238 static_cast<unsigned>(workers_.size() + 1)).c_str()));
154 worker->Start(); 239 worker->Start();
155 workers_.push_back(worker.Pass()); 240 workers_.push_back(worker.Pass());
156 } 241 }
157 } 242 }
158 243
159 WorkerPool::Inner::~Inner() { 244 WorkerPool::Inner::~Inner() {
160 base::AutoLock lock(lock_); 245 base::AutoLock lock(lock_);
161 246
162 DCHECK(shutdown_); 247 DCHECK(shutdown_);
163 248
164 // Cancel all pending callbacks.
165 weak_ptr_factory_.InvalidateWeakPtrs();
166
167 DCHECK_EQ(0u, pending_tasks_.size()); 249 DCHECK_EQ(0u, pending_tasks_.size());
250 DCHECK_EQ(0u, ready_to_run_tasks_.size());
251 DCHECK_EQ(0u, running_tasks_.size());
168 DCHECK_EQ(0u, completed_tasks_.size()); 252 DCHECK_EQ(0u, completed_tasks_.size());
169 DCHECK_EQ(0u, running_task_count_);
170 } 253 }
171 254
172 void WorkerPool::Inner::Shutdown() { 255 void WorkerPool::Inner::Shutdown() {
173 { 256 {
174 base::AutoLock lock(lock_); 257 base::AutoLock lock(lock_);
175 258
176 DCHECK(!shutdown_); 259 DCHECK(!shutdown_);
177 shutdown_ = true; 260 shutdown_ = true;
178 261
179 // Wake up a worker so it knows it should exit. This will cause all workers 262 // Wake up a worker so it knows it should exit. This will cause all workers
180 // to exit as each will wake up another worker before exiting. 263 // to exit as each will wake up another worker before exiting.
181 has_pending_tasks_cv_.Signal(); 264 has_ready_to_run_tasks_cv_.Signal();
182 } 265 }
183 266
184 while (workers_.size()) { 267 while (workers_.size()) {
185 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); 268 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
186 // http://crbug.com/240453 - Join() is considered IO and will block this 269 // http://crbug.com/240453 - Join() is considered IO and will block this
187 // thread. See also http://crbug.com/239423 for further ideas. 270 // thread. See also http://crbug.com/239423 for further ideas.
188 base::ThreadRestrictions::ScopedAllowIO allow_io; 271 base::ThreadRestrictions::ScopedAllowIO allow_io;
189 worker->Join(); 272 worker->Join();
190 } 273 }
274
275 // Cancel any pending OnIdle callback.
276 weak_ptr_factory_.InvalidateWeakPtrs();
191 } 277 }
192 278
193 void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { 279 void WorkerPool::Inner::ScheduleTasks(internal::WorkerPoolTask* root) {
280 // It is OK to call ScheduleTasks() after shutdown if |root| is NULL.
281 DCHECK(!root || !shutdown_);
282
283 ScheduledTaskMap new_pending_tasks;
284 if (root)
285 BuildScheduledTaskMap(root, &new_pending_tasks);
286
287 scoped_refptr<internal::WorkerPoolTask> new_root(root);
288
289 {
290 base::AutoLock lock(lock_);
291
292 // First remove all completed tasks from |new_pending_tasks|.
293 for (TaskDeque::iterator it = completed_tasks_.begin();
294 it != completed_tasks_.end(); ++it) {
295 internal::WorkerPoolTask* task = *it;
296 new_pending_tasks.take_and_erase(task);
297 }
298
299 // Move tasks not present in |new_pending_tasks| to |completed_tasks_|.
300 for (ScheduledTaskMap::iterator it = pending_tasks_.begin();
301 it != pending_tasks_.end(); ++it) {
302 internal::WorkerPoolTask* task = it->first;
303
304 // Task has completed if not present in |new_pending_tasks|.
305 if (!new_pending_tasks.contains(task))
306 completed_tasks_.push_back(task);
307 }
308
309 // Build new running task set.
310 ScheduledTaskMap new_running_tasks;
311 for (ScheduledTaskMap::iterator it = running_tasks_.begin();
312 it != running_tasks_.end(); ++it) {
313 internal::WorkerPoolTask* task = it->first;
314 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task));
vmpstr 2013/05/24 17:01:37 Are currently running tasks guaranteed to be in th
reveman 2013/05/24 18:53:48 No, but we need to set the ScheduledTask value to
315 }
316
317 // Swap root and task sets.
318 // Note: old tasks are intentionally destroyed after releasing |lock_|.
319 root_.swap(new_root);
320 pending_tasks_.swap(new_pending_tasks);
321 running_tasks_.swap(new_running_tasks);
322
323 // Rebuild |ready_to_run_tasks_| before letting.
324 ready_to_run_tasks_.clear();
325 for (ScheduledTaskMap::iterator it = pending_tasks_.begin();
326 it != pending_tasks_.end(); ++it) {
327 internal::WorkerPoolTask* task = it->first;
328
329 // Completed tasks should not exist in |pending_tasks_|.
330 DCHECK(!task->HasFinishedRunning());
331
332 // Call DidSchedule() to indicate that this task has been scheduled.
333 // Note: This is only for debugging purposes.
334 task->DidSchedule();
335
336 DCHECK_EQ(0u, ready_to_run_tasks_.count(it->second->priority()));
337 if (task->IsReadyToRun())
338 ready_to_run_tasks_[it->second->priority()] = task;
339 }
340
341 // If there is more work available, wake up worker thread.
342 if (!ready_to_run_tasks_.empty())
343 has_ready_to_run_tasks_cv_.Signal();
344 }
345 }
346
347 bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) {
194 base::AutoLock lock(lock_); 348 base::AutoLock lock(lock_);
195 349
196 pending_tasks_.push_back(task.Pass()); 350 return CollectCompletedTasksWithLockAcquired(completed_tasks);
197
198 // There is more work available, so wake up worker thread.
199 has_pending_tasks_cv_.Signal();
200 } 351 }
201 352
202 bool WorkerPool::Inner::CollectCompletedTasks() { 353 bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired(
203 base::AutoLock lock(lock_); 354 TaskDeque* completed_tasks) {
204
205 return AppendCompletedTasksWithLockAcquired(
206 &worker_pool_on_origin_thread_->completed_tasks_);
207 }
208
209 bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired(
210 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) {
211 lock_.AssertAcquired(); 355 lock_.AssertAcquired();
212 356
213 while (completed_tasks_.size()) 357 DCHECK_EQ(0u, completed_tasks->size());
214 completed_tasks->push_back(completed_tasks_.take_front().Pass()); 358 completed_tasks->swap(completed_tasks_);
215 359
216 return !running_task_count_ && pending_tasks_.empty(); 360 return running_tasks_.empty() && pending_tasks_.empty();
217 } 361 }
218 362
219 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { 363 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
220 lock_.AssertAcquired(); 364 lock_.AssertAcquired();
221 365
222 if (on_idle_pending_) 366 if (on_idle_pending_)
223 return; 367 return;
224 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); 368 origin_loop_->PostTask(FROM_HERE, on_idle_callback_);
225 on_idle_pending_ = true; 369 on_idle_pending_ = true;
226 } 370 }
227 371
228 void WorkerPool::Inner::OnIdleOnOriginThread() { 372 void WorkerPool::Inner::OnIdleOnOriginThread() {
373 TaskDeque completed_tasks;
374
229 { 375 {
230 base::AutoLock lock(lock_); 376 base::AutoLock lock(lock_);
231 377
232 DCHECK(on_idle_pending_); 378 DCHECK(on_idle_pending_);
233 on_idle_pending_ = false; 379 on_idle_pending_ = false;
234 380
235 // Early out if no longer idle. 381 // Early out if no longer idle.
236 if (running_task_count_ || !pending_tasks_.empty()) 382 if (!running_tasks_.empty() || !pending_tasks_.empty())
237 return; 383 return;
238 384
239 AppendCompletedTasksWithLockAcquired( 385 CollectCompletedTasksWithLockAcquired(&completed_tasks);
240 &worker_pool_on_origin_thread_->completed_tasks_);
241 } 386 }
242 387
243 worker_pool_on_origin_thread_->OnIdle(); 388 worker_pool_on_origin_thread_->OnIdle(&completed_tasks);
244 } 389 }
245 390
246 void WorkerPool::Inner::Run() { 391 void WorkerPool::Inner::Run() {
247 #if defined(OS_ANDROID) 392 #if defined(OS_ANDROID)
248 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) 393 // TODO(epenner): Move thread priorities to base. (crbug.com/170549)
249 int nice_value = 10; // Idle priority. 394 int nice_value = 10; // Idle priority.
250 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value); 395 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value);
251 #endif 396 #endif
252 397
253 base::AutoLock lock(lock_); 398 base::AutoLock lock(lock_);
254 399
255 // Get a unique thread index. 400 // Get a unique thread index.
256 int thread_index = next_thread_index_++; 401 int thread_index = next_thread_index_++;
257 402
258 while (true) { 403 while (true) {
259 if (pending_tasks_.empty()) { 404 if (ready_to_run_tasks_.empty()) {
260 // Exit when shutdown is set and no more tasks are pending. 405 if (pending_tasks_.empty()) {
261 if (shutdown_) 406 // Exit when shutdown is set and no more tasks are pending.
262 break; 407 if (shutdown_)
408 break;
263 409
264 // Schedule an idle callback if requested and not pending. 410 // Schedule an idle callback if no tasks are running.
265 if (!running_task_count_) 411 if (running_tasks_.empty())
266 ScheduleOnIdleWithLockAcquired(); 412 ScheduleOnIdleWithLockAcquired();
413 }
267 414
268 // Wait for new pending tasks. 415 // Wait for more tasks.
269 has_pending_tasks_cv_.Wait(); 416 has_ready_to_run_tasks_cv_.Wait();
270 continue; 417 continue;
271 } 418 }
272 419
273 // Get next task. 420 // Take top priority task from |ready_to_run_tasks_|.
274 scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); 421 scoped_refptr<internal::WorkerPoolTask> task(
422 ready_to_run_tasks_.begin()->second);
423 ready_to_run_tasks_.erase(ready_to_run_tasks_.begin());
275 424
276 // Increment |running_task_count_| before starting to run task. 425 // Move task from |pending_tasks_| to |running_tasks_|.
277 running_task_count_++; 426 DCHECK(pending_tasks_.contains(task));
427 DCHECK(!running_tasks_.contains(task));
428 running_tasks_.set(task, pending_tasks_.take_and_erase(task));
278 429
279 // There may be more work available, so wake up another 430 // There may be more work available, so wake up another worker thread.
280 // worker thread. 431 has_ready_to_run_tasks_cv_.Signal();
281 has_pending_tasks_cv_.Signal(); 432
433 // Call WillRun() before releasing |lock_| and running task.
434 task->WillRun();
282 435
283 { 436 {
284 base::AutoUnlock unlock(lock_); 437 base::AutoUnlock unlock(lock_);
285 438
286 task->RunOnThread(thread_index); 439 task->RunOnThread(thread_index);
287 } 440 }
288 441
289 completed_tasks_.push_back(task.Pass()); 442 // This will mark task as finished running.
443 task->DidRun();
290 444
291 // Decrement |running_task_count_| now that we are done running task. 445 // Now iterate over all dependents to check if they are ready to run.
292 running_task_count_--; 446 scoped_ptr<ScheduledTask> scheduled_task = running_tasks_.take_and_erase(
447 task);
448 if (scheduled_task) {
449 typedef internal::WorkerPoolTask::TaskVector TaskVector;
450 for (TaskVector::iterator it = scheduled_task->dependents().begin();
451 it != scheduled_task->dependents().end(); ++it) {
452 internal::WorkerPoolTask* dependent = *it;
453 if (!dependent->IsReadyToRun())
454 continue;
455
456 // Task is ready. Add it to |ready_to_run_tasks_|.
457 DCHECK(pending_tasks_.contains(dependent));
458 unsigned priority = pending_tasks_.get(dependent)->priority();
459 DCHECK(!ready_to_run_tasks_.count(priority) ||
460 ready_to_run_tasks_[priority] == dependent);
461 ready_to_run_tasks_[priority] = dependent;
462 }
463 }
464
465 // Finally add task to |completed_tasks_|.
466 completed_tasks_.push_back(task);
293 } 467 }
294 468
295 // We noticed we should exit. Wake up the next worker so it knows it should 469 // We noticed we should exit. Wake up the next worker so it knows it should
296 // exit as well (because the Shutdown() code only signals once). 470 // exit as well (because the Shutdown() code only signals once).
297 has_pending_tasks_cv_.Signal(); 471 has_ready_to_run_tasks_cv_.Signal();
472 }
473
474 // static
475 unsigned WorkerPool::Inner::BuildScheduledTaskMapRecursive(
vmpstr 2013/05/24 17:01:37 Can you add a few comments in this function to jus
reveman 2013/05/24 18:53:48 Added some comments and an example graph to latest
476 internal::WorkerPoolTask* task,
477 internal::WorkerPoolTask* dependent,
478 unsigned priority,
479 ScheduledTaskMap* scheduled_tasks) {
480 if (task->HasCompleted())
481 return priority;
482
483 ScheduledTaskMap::iterator scheduled_it = scheduled_tasks->find(task);
484 if (scheduled_it != scheduled_tasks->end()) {
485 DCHECK(dependent);
486 scheduled_it->second->dependents().push_back(dependent);
487 return priority;
488 }
489
490 typedef internal::WorkerPoolTask::TaskVector TaskVector;
491 for (TaskVector::iterator it = task->dependencies().begin();
492 it != task->dependencies().end(); ++it) {
493 internal::WorkerPoolTask* dependency = *it;
494 priority = BuildScheduledTaskMapRecursive(
495 dependency, task, priority, scheduled_tasks);
496 }
497
498 scheduled_tasks->set(task,
499 make_scoped_ptr(new ScheduledTask(dependent,
500 priority)));
501
502 return priority + 1;
503 }
504
505 // static
506 void WorkerPool::Inner::BuildScheduledTaskMap(
507 internal::WorkerPoolTask* root,
508 ScheduledTaskMap* scheduled_tasks) {
509 const unsigned kBasePriority = 0u;
510 DCHECK(root);
511 BuildScheduledTaskMapRecursive(root, NULL, kBasePriority, scheduled_tasks);
298 } 512 }
299 513
300 WorkerPool::WorkerPool(size_t num_threads, 514 WorkerPool::WorkerPool(size_t num_threads,
301 base::TimeDelta check_for_completed_tasks_delay, 515 base::TimeDelta check_for_completed_tasks_delay,
302 const std::string& thread_name_prefix) 516 const std::string& thread_name_prefix)
303 : client_(NULL), 517 : client_(NULL),
304 origin_loop_(base::MessageLoopProxy::current()), 518 origin_loop_(base::MessageLoopProxy::current()),
305 weak_ptr_factory_(this),
306 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), 519 check_for_completed_tasks_delay_(check_for_completed_tasks_delay),
307 check_for_completed_tasks_pending_(false), 520 check_for_completed_tasks_pending_(false),
308 inner_(make_scoped_ptr(new Inner(this, 521 inner_(make_scoped_ptr(new Inner(this,
309 num_threads, 522 num_threads,
310 thread_name_prefix))) { 523 thread_name_prefix))) {
311 } 524 }
312 525
313 WorkerPool::~WorkerPool() { 526 WorkerPool::~WorkerPool() {
314 // Cancel all pending callbacks.
315 weak_ptr_factory_.InvalidateWeakPtrs();
316
317 DCHECK_EQ(0u, completed_tasks_.size());
318 } 527 }
319 528
320 void WorkerPool::Shutdown() { 529 void WorkerPool::Shutdown() {
321 inner_->Shutdown(); 530 inner_->Shutdown();
322 inner_->CollectCompletedTasks(); 531
323 DispatchCompletionCallbacks(); 532 TaskDeque completed_tasks;
533 inner_->CollectCompletedTasks(&completed_tasks);
534 DispatchCompletionCallbacks(&completed_tasks);
324 } 535 }
325 536
326 void WorkerPool::PostTaskAndReply( 537 void WorkerPool::OnIdle(TaskDeque* completed_tasks) {
327 const Callback& task, const base::Closure& reply) {
328 PostTask(make_scoped_ptr(new WorkerPoolTaskImpl(
329 task,
330 reply)).PassAs<internal::WorkerPoolTask>());
331 }
332
333 void WorkerPool::OnIdle() {
334 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); 538 TRACE_EVENT0("cc", "WorkerPool::OnIdle");
335 539
336 DispatchCompletionCallbacks(); 540 DispatchCompletionCallbacks(completed_tasks);
541
542 // Cancel any pending check for completed tasks.
543 check_for_completed_tasks_callback_.Cancel();
544 check_for_completed_tasks_pending_ = false;
337 } 545 }
338 546
339 void WorkerPool::ScheduleCheckForCompletedTasks() { 547 void WorkerPool::ScheduleCheckForCompletedTasks() {
340 if (check_for_completed_tasks_pending_) 548 if (check_for_completed_tasks_pending_)
341 return; 549 return;
550 check_for_completed_tasks_callback_.Reset(
551 base::Bind(&WorkerPool::CheckForCompletedTasks,
552 base::Unretained(this)));
342 origin_loop_->PostDelayedTask( 553 origin_loop_->PostDelayedTask(
343 FROM_HERE, 554 FROM_HERE,
344 base::Bind(&WorkerPool::CheckForCompletedTasks, 555 check_for_completed_tasks_callback_.callback(),
345 weak_ptr_factory_.GetWeakPtr()),
346 check_for_completed_tasks_delay_); 556 check_for_completed_tasks_delay_);
347 check_for_completed_tasks_pending_ = true; 557 check_for_completed_tasks_pending_ = true;
348 } 558 }
349 559
350 void WorkerPool::CheckForCompletedTasks() { 560 void WorkerPool::CheckForCompletedTasks() {
351 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); 561 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");
352 DCHECK(check_for_completed_tasks_pending_); 562 check_for_completed_tasks_callback_.Cancel();
353 check_for_completed_tasks_pending_ = false; 563 check_for_completed_tasks_pending_ = false;
354 564
565 TaskDeque completed_tasks;
566
355 // Schedule another check for completed tasks if not idle. 567 // Schedule another check for completed tasks if not idle.
356 if (!inner_->CollectCompletedTasks()) 568 if (!inner_->CollectCompletedTasks(&completed_tasks))
357 ScheduleCheckForCompletedTasks(); 569 ScheduleCheckForCompletedTasks();
358 570
359 DispatchCompletionCallbacks(); 571 DispatchCompletionCallbacks(&completed_tasks);
360 } 572 }
361 573
362 void WorkerPool::DispatchCompletionCallbacks() { 574 void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) {
363 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); 575 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks");
364 576
365 if (completed_tasks_.empty()) 577 // Early out when |completed_tasks| is empty to prevent unnecessary
578 // call to DidFinishDispatchingWorkerPoolCompletionCallbacks().
579 if (completed_tasks->empty())
366 return; 580 return;
367 581
368 while (completed_tasks_.size()) { 582 while (!completed_tasks->empty()) {
369 scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); 583 scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front();
584 completed_tasks->pop_front();
370 task->DidComplete(); 585 task->DidComplete();
586 task->DispatchCompletionCallback();
371 } 587 }
372 588
373 DCHECK(client_); 589 DCHECK(client_);
374 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); 590 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks();
375 } 591 }
376 592
377 void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { 593 void WorkerPool::ScheduleTasks(internal::WorkerPoolTask* root) {
378 // Schedule check for completed tasks if not pending. 594 TRACE_EVENT0("cc", "WorkerPool::ScheduleTasks");
379 ScheduleCheckForCompletedTasks();
380 595
381 inner_->PostTask(task.Pass()); 596 // Schedule check for completed tasks.
597 if (root)
598 ScheduleCheckForCompletedTasks();
599
600 inner_->ScheduleTasks(root);
382 } 601 }
383 602
384 } // namespace cc 603 } // namespace cc
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698