OLD | NEW |
---|---|
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "cc/base/worker_pool.h" | 5 #include "cc/base/worker_pool.h" |
6 | 6 |
7 #if defined(OS_ANDROID) | 7 #if defined(OS_ANDROID) |
8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) | 8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) |
9 #include <sys/resource.h> | 9 #include <sys/resource.h> |
10 #endif | 10 #endif |
11 | 11 |
12 #include <algorithm> | 12 #include <map> |
13 | 13 |
14 #include "base/bind.h" | 14 #include "base/bind.h" |
15 #include "base/debug/trace_event.h" | 15 #include "base/debug/trace_event.h" |
16 #include "base/hash_tables.h" | |
16 #include "base/stringprintf.h" | 17 #include "base/stringprintf.h" |
17 #include "base/synchronization/condition_variable.h" | |
18 #include "base/threading/simple_thread.h" | 18 #include "base/threading/simple_thread.h" |
19 #include "base/threading/thread_restrictions.h" | 19 #include "base/threading/thread_restrictions.h" |
20 #include "cc/base/scoped_ptr_deque.h" | |
21 #include "cc/base/scoped_ptr_hash_map.h" | |
22 | |
23 #if defined(COMPILER_GCC) | |
vmpstr 2013/05/24 17:01:37: Does this part just work with clang?
reveman 2013/05/24 18:53:48: yes, you'll find the same throughout the chromium
24 namespace BASE_HASH_NAMESPACE { | |
25 template <> struct hash<cc::internal::WorkerPoolTask*> { | |
26 size_t operator()(cc::internal::WorkerPoolTask* ptr) const { | |
27 return hash<size_t>()(reinterpret_cast<size_t>(ptr)); | |
28 } | |
29 }; | |
30 } // namespace BASE_HASH_NAMESPACE | |
31 #endif // COMPILER | |
20 | 32 |
21 namespace cc { | 33 namespace cc { |
22 | 34 |
23 namespace { | |
24 | |
25 class WorkerPoolTaskImpl : public internal::WorkerPoolTask { | |
26 public: | |
27 WorkerPoolTaskImpl(const WorkerPool::Callback& task, | |
28 const base::Closure& reply) | |
29 : internal::WorkerPoolTask(reply), | |
30 task_(task) {} | |
31 | |
32 virtual void RunOnThread(unsigned thread_index) OVERRIDE { | |
33 task_.Run(); | |
34 } | |
35 | |
36 private: | |
37 WorkerPool::Callback task_; | |
38 }; | |
39 | |
40 } // namespace | |
41 | |
42 namespace internal { | 35 namespace internal { |
43 | 36 |
44 WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) { | 37 WorkerPoolTask::WorkerPoolTask() |
38 : did_schedule_(false), | |
39 did_run_(false), | |
40 did_complete_(false) { | |
41 } | |
42 | |
43 WorkerPoolTask::WorkerPoolTask(TaskVector* dependencies) | |
44 : did_schedule_(false), | |
45 did_run_(false), | |
46 did_complete_(false) { | |
47 dependencies_.swap(*dependencies); | |
45 } | 48 } |
46 | 49 |
47 WorkerPoolTask::~WorkerPoolTask() { | 50 WorkerPoolTask::~WorkerPoolTask() { |
51 DCHECK_EQ(did_schedule_, did_complete_); | |
52 DCHECK(!did_run_ || did_schedule_); | |
53 DCHECK(!did_run_ || did_complete_); | |
54 } | |
55 | |
56 void WorkerPoolTask::DidSchedule() { | |
57 DCHECK(!did_complete_); | |
58 did_schedule_ = true; | |
59 } | |
60 | |
61 void WorkerPoolTask::WillRun() { | |
62 DCHECK(did_schedule_); | |
63 DCHECK(!did_complete_); | |
64 DCHECK(!did_run_); | |
65 } | |
66 | |
67 void WorkerPoolTask::DidRun() { | |
68 did_run_ = true; | |
48 } | 69 } |
49 | 70 |
50 void WorkerPoolTask::DidComplete() { | 71 void WorkerPoolTask::DidComplete() { |
51 reply_.Run(); | 72 DCHECK(did_schedule_); |
73 DCHECK(!did_complete_); | |
74 did_complete_ = true; | |
75 } | |
76 | |
77 bool WorkerPoolTask::IsReadyToRun() const { | |
78 // TODO(reveman): Use counter to improve performance. | |
79 for (TaskVector::const_reverse_iterator it = dependencies_.rbegin(); | |
80 it != dependencies_.rend(); ++it) { | |
81 WorkerPoolTask* dependency = *it; | |
82 if (!dependency->HasFinishedRunning()) | |
83 return false; | |
84 } | |
85 return true; | |
86 } | |
87 | |
88 bool WorkerPoolTask::HasFinishedRunning() const { | |
89 return did_run_; | |
90 } | |
91 | |
92 bool WorkerPoolTask::HasCompleted() const { | |
93 return did_complete_; | |
52 } | 94 } |
53 | 95 |
54 } // namespace internal | 96 } // namespace internal |
55 | 97 |
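Note (illustrative sketch, not part of this CL): with WorkerPoolTaskImpl and PostTaskAndReply() removed, concrete tasks presumably subclass internal::WorkerPoolTask directly. Below is a rough sketch of what such a subclass might look like, inferred only from the calls made in this file (the TaskVector dependency constructor, RunOnThread() and DispatchCompletionCallback()); the real interface lives in cc/base/worker_pool.h, and the names ExampleTask, |work| and |reply| are made up for illustration.

// Hypothetical sketch only -- assumes the interface implied by this file.
#include "base/callback.h"
#include "base/compiler_specific.h"
#include "cc/base/worker_pool.h"

class ExampleTask : public cc::internal::WorkerPoolTask {
 public:
  ExampleTask(const base::Closure& work,
              const base::Closure& reply,
              TaskVector* dependencies)
      : cc::internal::WorkerPoolTask(dependencies),
        work_(work),
        reply_(reply) {}

  // Runs on a worker thread; must not touch origin-thread state.
  virtual void RunOnThread(unsigned thread_index) OVERRIDE { work_.Run(); }

  // Runs on the origin thread after the task has been collected.
  virtual void DispatchCompletionCallback() OVERRIDE { reply_.Run(); }

  virtual ~ExampleTask() {}  // Lifetime is assumed to be managed via scoped_refptr.

 private:
  base::Closure work_;
  base::Closure reply_;
};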
56 // Internal to the worker pool. Any data or logic that needs to be | 98 // Internal to the worker pool. Any data or logic that needs to be |
57 // shared between threads lives in this class. All members are guarded | 99 // shared between threads lives in this class. All members are guarded |
58 // by |lock_|. | 100 // by |lock_|. |
59 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { | 101 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
60 public: | 102 public: |
61 Inner(WorkerPool* worker_pool, | 103 Inner(WorkerPool* worker_pool, |
62 size_t num_threads, | 104 size_t num_threads, |
63 const std::string& thread_name_prefix); | 105 const std::string& thread_name_prefix); |
64 virtual ~Inner(); | 106 virtual ~Inner(); |
65 | 107 |
66 void Shutdown(); | 108 void Shutdown(); |
67 | 109 |
68 void PostTask(scoped_ptr<internal::WorkerPoolTask> task); | 110 // Schedule running of |root| task and all its dependencies. Tasks |
111 // previously scheduled but no longer needed to run |root| will be | |
112 // canceled unless already running. Canceled tasks are moved to | |
113 // |completed_tasks_| without being run. The result is that once | |
114 // scheduled, a task is guaranteed to end up in the |completed_tasks_| | |
115 // queue even if they later get canceled by another call to | |
116 // ScheduleTasks(). | |
117 void ScheduleTasks(internal::WorkerPoolTask* root); | |
69 | 118 |
70 // Appends all completed tasks to worker pool's completed tasks queue | 119 // Collect all completed tasks in |completed_tasks|. Returns true if idle. |
71 // and returns true if idle. | 120 bool CollectCompletedTasks(TaskDeque* completed_tasks); |
72 bool CollectCompletedTasks(); | |
73 | 121 |
74 private: | 122 private: |
75 // Appends all completed tasks to |completed_tasks|. Lock must | 123 class ScheduledTask { |
76 // already be acquired before calling this function. | 124 public: |
77 bool AppendCompletedTasksWithLockAcquired( | 125 ScheduledTask(internal::WorkerPoolTask* dependent, unsigned priority) |
78 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); | 126 : priority_(priority) { |
127 if (dependent) | |
128 dependents_.push_back(dependent); | |
129 } | |
130 | |
131 internal::WorkerPoolTask::TaskVector& dependents() { return dependents_; } | |
132 unsigned priority() const { return priority_; } | |
133 | |
134 private: | |
135 internal::WorkerPoolTask::TaskVector dependents_; | |
136 unsigned priority_; | |
137 }; | |
138 typedef internal::WorkerPoolTask* ScheduledTaskMapKey; | |
139 typedef ScopedPtrHashMap<ScheduledTaskMapKey, ScheduledTask> | |
140 ScheduledTaskMap; | |
141 | |
142 // This builds a ScheduledTaskMap from a root task. | |
143 static unsigned BuildScheduledTaskMapRecursive( | |
144 internal::WorkerPoolTask* task, | |
145 internal::WorkerPoolTask* dependent, | |
146 unsigned priority, | |
147 ScheduledTaskMap* scheduled_tasks); | |
148 static void BuildScheduledTaskMap( | |
149 internal::WorkerPoolTask* root, ScheduledTaskMap* scheduled_tasks); | |
150 | |
151 // Collect all completed tasks by swapping the contents of | |
152 // |completed_tasks| and |completed_tasks_|. Lock must be acquired | |
153 // before calling this function. Returns true if idle. | |
154 bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks); | |
79 | 155 |
80 // Schedule an OnIdleOnOriginThread callback if not already pending. | 156 // Schedule an OnIdleOnOriginThread callback if not already pending. |
81 // Lock must already be acquired before calling this function. | 157 // Lock must already be acquired before calling this function. |
82 void ScheduleOnIdleWithLockAcquired(); | 158 void ScheduleOnIdleWithLockAcquired(); |
83 void OnIdleOnOriginThread(); | 159 void OnIdleOnOriginThread(); |
84 | 160 |
85 // Overridden from base::DelegateSimpleThread: | 161 // Overridden from base::DelegateSimpleThread: |
86 virtual void Run() OVERRIDE; | 162 virtual void Run() OVERRIDE; |
87 | 163 |
88 // Pointer to worker pool. Can only be used on origin thread. | 164 // Pointer to worker pool. Can only be used on origin thread. |
89 // Not guarded by |lock_|. | 165 // Not guarded by |lock_|. |
90 WorkerPool* worker_pool_on_origin_thread_; | 166 WorkerPool* worker_pool_on_origin_thread_; |
91 | 167 |
92 // This lock protects all members of this class except | 168 // This lock protects all members of this class except |
93 // |worker_pool_on_origin_thread_|. Do not read or modify anything | 169 // |worker_pool_on_origin_thread_|. Do not read or modify anything |
94 // without holding this lock. Do not block while holding this lock. | 170 // without holding this lock. Do not block while holding this lock. |
95 mutable base::Lock lock_; | 171 mutable base::Lock lock_; |
96 | 172 |
97 // Condition variable that is waited on by worker threads until new | 173 // Condition variable that is waited on by worker threads until new |
98 // tasks are posted or shutdown starts. | 174 // tasks are ready to run or shutdown starts. |
99 base::ConditionVariable has_pending_tasks_cv_; | 175 base::ConditionVariable has_ready_to_run_tasks_cv_; |
100 | 176 |
101 // Target message loop used for posting callbacks. | 177 // Target message loop used for posting callbacks. |
102 scoped_refptr<base::MessageLoopProxy> origin_loop_; | 178 scoped_refptr<base::MessageLoopProxy> origin_loop_; |
103 | 179 |
104 base::WeakPtrFactory<Inner> weak_ptr_factory_; | 180 base::WeakPtrFactory<Inner> weak_ptr_factory_; |
105 | 181 |
106 const base::Closure on_idle_callback_; | 182 const base::Closure on_idle_callback_; |
107 // Set when a OnIdleOnOriginThread() callback is pending. | 183 // Set when a OnIdleOnOriginThread() callback is pending. |
108 bool on_idle_pending_; | 184 bool on_idle_pending_; |
109 | 185 |
110 // Provides each running thread loop with a unique index. First thread | 186 // Provides each running thread loop with a unique index. First thread |
111 // loop index is 0. | 187 // loop index is 0. |
112 unsigned next_thread_index_; | 188 unsigned next_thread_index_; |
113 | 189 |
114 // Number of tasks currently running. | |
115 unsigned running_task_count_; | |
116 | |
117 // Set during shutdown. Tells workers to exit when no more tasks | 190 // Set during shutdown. Tells workers to exit when no more tasks |
118 // are pending. | 191 // are pending. |
119 bool shutdown_; | 192 bool shutdown_; |
120 | 193 |
121 typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; | 194 // The root task that is a dependent of all other tasks. |
122 TaskDeque pending_tasks_; | 195 scoped_refptr<internal::WorkerPoolTask> root_; |
196 | |
197 // This set contains all pending tasks. | |
198 ScheduledTaskMap pending_tasks_; | |
199 | |
200 // Ordered set of tasks that are ready to run. | |
201 // TODO(reveman): priority_queue might be more efficient. | |
202 typedef std::map<unsigned, internal::WorkerPoolTask*> TaskMap; | |
203 TaskMap ready_to_run_tasks_; | |
204 | |
205 // This set contains all currently running tasks. | |
206 ScheduledTaskMap running_tasks_; | |
207 | |
208 // Completed tasks not yet collected by origin thread. | |
123 TaskDeque completed_tasks_; | 209 TaskDeque completed_tasks_; |
124 | 210 |
125 ScopedPtrDeque<base::DelegateSimpleThread> workers_; | 211 ScopedPtrDeque<base::DelegateSimpleThread> workers_; |
126 | 212 |
127 DISALLOW_COPY_AND_ASSIGN(Inner); | 213 DISALLOW_COPY_AND_ASSIGN(Inner); |
128 }; | 214 }; |
129 | 215 |
130 WorkerPool::Inner::Inner(WorkerPool* worker_pool, | 216 WorkerPool::Inner::Inner(WorkerPool* worker_pool, |
131 size_t num_threads, | 217 size_t num_threads, |
132 const std::string& thread_name_prefix) | 218 const std::string& thread_name_prefix) |
133 : worker_pool_on_origin_thread_(worker_pool), | 219 : worker_pool_on_origin_thread_(worker_pool), |
134 lock_(), | 220 lock_(), |
135 has_pending_tasks_cv_(&lock_), | 221 has_ready_to_run_tasks_cv_(&lock_), |
136 origin_loop_(base::MessageLoopProxy::current()), | 222 origin_loop_(base::MessageLoopProxy::current()), |
137 weak_ptr_factory_(this), | 223 weak_ptr_factory_(this), |
138 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, | 224 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, |
139 weak_ptr_factory_.GetWeakPtr())), | 225 weak_ptr_factory_.GetWeakPtr())), |
140 on_idle_pending_(false), | 226 on_idle_pending_(false), |
141 next_thread_index_(0), | 227 next_thread_index_(0), |
142 running_task_count_(0), | |
143 shutdown_(false) { | 228 shutdown_(false) { |
144 base::AutoLock lock(lock_); | 229 base::AutoLock lock(lock_); |
145 | 230 |
146 while (workers_.size() < num_threads) { | 231 while (workers_.size() < num_threads) { |
147 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( | 232 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( |
148 new base::DelegateSimpleThread( | 233 new base::DelegateSimpleThread( |
149 this, | 234 this, |
150 thread_name_prefix + | 235 thread_name_prefix + |
151 base::StringPrintf( | 236 base::StringPrintf( |
152 "Worker%u", | 237 "Worker%u", |
153 static_cast<unsigned>(workers_.size() + 1)).c_str())); | 238 static_cast<unsigned>(workers_.size() + 1)).c_str())); |
154 worker->Start(); | 239 worker->Start(); |
155 workers_.push_back(worker.Pass()); | 240 workers_.push_back(worker.Pass()); |
156 } | 241 } |
157 } | 242 } |
158 | 243 |
159 WorkerPool::Inner::~Inner() { | 244 WorkerPool::Inner::~Inner() { |
160 base::AutoLock lock(lock_); | 245 base::AutoLock lock(lock_); |
161 | 246 |
162 DCHECK(shutdown_); | 247 DCHECK(shutdown_); |
163 | 248 |
164 // Cancel all pending callbacks. | |
165 weak_ptr_factory_.InvalidateWeakPtrs(); | |
166 | |
167 DCHECK_EQ(0u, pending_tasks_.size()); | 249 DCHECK_EQ(0u, pending_tasks_.size()); |
250 DCHECK_EQ(0u, ready_to_run_tasks_.size()); | |
251 DCHECK_EQ(0u, running_tasks_.size()); | |
168 DCHECK_EQ(0u, completed_tasks_.size()); | 252 DCHECK_EQ(0u, completed_tasks_.size()); |
169 DCHECK_EQ(0u, running_task_count_); | |
170 } | 253 } |
171 | 254 |
172 void WorkerPool::Inner::Shutdown() { | 255 void WorkerPool::Inner::Shutdown() { |
173 { | 256 { |
174 base::AutoLock lock(lock_); | 257 base::AutoLock lock(lock_); |
175 | 258 |
176 DCHECK(!shutdown_); | 259 DCHECK(!shutdown_); |
177 shutdown_ = true; | 260 shutdown_ = true; |
178 | 261 |
179 // Wake up a worker so it knows it should exit. This will cause all workers | 262 // Wake up a worker so it knows it should exit. This will cause all workers |
180 // to exit as each will wake up another worker before exiting. | 263 // to exit as each will wake up another worker before exiting. |
181 has_pending_tasks_cv_.Signal(); | 264 has_ready_to_run_tasks_cv_.Signal(); |
182 } | 265 } |
183 | 266 |
184 while (workers_.size()) { | 267 while (workers_.size()) { |
185 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); | 268 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); |
186 // http://crbug.com/240453 - Join() is considered IO and will block this | 269 // http://crbug.com/240453 - Join() is considered IO and will block this |
187 // thread. See also http://crbug.com/239423 for further ideas. | 270 // thread. See also http://crbug.com/239423 for further ideas. |
188 base::ThreadRestrictions::ScopedAllowIO allow_io; | 271 base::ThreadRestrictions::ScopedAllowIO allow_io; |
189 worker->Join(); | 272 worker->Join(); |
190 } | 273 } |
274 | |
275 // Cancel any pending OnIdle callback. | |
276 weak_ptr_factory_.InvalidateWeakPtrs(); | |
191 } | 277 } |
192 | 278 |
193 void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { | 279 void WorkerPool::Inner::ScheduleTasks(internal::WorkerPoolTask* root) { |
280 // It is OK to call ScheduleTasks() after shutdown if |root| is NULL. | |
281 DCHECK(!root || !shutdown_); | |
282 | |
283 ScheduledTaskMap new_pending_tasks; | |
284 if (root) | |
285 BuildScheduledTaskMap(root, &new_pending_tasks); | |
286 | |
287 scoped_refptr<internal::WorkerPoolTask> new_root(root); | |
288 | |
289 { | |
290 base::AutoLock lock(lock_); | |
291 | |
292 // First remove all completed tasks from |new_pending_tasks|. | |
293 for (TaskDeque::iterator it = completed_tasks_.begin(); | |
294 it != completed_tasks_.end(); ++it) { | |
295 internal::WorkerPoolTask* task = *it; | |
296 new_pending_tasks.take_and_erase(task); | |
297 } | |
298 | |
299 // Move tasks not present in |new_pending_tasks| to |completed_tasks_|. | |
300 for (ScheduledTaskMap::iterator it = pending_tasks_.begin(); | |
301 it != pending_tasks_.end(); ++it) { | |
302 internal::WorkerPoolTask* task = it->first; | |
303 | |
304 // Task has completed if not present in |new_pending_tasks|. | |
305 if (!new_pending_tasks.contains(task)) | |
306 completed_tasks_.push_back(task); | |
307 } | |
308 | |
309 // Build new running task set. | |
310 ScheduledTaskMap new_running_tasks; | |
311 for (ScheduledTaskMap::iterator it = running_tasks_.begin(); | |
312 it != running_tasks_.end(); ++it) { | |
313 internal::WorkerPoolTask* task = it->first; | |
314 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task)); | |
vmpstr 2013/05/24 17:01:37: Are currently running tasks guaranteed to be in th
reveman 2013/05/24 18:53:48: No, but we need to set the ScheduledTask value to
315 } | |
316 | |
317 // Swap root and task sets. | |
318 // Note: old tasks are intentionally destroyed after releasing |lock_|. | |
319 root_.swap(new_root); | |
320 pending_tasks_.swap(new_pending_tasks); | |
321 running_tasks_.swap(new_running_tasks); | |
322 | |
323 // Rebuild |ready_to_run_tasks_| before waking up workers. |
324 ready_to_run_tasks_.clear(); | |
325 for (ScheduledTaskMap::iterator it = pending_tasks_.begin(); | |
326 it != pending_tasks_.end(); ++it) { | |
327 internal::WorkerPoolTask* task = it->first; | |
328 | |
329 // Completed tasks should not exist in |pending_tasks_|. | |
330 DCHECK(!task->HasFinishedRunning()); | |
331 | |
332 // Call DidSchedule() to indicate that this task has been scheduled. | |
333 // Note: This is only for debugging purposes. | |
334 task->DidSchedule(); | |
335 | |
336 DCHECK_EQ(0u, ready_to_run_tasks_.count(it->second->priority())); | |
337 if (task->IsReadyToRun()) | |
338 ready_to_run_tasks_[it->second->priority()] = task; | |
339 } | |
340 | |
341 // If there is more work available, wake up worker thread. | |
342 if (!ready_to_run_tasks_.empty()) | |
343 has_ready_to_run_tasks_cv_.Signal(); | |
344 } | |
345 } | |
346 | |
347 bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) { | |
194 base::AutoLock lock(lock_); | 348 base::AutoLock lock(lock_); |
195 | 349 |
196 pending_tasks_.push_back(task.Pass()); | 350 return CollectCompletedTasksWithLockAcquired(completed_tasks); |
197 | |
198 // There is more work available, so wake up worker thread. | |
199 has_pending_tasks_cv_.Signal(); | |
200 } | 351 } |
201 | 352 |
202 bool WorkerPool::Inner::CollectCompletedTasks() { | 353 bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired( |
203 base::AutoLock lock(lock_); | 354 TaskDeque* completed_tasks) { |
204 | |
205 return AppendCompletedTasksWithLockAcquired( | |
206 &worker_pool_on_origin_thread_->completed_tasks_); | |
207 } | |
208 | |
209 bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired( | |
210 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) { | |
211 lock_.AssertAcquired(); | 355 lock_.AssertAcquired(); |
212 | 356 |
213 while (completed_tasks_.size()) | 357 DCHECK_EQ(0u, completed_tasks->size()); |
214 completed_tasks->push_back(completed_tasks_.take_front().Pass()); | 358 completed_tasks->swap(completed_tasks_); |
215 | 359 |
216 return !running_task_count_ && pending_tasks_.empty(); | 360 return running_tasks_.empty() && pending_tasks_.empty(); |
217 } | 361 } |
218 | 362 |
219 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { | 363 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
220 lock_.AssertAcquired(); | 364 lock_.AssertAcquired(); |
221 | 365 |
222 if (on_idle_pending_) | 366 if (on_idle_pending_) |
223 return; | 367 return; |
224 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); | 368 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); |
225 on_idle_pending_ = true; | 369 on_idle_pending_ = true; |
226 } | 370 } |
227 | 371 |
228 void WorkerPool::Inner::OnIdleOnOriginThread() { | 372 void WorkerPool::Inner::OnIdleOnOriginThread() { |
373 TaskDeque completed_tasks; | |
374 | |
229 { | 375 { |
230 base::AutoLock lock(lock_); | 376 base::AutoLock lock(lock_); |
231 | 377 |
232 DCHECK(on_idle_pending_); | 378 DCHECK(on_idle_pending_); |
233 on_idle_pending_ = false; | 379 on_idle_pending_ = false; |
234 | 380 |
235 // Early out if no longer idle. | 381 // Early out if no longer idle. |
236 if (running_task_count_ || !pending_tasks_.empty()) | 382 if (!running_tasks_.empty() || !pending_tasks_.empty()) |
237 return; | 383 return; |
238 | 384 |
239 AppendCompletedTasksWithLockAcquired( | 385 CollectCompletedTasksWithLockAcquired(&completed_tasks); |
240 &worker_pool_on_origin_thread_->completed_tasks_); | |
241 } | 386 } |
242 | 387 |
243 worker_pool_on_origin_thread_->OnIdle(); | 388 worker_pool_on_origin_thread_->OnIdle(&completed_tasks); |
244 } | 389 } |
245 | 390 |
246 void WorkerPool::Inner::Run() { | 391 void WorkerPool::Inner::Run() { |
247 #if defined(OS_ANDROID) | 392 #if defined(OS_ANDROID) |
248 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) | 393 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) |
249 int nice_value = 10; // Idle priority. | 394 int nice_value = 10; // Idle priority. |
250 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value); | 395 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value); |
251 #endif | 396 #endif |
252 | 397 |
253 base::AutoLock lock(lock_); | 398 base::AutoLock lock(lock_); |
254 | 399 |
255 // Get a unique thread index. | 400 // Get a unique thread index. |
256 int thread_index = next_thread_index_++; | 401 int thread_index = next_thread_index_++; |
257 | 402 |
258 while (true) { | 403 while (true) { |
259 if (pending_tasks_.empty()) { | 404 if (ready_to_run_tasks_.empty()) { |
260 // Exit when shutdown is set and no more tasks are pending. | 405 if (pending_tasks_.empty()) { |
261 if (shutdown_) | 406 // Exit when shutdown is set and no more tasks are pending. |
262 break; | 407 if (shutdown_) |
408 break; | |
263 | 409 |
264 // Schedule an idle callback if requested and not pending. | 410 // Schedule an idle callback if no tasks are running. |
265 if (!running_task_count_) | 411 if (running_tasks_.empty()) |
266 ScheduleOnIdleWithLockAcquired(); | 412 ScheduleOnIdleWithLockAcquired(); |
413 } | |
267 | 414 |
268 // Wait for new pending tasks. | 415 // Wait for more tasks. |
269 has_pending_tasks_cv_.Wait(); | 416 has_ready_to_run_tasks_cv_.Wait(); |
270 continue; | 417 continue; |
271 } | 418 } |
272 | 419 |
273 // Get next task. | 420 // Take top priority task from |ready_to_run_tasks_|. |
274 scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); | 421 scoped_refptr<internal::WorkerPoolTask> task( |
422 ready_to_run_tasks_.begin()->second); | |
423 ready_to_run_tasks_.erase(ready_to_run_tasks_.begin()); | |
275 | 424 |
276 // Increment |running_task_count_| before starting to run task. | 425 // Move task from |pending_tasks_| to |running_tasks_|. |
277 running_task_count_++; | 426 DCHECK(pending_tasks_.contains(task)); |
427 DCHECK(!running_tasks_.contains(task)); | |
428 running_tasks_.set(task, pending_tasks_.take_and_erase(task)); | |
278 | 429 |
279 // There may be more work available, so wake up another | 430 // There may be more work available, so wake up another worker thread. |
280 // worker thread. | 431 has_ready_to_run_tasks_cv_.Signal(); |
281 has_pending_tasks_cv_.Signal(); | 432 |
433 // Call WillRun() before releasing |lock_| and running task. | |
434 task->WillRun(); | |
282 | 435 |
283 { | 436 { |
284 base::AutoUnlock unlock(lock_); | 437 base::AutoUnlock unlock(lock_); |
285 | 438 |
286 task->RunOnThread(thread_index); | 439 task->RunOnThread(thread_index); |
287 } | 440 } |
288 | 441 |
289 completed_tasks_.push_back(task.Pass()); | 442 // This will mark task as finished running. |
443 task->DidRun(); | |
290 | 444 |
291 // Decrement |running_task_count_| now that we are done running task. | 445 // Now iterate over all dependents to check if they are ready to run. |
292 running_task_count_--; | 446 scoped_ptr<ScheduledTask> scheduled_task = running_tasks_.take_and_erase( |
447 task); | |
448 if (scheduled_task) { | |
449 typedef internal::WorkerPoolTask::TaskVector TaskVector; | |
450 for (TaskVector::iterator it = scheduled_task->dependents().begin(); | |
451 it != scheduled_task->dependents().end(); ++it) { | |
452 internal::WorkerPoolTask* dependent = *it; | |
453 if (!dependent->IsReadyToRun()) | |
454 continue; | |
455 | |
456 // Task is ready. Add it to |ready_to_run_tasks_|. | |
457 DCHECK(pending_tasks_.contains(dependent)); | |
458 unsigned priority = pending_tasks_.get(dependent)->priority(); | |
459 DCHECK(!ready_to_run_tasks_.count(priority) || | |
460 ready_to_run_tasks_[priority] == dependent); | |
461 ready_to_run_tasks_[priority] = dependent; | |
462 } | |
463 } | |
464 | |
465 // Finally add task to |completed_tasks_|. | |
466 completed_tasks_.push_back(task); | |
293 } | 467 } |
294 | 468 |
295 // We noticed we should exit. Wake up the next worker so it knows it should | 469 // We noticed we should exit. Wake up the next worker so it knows it should |
296 // exit as well (because the Shutdown() code only signals once). | 470 // exit as well (because the Shutdown() code only signals once). |
297 has_pending_tasks_cv_.Signal(); | 471 has_ready_to_run_tasks_cv_.Signal(); |
472 } | |
473 | |
474 // static | |
475 unsigned WorkerPool::Inner::BuildScheduledTaskMapRecursive( | |
vmpstr 2013/05/24 17:01:37: Can you add a few comments in this function to jus
reveman 2013/05/24 18:53:48: Added some comments and an example graph to latest
476 internal::WorkerPoolTask* task, | |
477 internal::WorkerPoolTask* dependent, | |
478 unsigned priority, | |
479 ScheduledTaskMap* scheduled_tasks) { | |
480 if (task->HasCompleted()) | |
481 return priority; | |
482 | |
483 ScheduledTaskMap::iterator scheduled_it = scheduled_tasks->find(task); | |
484 if (scheduled_it != scheduled_tasks->end()) { | |
485 DCHECK(dependent); | |
486 scheduled_it->second->dependents().push_back(dependent); | |
487 return priority; | |
488 } | |
489 | |
490 typedef internal::WorkerPoolTask::TaskVector TaskVector; | |
491 for (TaskVector::iterator it = task->dependencies().begin(); | |
492 it != task->dependencies().end(); ++it) { | |
493 internal::WorkerPoolTask* dependency = *it; | |
494 priority = BuildScheduledTaskMapRecursive( | |
495 dependency, task, priority, scheduled_tasks); | |
496 } | |
497 | |
498 scheduled_tasks->set(task, | |
499 make_scoped_ptr(new ScheduledTask(dependent, | |
500 priority))); | |
501 | |
502 return priority + 1; | |
503 } | |
504 | |
505 // static | |
506 void WorkerPool::Inner::BuildScheduledTaskMap( | |
507 internal::WorkerPoolTask* root, | |
508 ScheduledTaskMap* scheduled_tasks) { | |
509 const unsigned kBasePriority = 0u; | |
510 DCHECK(root); | |
511 BuildScheduledTaskMapRecursive(root, NULL, kBasePriority, scheduled_tasks); | |
298 } | 512 } |
299 | 513 |
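Note (illustrative sketch, not part of this CL, in the spirit of the example graph mentioned in the review thread): the standalone toy below performs the same post-order priority assignment as BuildScheduledTaskMapRecursive(), using plain structs in place of the real WorkerPoolTask/ScheduledTask types and omitting the HasCompleted() early-out. For a graph where root depends on c and d, c depends on a and b, and d also depends on a, it assigns priorities a=0, b=1, c=2, d=3, root=4 and records both c and d as dependents of a, so every task gets a lower priority value (runs earlier) than the tasks that depend on it.

#include <cstddef>
#include <iostream>
#include <map>
#include <string>
#include <vector>

struct Task {
  std::string name;
  std::vector<Task*> dependencies;
};

struct ToyScheduledTask {
  std::vector<Task*> dependents;
  unsigned priority;
};

typedef std::map<Task*, ToyScheduledTask> ToyScheduledTaskMap;

unsigned BuildMapRecursive(Task* task,
                           Task* dependent,
                           unsigned priority,
                           ToyScheduledTaskMap* scheduled_tasks) {
  ToyScheduledTaskMap::iterator it = scheduled_tasks->find(task);
  if (it != scheduled_tasks->end()) {
    // Already reached through another path; just record the extra dependent.
    it->second.dependents.push_back(dependent);
    return priority;
  }
  // Visit dependencies first so they receive lower priority values.
  for (size_t i = 0; i < task->dependencies.size(); ++i)
    priority = BuildMapRecursive(
        task->dependencies[i], task, priority, scheduled_tasks);
  ToyScheduledTask scheduled;
  if (dependent)
    scheduled.dependents.push_back(dependent);
  scheduled.priority = priority;
  (*scheduled_tasks)[task] = scheduled;
  return priority + 1;
}

int main() {
  Task a, b, c, d, root;
  a.name = "a"; b.name = "b"; c.name = "c"; d.name = "d"; root.name = "root";
  c.dependencies.push_back(&a);
  c.dependencies.push_back(&b);
  d.dependencies.push_back(&a);
  root.dependencies.push_back(&c);
  root.dependencies.push_back(&d);

  ToyScheduledTaskMap scheduled_tasks;
  BuildMapRecursive(&root, NULL, 0u, &scheduled_tasks);

  // Print in priority order, mirroring how |ready_to_run_tasks_| is keyed.
  std::map<unsigned, Task*> by_priority;
  for (ToyScheduledTaskMap::iterator it = scheduled_tasks.begin();
       it != scheduled_tasks.end(); ++it)
    by_priority[it->second.priority] = it->first;
  for (std::map<unsigned, Task*>::iterator it = by_priority.begin();
       it != by_priority.end(); ++it)
    std::cout << it->first << ": " << it->second->name << "\n";
  return 0;
}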
300 WorkerPool::WorkerPool(size_t num_threads, | 514 WorkerPool::WorkerPool(size_t num_threads, |
301 base::TimeDelta check_for_completed_tasks_delay, | 515 base::TimeDelta check_for_completed_tasks_delay, |
302 const std::string& thread_name_prefix) | 516 const std::string& thread_name_prefix) |
303 : client_(NULL), | 517 : client_(NULL), |
304 origin_loop_(base::MessageLoopProxy::current()), | 518 origin_loop_(base::MessageLoopProxy::current()), |
305 weak_ptr_factory_(this), | |
306 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), | 519 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), |
307 check_for_completed_tasks_pending_(false), | 520 check_for_completed_tasks_pending_(false), |
308 inner_(make_scoped_ptr(new Inner(this, | 521 inner_(make_scoped_ptr(new Inner(this, |
309 num_threads, | 522 num_threads, |
310 thread_name_prefix))) { | 523 thread_name_prefix))) { |
311 } | 524 } |
312 | 525 |
313 WorkerPool::~WorkerPool() { | 526 WorkerPool::~WorkerPool() { |
314 // Cancel all pending callbacks. | |
315 weak_ptr_factory_.InvalidateWeakPtrs(); | |
316 | |
317 DCHECK_EQ(0u, completed_tasks_.size()); | |
318 } | 527 } |
319 | 528 |
320 void WorkerPool::Shutdown() { | 529 void WorkerPool::Shutdown() { |
321 inner_->Shutdown(); | 530 inner_->Shutdown(); |
322 inner_->CollectCompletedTasks(); | 531 |
323 DispatchCompletionCallbacks(); | 532 TaskDeque completed_tasks; |
533 inner_->CollectCompletedTasks(&completed_tasks); | |
534 DispatchCompletionCallbacks(&completed_tasks); | |
324 } | 535 } |
325 | 536 |
326 void WorkerPool::PostTaskAndReply( | 537 void WorkerPool::OnIdle(TaskDeque* completed_tasks) { |
327 const Callback& task, const base::Closure& reply) { | |
328 PostTask(make_scoped_ptr(new WorkerPoolTaskImpl( | |
329 task, | |
330 reply)).PassAs<internal::WorkerPoolTask>()); | |
331 } | |
332 | |
333 void WorkerPool::OnIdle() { | |
334 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); | 538 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); |
335 | 539 |
336 DispatchCompletionCallbacks(); | 540 DispatchCompletionCallbacks(completed_tasks); |
541 | |
542 // Cancel any pending check for completed tasks. | |
543 check_for_completed_tasks_callback_.Cancel(); | |
544 check_for_completed_tasks_pending_ = false; | |
337 } | 545 } |
338 | 546 |
339 void WorkerPool::ScheduleCheckForCompletedTasks() { | 547 void WorkerPool::ScheduleCheckForCompletedTasks() { |
340 if (check_for_completed_tasks_pending_) | 548 if (check_for_completed_tasks_pending_) |
341 return; | 549 return; |
550 check_for_completed_tasks_callback_.Reset( | |
551 base::Bind(&WorkerPool::CheckForCompletedTasks, | |
552 base::Unretained(this))); | |
342 origin_loop_->PostDelayedTask( | 553 origin_loop_->PostDelayedTask( |
343 FROM_HERE, | 554 FROM_HERE, |
344 base::Bind(&WorkerPool::CheckForCompletedTasks, | 555 check_for_completed_tasks_callback_.callback(), |
345 weak_ptr_factory_.GetWeakPtr()), | |
346 check_for_completed_tasks_delay_); | 556 check_for_completed_tasks_delay_); |
347 check_for_completed_tasks_pending_ = true; | 557 check_for_completed_tasks_pending_ = true; |
348 } | 558 } |
349 | 559 |
350 void WorkerPool::CheckForCompletedTasks() { | 560 void WorkerPool::CheckForCompletedTasks() { |
351 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); | 561 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); |
352 DCHECK(check_for_completed_tasks_pending_); | 562 check_for_completed_tasks_callback_.Cancel(); |
353 check_for_completed_tasks_pending_ = false; | 563 check_for_completed_tasks_pending_ = false; |
354 | 564 |
565 TaskDeque completed_tasks; | |
566 | |
355 // Schedule another check for completed tasks if not idle. | 567 // Schedule another check for completed tasks if not idle. |
356 if (!inner_->CollectCompletedTasks()) | 568 if (!inner_->CollectCompletedTasks(&completed_tasks)) |
357 ScheduleCheckForCompletedTasks(); | 569 ScheduleCheckForCompletedTasks(); |
358 | 570 |
359 DispatchCompletionCallbacks(); | 571 DispatchCompletionCallbacks(&completed_tasks); |
360 } | 572 } |
361 | 573 |
362 void WorkerPool::DispatchCompletionCallbacks() { | 574 void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) { |
363 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); | 575 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); |
364 | 576 |
365 if (completed_tasks_.empty()) | 577 // Early out when |completed_tasks| is empty to prevent unnecessary |
578 // call to DidFinishDispatchingWorkerPoolCompletionCallbacks(). | |
579 if (completed_tasks->empty()) | |
366 return; | 580 return; |
367 | 581 |
368 while (completed_tasks_.size()) { | 582 while (!completed_tasks->empty()) { |
369 scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); | 583 scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front(); |
584 completed_tasks->pop_front(); | |
370 task->DidComplete(); | 585 task->DidComplete(); |
586 task->DispatchCompletionCallback(); | |
371 } | 587 } |
372 | 588 |
373 DCHECK(client_); | 589 DCHECK(client_); |
374 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); | 590 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); |
375 } | 591 } |
376 | 592 |
377 void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { | 593 void WorkerPool::ScheduleTasks(internal::WorkerPoolTask* root) { |
378 // Schedule check for completed tasks if not pending. | 594 TRACE_EVENT0("cc", "WorkerPool::ScheduleTasks"); |
379 ScheduleCheckForCompletedTasks(); | |
380 | 595 |
381 inner_->PostTask(task.Pass()); | 596 // Schedule check for completed tasks. |
597 if (root) | |
598 ScheduleCheckForCompletedTasks(); | |
599 | |
600 inner_->ScheduleTasks(root); | |
382 } | 601 } |
383 | 602 |
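Note (hypothetical usage sketch, not part of this CL): the intended client-side pattern for the new ScheduleTasks() entry point looks roughly like the following. It reuses the made-up ExampleTask subclass sketched near the top of the file, assumes the caller has access to a WorkerPool instance, and assumes TaskVector holds reference-counted task pointers as used in this file; only ScheduleTasks() itself and the cancellation behavior documented on Inner::ScheduleTasks() come from this CL.

// Hypothetical sketch only.
void ScheduleExampleGraph(cc::WorkerPool* worker_pool,
                          const base::Closure& work,
                          const base::Closure& reply) {
  typedef cc::internal::WorkerPoolTask::TaskVector TaskVector;

  TaskVector no_dependencies;
  scoped_refptr<cc::internal::WorkerPoolTask> a(
      new ExampleTask(work, reply, &no_dependencies));
  scoped_refptr<cc::internal::WorkerPoolTask> b(
      new ExampleTask(work, reply, &no_dependencies));

  // The root depends on |a| and |b|; workers run dependencies first.
  TaskVector dependencies;
  dependencies.push_back(a);
  dependencies.push_back(b);
  scoped_refptr<cc::internal::WorkerPoolTask> root(
      new ExampleTask(work, reply, &dependencies));

  worker_pool->ScheduleTasks(root.get());

  // A later call replaces the graph: tasks no longer reachable from the new
  // root are canceled but still end up in the completed queue. Passing NULL
  // cancels everything that has not already started running.
  worker_pool->ScheduleTasks(NULL);
}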
384 } // namespace cc | 603 } // namespace cc |