Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(136)

Side by Side Diff: trunk/src/cc/base/worker_pool.cc

Issue 16178002: Revert 202363 "cc: Cancel and re-prioritize worker pool tasks." (Closed) Base URL: svn://svn.chromium.org/chrome/
Patch Set: Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « trunk/src/cc/base/worker_pool.h ('k') | trunk/src/cc/base/worker_pool_perftest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "cc/base/worker_pool.h" 5 #include "cc/base/worker_pool.h"
6 6
7 #if defined(OS_ANDROID) 7 #include <algorithm>
8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549)
9 #include <sys/resource.h>
10 #endif
11
12 #include <map>
13 8
14 #include "base/bind.h" 9 #include "base/bind.h"
15 #include "base/debug/trace_event.h" 10 #include "base/debug/trace_event.h"
16 #include "base/hash_tables.h"
17 #include "base/stringprintf.h" 11 #include "base/stringprintf.h"
12 #include "base/synchronization/condition_variable.h"
18 #include "base/threading/simple_thread.h" 13 #include "base/threading/simple_thread.h"
19 #include "base/threading/thread_restrictions.h" 14 #include "base/threading/thread_restrictions.h"
20 #include "cc/base/scoped_ptr_deque.h"
21 #include "cc/base/scoped_ptr_hash_map.h"
22
23 #if defined(COMPILER_GCC)
24 namespace BASE_HASH_NAMESPACE {
25 template <> struct hash<cc::internal::WorkerPoolTask*> {
26 size_t operator()(cc::internal::WorkerPoolTask* ptr) const {
27 return hash<size_t>()(reinterpret_cast<size_t>(ptr));
28 }
29 };
30 } // namespace BASE_HASH_NAMESPACE
31 #endif // COMPILER
32 15
33 namespace cc { 16 namespace cc {
34 17
18 namespace {
19
20 class WorkerPoolTaskImpl : public internal::WorkerPoolTask {
21 public:
22 WorkerPoolTaskImpl(const WorkerPool::Callback& task,
23 const base::Closure& reply)
24 : internal::WorkerPoolTask(reply),
25 task_(task) {}
26
27 virtual void RunOnThread(unsigned thread_index) OVERRIDE {
28 task_.Run();
29 }
30
31 private:
32 WorkerPool::Callback task_;
33 };
34
35 } // namespace
36
35 namespace internal { 37 namespace internal {
36 38
37 WorkerPoolTask::WorkerPoolTask() 39 WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) {
38 : did_schedule_(false),
39 did_run_(false),
40 did_complete_(false) {
41 }
42
43 WorkerPoolTask::WorkerPoolTask(TaskVector* dependencies)
44 : did_schedule_(false),
45 did_run_(false),
46 did_complete_(false) {
47 dependencies_.swap(*dependencies);
48 } 40 }
49 41
50 WorkerPoolTask::~WorkerPoolTask() { 42 WorkerPoolTask::~WorkerPoolTask() {
51 DCHECK_EQ(did_schedule_, did_complete_);
52 DCHECK(!did_run_ || did_schedule_);
53 DCHECK(!did_run_ || did_complete_);
54 }
55
56 void WorkerPoolTask::DidSchedule() {
57 DCHECK(!did_complete_);
58 did_schedule_ = true;
59 }
60
61 void WorkerPoolTask::WillRun() {
62 DCHECK(did_schedule_);
63 DCHECK(!did_complete_);
64 DCHECK(!did_run_);
65 }
66
67 void WorkerPoolTask::DidRun() {
68 did_run_ = true;
69 } 43 }
70 44
71 void WorkerPoolTask::DidComplete() { 45 void WorkerPoolTask::DidComplete() {
72 DCHECK(did_schedule_); 46 reply_.Run();
73 DCHECK(!did_complete_);
74 did_complete_ = true;
75 }
76
77 bool WorkerPoolTask::IsReadyToRun() const {
78 // TODO(reveman): Use counter to improve performance.
79 for (TaskVector::const_reverse_iterator it = dependencies_.rbegin();
80 it != dependencies_.rend(); ++it) {
81 WorkerPoolTask* dependency = *it;
82 if (!dependency->HasFinishedRunning())
83 return false;
84 }
85 return true;
86 }
87
88 bool WorkerPoolTask::HasFinishedRunning() const {
89 return did_run_;
90 }
91
92 bool WorkerPoolTask::HasCompleted() const {
93 return did_complete_;
94 } 47 }
95 48
96 } // namespace internal 49 } // namespace internal
97 50
98 // Internal to the worker pool. Any data or logic that needs to be 51 // Internal to the worker pool. Any data or logic that needs to be
99 // shared between threads lives in this class. All members are guarded 52 // shared between threads lives in this class. All members are guarded
100 // by |lock_|. 53 // by |lock_|.
101 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { 54 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
102 public: 55 public:
103 Inner(WorkerPool* worker_pool, 56 Inner(WorkerPool* worker_pool,
104 size_t num_threads, 57 size_t num_threads,
105 const std::string& thread_name_prefix); 58 const std::string& thread_name_prefix);
106 virtual ~Inner(); 59 virtual ~Inner();
107 60
108 void Shutdown(); 61 void Shutdown();
109 62
110 // Schedule running of |root| task and all its dependencies. Tasks 63 void PostTask(scoped_ptr<internal::WorkerPoolTask> task);
111 // previously scheduled but no longer needed to run |root| will be
112 // canceled unless already running. Canceled tasks are moved to
113 // |completed_tasks_| without being run. The result is that once
114 // scheduled, a task is guaranteed to end up in the |completed_tasks_|
115 // queue even if they later get canceled by another call to
116 // ScheduleTasks().
117 void ScheduleTasks(internal::WorkerPoolTask* root);
118 64
119 // Collect all completed tasks in |completed_tasks|. Returns true if idle. 65 // Appends all completed tasks to worker pool's completed tasks queue
120 bool CollectCompletedTasks(TaskDeque* completed_tasks); 66 // and returns true if idle.
67 bool CollectCompletedTasks();
121 68
122 private: 69 private:
123 class ScheduledTask { 70 // Appends all completed tasks to |completed_tasks|. Lock must
124 public: 71 // already be acquired before calling this function.
125 ScheduledTask(internal::WorkerPoolTask* dependent, unsigned priority) 72 bool AppendCompletedTasksWithLockAcquired(
126 : priority_(priority) { 73 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks);
127 if (dependent)
128 dependents_.push_back(dependent);
129 }
130
131 internal::WorkerPoolTask::TaskVector& dependents() { return dependents_; }
132 unsigned priority() const { return priority_; }
133
134 private:
135 internal::WorkerPoolTask::TaskVector dependents_;
136 unsigned priority_;
137 };
138 typedef internal::WorkerPoolTask* ScheduledTaskMapKey;
139 typedef ScopedPtrHashMap<ScheduledTaskMapKey, ScheduledTask>
140 ScheduledTaskMap;
141
142 // This builds a ScheduledTaskMap from a root task.
143 static unsigned BuildScheduledTaskMapRecursive(
144 internal::WorkerPoolTask* task,
145 internal::WorkerPoolTask* dependent,
146 unsigned priority,
147 ScheduledTaskMap* scheduled_tasks);
148 static void BuildScheduledTaskMap(
149 internal::WorkerPoolTask* root, ScheduledTaskMap* scheduled_tasks);
150
151 // Collect all completed tasks by swapping the contents of
152 // |completed_tasks| and |completed_tasks_|. Lock must be acquired
153 // before calling this function. Returns true if idle.
154 bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks);
155 74
156 // Schedule an OnIdleOnOriginThread callback if not already pending. 75 // Schedule an OnIdleOnOriginThread callback if not already pending.
157 // Lock must already be acquired before calling this function. 76 // Lock must already be acquired before calling this function.
158 void ScheduleOnIdleWithLockAcquired(); 77 void ScheduleOnIdleWithLockAcquired();
159 void OnIdleOnOriginThread(); 78 void OnIdleOnOriginThread();
160 79
161 // Overridden from base::DelegateSimpleThread: 80 // Overridden from base::DelegateSimpleThread:
162 virtual void Run() OVERRIDE; 81 virtual void Run() OVERRIDE;
163 82
164 // Pointer to worker pool. Can only be used on origin thread. 83 // Pointer to worker pool. Can only be used on origin thread.
165 // Not guarded by |lock_|. 84 // Not guarded by |lock_|.
166 WorkerPool* worker_pool_on_origin_thread_; 85 WorkerPool* worker_pool_on_origin_thread_;
167 86
168 // This lock protects all members of this class except 87 // This lock protects all members of this class except
169 // |worker_pool_on_origin_thread_|. Do not read or modify anything 88 // |worker_pool_on_origin_thread_|. Do not read or modify anything
170 // without holding this lock. Do not block while holding this lock. 89 // without holding this lock. Do not block while holding this lock.
171 mutable base::Lock lock_; 90 mutable base::Lock lock_;
172 91
173 // Condition variable that is waited on by worker threads until new 92 // Condition variable that is waited on by worker threads until new
174 // tasks are ready to run or shutdown starts. 93 // tasks are posted or shutdown starts.
175 base::ConditionVariable has_ready_to_run_tasks_cv_; 94 base::ConditionVariable has_pending_tasks_cv_;
176 95
177 // Target message loop used for posting callbacks. 96 // Target message loop used for posting callbacks.
178 scoped_refptr<base::MessageLoopProxy> origin_loop_; 97 scoped_refptr<base::MessageLoopProxy> origin_loop_;
179 98
180 base::WeakPtrFactory<Inner> weak_ptr_factory_; 99 base::WeakPtrFactory<Inner> weak_ptr_factory_;
181 100
182 const base::Closure on_idle_callback_; 101 const base::Closure on_idle_callback_;
183 // Set when a OnIdleOnOriginThread() callback is pending. 102 // Set when a OnIdleOnOriginThread() callback is pending.
184 bool on_idle_pending_; 103 bool on_idle_pending_;
185 104
186 // Provides each running thread loop with a unique index. First thread 105 // Provides each running thread loop with a unique index. First thread
187 // loop index is 0. 106 // loop index is 0.
188 unsigned next_thread_index_; 107 unsigned next_thread_index_;
189 108
109 // Number of tasks currently running.
110 unsigned running_task_count_;
111
190 // Set during shutdown. Tells workers to exit when no more tasks 112 // Set during shutdown. Tells workers to exit when no more tasks
191 // are pending. 113 // are pending.
192 bool shutdown_; 114 bool shutdown_;
193 115
194 // The root task that is a dependent of all other tasks. 116 typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque;
195 scoped_refptr<internal::WorkerPoolTask> root_; 117 TaskDeque pending_tasks_;
196
197 // This set contains all pending tasks.
198 ScheduledTaskMap pending_tasks_;
199
200 // Ordered set of tasks that are ready to run.
201 // TODO(reveman): priority_queue might be more efficient.
202 typedef std::map<unsigned, internal::WorkerPoolTask*> TaskMap;
203 TaskMap ready_to_run_tasks_;
204
205 // This set contains all currently running tasks.
206 ScheduledTaskMap running_tasks_;
207
208 // Completed tasks not yet collected by origin thread.
209 TaskDeque completed_tasks_; 118 TaskDeque completed_tasks_;
210 119
211 ScopedPtrDeque<base::DelegateSimpleThread> workers_; 120 ScopedPtrDeque<base::DelegateSimpleThread> workers_;
212 121
213 DISALLOW_COPY_AND_ASSIGN(Inner); 122 DISALLOW_COPY_AND_ASSIGN(Inner);
214 }; 123 };
215 124
216 WorkerPool::Inner::Inner(WorkerPool* worker_pool, 125 WorkerPool::Inner::Inner(WorkerPool* worker_pool,
217 size_t num_threads, 126 size_t num_threads,
218 const std::string& thread_name_prefix) 127 const std::string& thread_name_prefix)
219 : worker_pool_on_origin_thread_(worker_pool), 128 : worker_pool_on_origin_thread_(worker_pool),
220 lock_(), 129 lock_(),
221 has_ready_to_run_tasks_cv_(&lock_), 130 has_pending_tasks_cv_(&lock_),
222 origin_loop_(base::MessageLoopProxy::current()), 131 origin_loop_(base::MessageLoopProxy::current()),
223 weak_ptr_factory_(this), 132 weak_ptr_factory_(this),
224 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, 133 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread,
225 weak_ptr_factory_.GetWeakPtr())), 134 weak_ptr_factory_.GetWeakPtr())),
226 on_idle_pending_(false), 135 on_idle_pending_(false),
227 next_thread_index_(0), 136 next_thread_index_(0),
137 running_task_count_(0),
228 shutdown_(false) { 138 shutdown_(false) {
229 base::AutoLock lock(lock_); 139 base::AutoLock lock(lock_);
230 140
231 while (workers_.size() < num_threads) { 141 while (workers_.size() < num_threads) {
232 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( 142 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
233 new base::DelegateSimpleThread( 143 new base::DelegateSimpleThread(
234 this, 144 this,
235 thread_name_prefix + 145 thread_name_prefix +
236 base::StringPrintf( 146 base::StringPrintf(
237 "Worker%u", 147 "Worker%u",
238 static_cast<unsigned>(workers_.size() + 1)).c_str())); 148 static_cast<unsigned>(workers_.size() + 1)).c_str()));
239 worker->Start(); 149 worker->Start();
240 workers_.push_back(worker.Pass()); 150 workers_.push_back(worker.Pass());
241 } 151 }
242 } 152 }
243 153
244 WorkerPool::Inner::~Inner() { 154 WorkerPool::Inner::~Inner() {
245 base::AutoLock lock(lock_); 155 base::AutoLock lock(lock_);
246 156
247 DCHECK(shutdown_); 157 DCHECK(shutdown_);
248 158
159 // Cancel all pending callbacks.
160 weak_ptr_factory_.InvalidateWeakPtrs();
161
249 DCHECK_EQ(0u, pending_tasks_.size()); 162 DCHECK_EQ(0u, pending_tasks_.size());
250 DCHECK_EQ(0u, ready_to_run_tasks_.size());
251 DCHECK_EQ(0u, running_tasks_.size());
252 DCHECK_EQ(0u, completed_tasks_.size()); 163 DCHECK_EQ(0u, completed_tasks_.size());
164 DCHECK_EQ(0u, running_task_count_);
253 } 165 }
254 166
255 void WorkerPool::Inner::Shutdown() { 167 void WorkerPool::Inner::Shutdown() {
256 { 168 {
257 base::AutoLock lock(lock_); 169 base::AutoLock lock(lock_);
258 170
259 DCHECK(!shutdown_); 171 DCHECK(!shutdown_);
260 shutdown_ = true; 172 shutdown_ = true;
261 173
262 // Wake up a worker so it knows it should exit. This will cause all workers 174 // Wake up a worker so it knows it should exit. This will cause all workers
263 // to exit as each will wake up another worker before exiting. 175 // to exit as each will wake up another worker before exiting.
264 has_ready_to_run_tasks_cv_.Signal(); 176 has_pending_tasks_cv_.Signal();
265 } 177 }
266 178
267 while (workers_.size()) { 179 while (workers_.size()) {
268 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); 180 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
269 // http://crbug.com/240453 - Join() is considered IO and will block this 181 // http://crbug.com/240453 - Join() is considered IO and will block this
270 // thread. See also http://crbug.com/239423 for further ideas. 182 // thread. See also http://crbug.com/239423 for further ideas.
271 base::ThreadRestrictions::ScopedAllowIO allow_io; 183 base::ThreadRestrictions::ScopedAllowIO allow_io;
272 worker->Join(); 184 worker->Join();
273 } 185 }
274
275 // Cancel any pending OnIdle callback.
276 weak_ptr_factory_.InvalidateWeakPtrs();
277 } 186 }
278 187
279 void WorkerPool::Inner::ScheduleTasks(internal::WorkerPoolTask* root) { 188 void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
280 // It is OK to call ScheduleTasks() after shutdown if |root| is NULL. 189 base::AutoLock lock(lock_);
281 DCHECK(!root || !shutdown_);
282 190
283 scoped_refptr<internal::WorkerPoolTask> new_root(root); 191 pending_tasks_.push_back(task.Pass());
284 192
285 ScheduledTaskMap new_pending_tasks; 193 // There is more work available, so wake up worker thread.
286 ScheduledTaskMap new_running_tasks; 194 has_pending_tasks_cv_.Signal();
287 TaskMap new_ready_to_run_tasks;
288
289 // Build scheduled task map before acquiring |lock_|.
290 if (root)
291 BuildScheduledTaskMap(root, &new_pending_tasks);
292
293 {
294 base::AutoLock lock(lock_);
295
296 // First remove all completed tasks from |new_pending_tasks|.
297 for (TaskDeque::iterator it = completed_tasks_.begin();
298 it != completed_tasks_.end(); ++it) {
299 internal::WorkerPoolTask* task = *it;
300 new_pending_tasks.take_and_erase(task);
301 }
302
303 // Move tasks not present in |new_pending_tasks| to |completed_tasks_|.
304 for (ScheduledTaskMap::iterator it = pending_tasks_.begin();
305 it != pending_tasks_.end(); ++it) {
306 internal::WorkerPoolTask* task = it->first;
307
308 // Task has completed if not present in |new_pending_tasks|.
309 if (!new_pending_tasks.contains(task))
310 completed_tasks_.push_back(task);
311 }
312
313 // Build new running task set.
314 for (ScheduledTaskMap::iterator it = running_tasks_.begin();
315 it != running_tasks_.end(); ++it) {
316 internal::WorkerPoolTask* task = it->first;
317 // Transfer scheduled task value from |new_pending_tasks| to
318 // |new_running_tasks| if currently running. Value must be set to
319 // NULL if |new_pending_tasks| doesn't contain task. This does
320 // the right in both cases.
321 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task));
322 }
323
324 // Build new "ready to run" tasks queue.
325 for (ScheduledTaskMap::iterator it = new_pending_tasks.begin();
326 it != new_pending_tasks.end(); ++it) {
327 internal::WorkerPoolTask* task = it->first;
328
329 // Completed tasks should not exist in |new_pending_tasks_|.
330 DCHECK(!task->HasFinishedRunning());
331
332 // Call DidSchedule() to indicate that this task has been scheduled.
333 // Note: This is only for debugging purposes.
334 task->DidSchedule();
335
336 DCHECK_EQ(0u, new_ready_to_run_tasks.count(it->second->priority()));
337 if (task->IsReadyToRun())
338 new_ready_to_run_tasks[it->second->priority()] = task;
339 }
340
341 // Swap root taskand task sets.
342 // Note: old tasks are intentionally destroyed after releasing |lock_|.
343 root_.swap(new_root);
344 pending_tasks_.swap(new_pending_tasks);
345 running_tasks_.swap(new_running_tasks);
346 ready_to_run_tasks_.swap(new_ready_to_run_tasks);
347
348 // If there is more work available, wake up worker thread.
349 if (!ready_to_run_tasks_.empty())
350 has_ready_to_run_tasks_cv_.Signal();
351 }
352 } 195 }
353 196
354 bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) { 197 bool WorkerPool::Inner::CollectCompletedTasks() {
355 base::AutoLock lock(lock_); 198 base::AutoLock lock(lock_);
356 199
357 return CollectCompletedTasksWithLockAcquired(completed_tasks); 200 return AppendCompletedTasksWithLockAcquired(
201 &worker_pool_on_origin_thread_->completed_tasks_);
358 } 202 }
359 203
360 bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired( 204 bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired(
361 TaskDeque* completed_tasks) { 205 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) {
362 lock_.AssertAcquired(); 206 lock_.AssertAcquired();
363 207
364 DCHECK_EQ(0u, completed_tasks->size()); 208 while (completed_tasks_.size())
365 completed_tasks->swap(completed_tasks_); 209 completed_tasks->push_back(completed_tasks_.take_front().Pass());
366 210
367 return running_tasks_.empty() && pending_tasks_.empty(); 211 return !running_task_count_ && pending_tasks_.empty();
368 } 212 }
369 213
370 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { 214 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() {
371 lock_.AssertAcquired(); 215 lock_.AssertAcquired();
372 216
373 if (on_idle_pending_) 217 if (on_idle_pending_)
374 return; 218 return;
375 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); 219 origin_loop_->PostTask(FROM_HERE, on_idle_callback_);
376 on_idle_pending_ = true; 220 on_idle_pending_ = true;
377 } 221 }
378 222
379 void WorkerPool::Inner::OnIdleOnOriginThread() { 223 void WorkerPool::Inner::OnIdleOnOriginThread() {
380 TaskDeque completed_tasks;
381
382 { 224 {
383 base::AutoLock lock(lock_); 225 base::AutoLock lock(lock_);
384 226
385 DCHECK(on_idle_pending_); 227 DCHECK(on_idle_pending_);
386 on_idle_pending_ = false; 228 on_idle_pending_ = false;
387 229
388 // Early out if no longer idle. 230 // Early out if no longer idle.
389 if (!running_tasks_.empty() || !pending_tasks_.empty()) 231 if (running_task_count_ || !pending_tasks_.empty())
390 return; 232 return;
391 233
392 CollectCompletedTasksWithLockAcquired(&completed_tasks); 234 AppendCompletedTasksWithLockAcquired(
235 &worker_pool_on_origin_thread_->completed_tasks_);
393 } 236 }
394 237
395 worker_pool_on_origin_thread_->OnIdle(&completed_tasks); 238 worker_pool_on_origin_thread_->OnIdle();
396 } 239 }
397 240
398 void WorkerPool::Inner::Run() { 241 void WorkerPool::Inner::Run() {
399 #if defined(OS_ANDROID) 242 #if defined(OS_ANDROID)
400 base::PlatformThread::SetThreadPriority( 243 base::PlatformThread::SetThreadPriority(
401 base::PlatformThread::CurrentHandle(), 244 base::PlatformThread::CurrentHandle(),
402 base::kThreadPriority_Background); 245 base::kThreadPriority_Background);
403 #endif 246 #endif
404 247
405 base::AutoLock lock(lock_); 248 base::AutoLock lock(lock_);
406 249
407 // Get a unique thread index. 250 // Get a unique thread index.
408 int thread_index = next_thread_index_++; 251 int thread_index = next_thread_index_++;
409 252
410 while (true) { 253 while (true) {
411 if (ready_to_run_tasks_.empty()) { 254 if (pending_tasks_.empty()) {
412 if (pending_tasks_.empty()) { 255 // Exit when shutdown is set and no more tasks are pending.
413 // Exit when shutdown is set and no more tasks are pending. 256 if (shutdown_)
414 if (shutdown_) 257 break;
415 break;
416 258
417 // Schedule an idle callback if no tasks are running. 259 // Schedule an idle callback if requested and not pending.
418 if (running_tasks_.empty()) 260 if (!running_task_count_)
419 ScheduleOnIdleWithLockAcquired(); 261 ScheduleOnIdleWithLockAcquired();
420 }
421 262
422 // Wait for more tasks. 263 // Wait for new pending tasks.
423 has_ready_to_run_tasks_cv_.Wait(); 264 has_pending_tasks_cv_.Wait();
424 continue; 265 continue;
425 } 266 }
426 267
427 // Take top priority task from |ready_to_run_tasks_|. 268 // Get next task.
428 scoped_refptr<internal::WorkerPoolTask> task( 269 scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front();
429 ready_to_run_tasks_.begin()->second);
430 ready_to_run_tasks_.erase(ready_to_run_tasks_.begin());
431 270
432 // Move task from |pending_tasks_| to |running_tasks_|. 271 // Increment |running_task_count_| before starting to run task.
433 DCHECK(pending_tasks_.contains(task)); 272 running_task_count_++;
434 DCHECK(!running_tasks_.contains(task));
435 running_tasks_.set(task, pending_tasks_.take_and_erase(task));
436 273
437 // There may be more work available, so wake up another worker thread. 274 // There may be more work available, so wake up another
438 has_ready_to_run_tasks_cv_.Signal(); 275 // worker thread.
439 276 has_pending_tasks_cv_.Signal();
440 // Call WillRun() before releasing |lock_| and running task.
441 task->WillRun();
442 277
443 { 278 {
444 base::AutoUnlock unlock(lock_); 279 base::AutoUnlock unlock(lock_);
445 280
446 task->RunOnThread(thread_index); 281 task->RunOnThread(thread_index);
447 } 282 }
448 283
449 // This will mark task as finished running. 284 completed_tasks_.push_back(task.Pass());
450 task->DidRun();
451 285
452 // Now iterate over all dependents to check if they are ready to run. 286 // Decrement |running_task_count_| now that we are done running task.
453 scoped_ptr<ScheduledTask> scheduled_task = running_tasks_.take_and_erase( 287 running_task_count_--;
454 task);
455 if (scheduled_task) {
456 typedef internal::WorkerPoolTask::TaskVector TaskVector;
457 for (TaskVector::iterator it = scheduled_task->dependents().begin();
458 it != scheduled_task->dependents().end(); ++it) {
459 internal::WorkerPoolTask* dependent = *it;
460 if (!dependent->IsReadyToRun())
461 continue;
462
463 // Task is ready. Add it to |ready_to_run_tasks_|.
464 DCHECK(pending_tasks_.contains(dependent));
465 unsigned priority = pending_tasks_.get(dependent)->priority();
466 DCHECK(!ready_to_run_tasks_.count(priority) ||
467 ready_to_run_tasks_[priority] == dependent);
468 ready_to_run_tasks_[priority] = dependent;
469 }
470 }
471
472 // Finally add task to |completed_tasks_|.
473 completed_tasks_.push_back(task);
474 } 288 }
475 289
476 // We noticed we should exit. Wake up the next worker so it knows it should 290 // We noticed we should exit. Wake up the next worker so it knows it should
477 // exit as well (because the Shutdown() code only signals once). 291 // exit as well (because the Shutdown() code only signals once).
478 has_ready_to_run_tasks_cv_.Signal(); 292 has_pending_tasks_cv_.Signal();
479 }
480
481 // BuildScheduledTaskMap() takes a task tree as input and constructs
482 // a unique set of tasks with edges between dependencies pointing in
483 // the direction of the dependents. Each task is given a unique priority
484 // which is currently the same as the DFS traversal order.
485 //
486 // Input: Output:
487 //
488 // root task4 Task | Priority (lower is better)
489 // / \ / \ -------+---------------------------
490 // task1 task2 task3 task2 root | 4
491 // | | | | task1 | 2
492 // task3 | task1 | task2 | 3
493 // | | \ / task3 | 1
494 // task4 task4 root task4 | 0
495 //
496 // The output can be used to efficiently maintain a queue of
497 // "ready to run" tasks.
498
499 // static
500 unsigned WorkerPool::Inner::BuildScheduledTaskMapRecursive(
501 internal::WorkerPoolTask* task,
502 internal::WorkerPoolTask* dependent,
503 unsigned priority,
504 ScheduledTaskMap* scheduled_tasks) {
505 // Skip sub-tree if task has already completed.
506 if (task->HasCompleted())
507 return priority;
508
509 ScheduledTaskMap::iterator scheduled_it = scheduled_tasks->find(task);
510 if (scheduled_it != scheduled_tasks->end()) {
511 DCHECK(dependent);
512 scheduled_it->second->dependents().push_back(dependent);
513 return priority;
514 }
515
516 typedef internal::WorkerPoolTask::TaskVector TaskVector;
517 for (TaskVector::iterator it = task->dependencies().begin();
518 it != task->dependencies().end(); ++it) {
519 internal::WorkerPoolTask* dependency = *it;
520 priority = BuildScheduledTaskMapRecursive(
521 dependency, task, priority, scheduled_tasks);
522 }
523
524 scheduled_tasks->set(task,
525 make_scoped_ptr(new ScheduledTask(dependent,
526 priority)));
527
528 return priority + 1;
529 }
530
531 // static
532 void WorkerPool::Inner::BuildScheduledTaskMap(
533 internal::WorkerPoolTask* root,
534 ScheduledTaskMap* scheduled_tasks) {
535 const unsigned kBasePriority = 0u;
536 DCHECK(root);
537 BuildScheduledTaskMapRecursive(root, NULL, kBasePriority, scheduled_tasks);
538 } 293 }
539 294
540 WorkerPool::WorkerPool(size_t num_threads, 295 WorkerPool::WorkerPool(size_t num_threads,
541 base::TimeDelta check_for_completed_tasks_delay, 296 base::TimeDelta check_for_completed_tasks_delay,
542 const std::string& thread_name_prefix) 297 const std::string& thread_name_prefix)
543 : client_(NULL), 298 : client_(NULL),
544 origin_loop_(base::MessageLoopProxy::current()), 299 origin_loop_(base::MessageLoopProxy::current()),
300 weak_ptr_factory_(this),
545 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), 301 check_for_completed_tasks_delay_(check_for_completed_tasks_delay),
546 check_for_completed_tasks_pending_(false), 302 check_for_completed_tasks_pending_(false),
547 inner_(make_scoped_ptr(new Inner(this, 303 inner_(make_scoped_ptr(new Inner(this,
548 num_threads, 304 num_threads,
549 thread_name_prefix))) { 305 thread_name_prefix))) {
550 } 306 }
551 307
552 WorkerPool::~WorkerPool() { 308 WorkerPool::~WorkerPool() {
309 // Cancel all pending callbacks.
310 weak_ptr_factory_.InvalidateWeakPtrs();
311
312 DCHECK_EQ(0u, completed_tasks_.size());
553 } 313 }
554 314
555 void WorkerPool::Shutdown() { 315 void WorkerPool::Shutdown() {
556 inner_->Shutdown(); 316 inner_->Shutdown();
557 317 inner_->CollectCompletedTasks();
558 TaskDeque completed_tasks; 318 DispatchCompletionCallbacks();
559 inner_->CollectCompletedTasks(&completed_tasks);
560 DispatchCompletionCallbacks(&completed_tasks);
561 } 319 }
562 320
563 void WorkerPool::OnIdle(TaskDeque* completed_tasks) { 321 void WorkerPool::PostTaskAndReply(
322 const Callback& task, const base::Closure& reply) {
323 PostTask(make_scoped_ptr(new WorkerPoolTaskImpl(
324 task,
325 reply)).PassAs<internal::WorkerPoolTask>());
326 }
327
328 void WorkerPool::OnIdle() {
564 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); 329 TRACE_EVENT0("cc", "WorkerPool::OnIdle");
565 330
566 DispatchCompletionCallbacks(completed_tasks); 331 DispatchCompletionCallbacks();
567
568 // Cancel any pending check for completed tasks.
569 check_for_completed_tasks_callback_.Cancel();
570 check_for_completed_tasks_pending_ = false;
571 } 332 }
572 333
573 void WorkerPool::ScheduleCheckForCompletedTasks() { 334 void WorkerPool::ScheduleCheckForCompletedTasks() {
574 if (check_for_completed_tasks_pending_) 335 if (check_for_completed_tasks_pending_)
575 return; 336 return;
576 check_for_completed_tasks_callback_.Reset(
577 base::Bind(&WorkerPool::CheckForCompletedTasks,
578 base::Unretained(this)));
579 origin_loop_->PostDelayedTask( 337 origin_loop_->PostDelayedTask(
580 FROM_HERE, 338 FROM_HERE,
581 check_for_completed_tasks_callback_.callback(), 339 base::Bind(&WorkerPool::CheckForCompletedTasks,
340 weak_ptr_factory_.GetWeakPtr()),
582 check_for_completed_tasks_delay_); 341 check_for_completed_tasks_delay_);
583 check_for_completed_tasks_pending_ = true; 342 check_for_completed_tasks_pending_ = true;
584 } 343 }
585 344
586 void WorkerPool::CheckForCompletedTasks() { 345 void WorkerPool::CheckForCompletedTasks() {
587 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); 346 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");
588 check_for_completed_tasks_callback_.Cancel(); 347 DCHECK(check_for_completed_tasks_pending_);
589 check_for_completed_tasks_pending_ = false; 348 check_for_completed_tasks_pending_ = false;
590 349
591 TaskDeque completed_tasks;
592
593 // Schedule another check for completed tasks if not idle. 350 // Schedule another check for completed tasks if not idle.
594 if (!inner_->CollectCompletedTasks(&completed_tasks)) 351 if (!inner_->CollectCompletedTasks())
595 ScheduleCheckForCompletedTasks(); 352 ScheduleCheckForCompletedTasks();
596 353
597 DispatchCompletionCallbacks(&completed_tasks); 354 DispatchCompletionCallbacks();
598 } 355 }
599 356
600 void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) { 357 void WorkerPool::DispatchCompletionCallbacks() {
601 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); 358 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks");
602 359
603 // Early out when |completed_tasks| is empty to prevent unnecessary 360 if (completed_tasks_.empty())
604 // call to DidFinishDispatchingWorkerPoolCompletionCallbacks().
605 if (completed_tasks->empty())
606 return; 361 return;
607 362
608 while (!completed_tasks->empty()) { 363 while (completed_tasks_.size()) {
609 scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front(); 364 scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front();
610 completed_tasks->pop_front();
611 task->DidComplete(); 365 task->DidComplete();
612 task->DispatchCompletionCallback();
613 } 366 }
614 367
615 DCHECK(client_); 368 DCHECK(client_);
616 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); 369 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks();
617 } 370 }
618 371
619 void WorkerPool::ScheduleTasks(internal::WorkerPoolTask* root) { 372 void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) {
620 TRACE_EVENT0("cc", "WorkerPool::ScheduleTasks"); 373 // Schedule check for completed tasks if not pending.
374 ScheduleCheckForCompletedTasks();
621 375
622 // Schedule check for completed tasks. 376 inner_->PostTask(task.Pass());
623 if (root)
624 ScheduleCheckForCompletedTasks();
625
626 inner_->ScheduleTasks(root);
627 } 377 }
628 378
629 } // namespace cc 379 } // namespace cc
OLDNEW
« no previous file with comments | « trunk/src/cc/base/worker_pool.h ('k') | trunk/src/cc/base/worker_pool_perftest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698