OLD | NEW |
---|---|
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "cc/base/worker_pool.h" | 5 #include "cc/base/worker_pool.h" |
6 | 6 |
7 #if defined(OS_ANDROID) | |
8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) | |
9 #include <sys/resource.h> | |
10 #endif | |
11 | |
7 #include <algorithm> | 12 #include <algorithm> |
8 | 13 |
9 #include "base/bind.h" | 14 #include "base/bind.h" |
10 #include "base/debug/trace_event.h" | 15 #include "base/debug/trace_event.h" |
11 #include "base/stringprintf.h" | 16 #include "base/stringprintf.h" |
12 #include "base/synchronization/condition_variable.h" | 17 #include "base/synchronization/condition_variable.h" |
13 #include "base/threading/simple_thread.h" | 18 #include "base/threading/simple_thread.h" |
14 | 19 |
15 #if defined(OS_ANDROID) | |
16 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) | |
17 #include <sys/resource.h> | |
18 #endif | |
19 | |
20 namespace cc { | 20 namespace cc { |
21 | 21 |
22 namespace { | 22 namespace { |
23 | 23 |
24 class WorkerPoolTaskImpl : public internal::WorkerPoolTask { | 24 class WorkerPoolTaskImpl : public internal::WorkerPoolTask { |
25 public: | 25 public: |
26 WorkerPoolTaskImpl(const WorkerPool::Callback& task, | 26 WorkerPoolTaskImpl(const WorkerPool::Callback& task, |
27 const base::Closure& reply) | 27 const base::Closure& reply) |
28 : internal::WorkerPoolTask(reply), | 28 : internal::WorkerPoolTask(reply), |
29 task_(task) {} | 29 task_(task) {} |
30 | 30 |
31 virtual bool IsCheap() OVERRIDE { return false; } | |
32 | |
33 virtual void Run() OVERRIDE { | |
34 task_.Run(); | |
35 } | |
36 | |
37 virtual void RunOnThread(unsigned thread_index) OVERRIDE { | 31 virtual void RunOnThread(unsigned thread_index) OVERRIDE { |
38 task_.Run(); | 32 task_.Run(); |
39 } | 33 } |
40 | 34 |
41 private: | 35 private: |
42 WorkerPool::Callback task_; | 36 WorkerPool::Callback task_; |
43 }; | 37 }; |
44 | 38 |
45 } // namespace | 39 } // namespace |
46 | 40 |
(...skipping 11 matching lines...) Expand all Loading... | |
58 | 52 |
59 } // namespace internal | 53 } // namespace internal |
60 | 54 |
61 // Internal to the worker pool. Any data or logic that needs to be | 55 // Internal to the worker pool. Any data or logic that needs to be |
62 // shared between threads lives in this class. All members are guarded | 56 // shared between threads lives in this class. All members are guarded |
63 // by |lock_|. | 57 // by |lock_|. |
64 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { | 58 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
65 public: | 59 public: |
66 Inner(WorkerPool* worker_pool, | 60 Inner(WorkerPool* worker_pool, |
67 size_t num_threads, | 61 size_t num_threads, |
68 const std::string& thread_name_prefix, | 62 const std::string& thread_name_prefix); |
69 bool need_on_task_completed_callback); | |
70 virtual ~Inner(); | 63 virtual ~Inner(); |
71 | 64 |
72 void Shutdown(); | 65 void Shutdown(); |
73 | 66 |
74 void PostTask(scoped_ptr<internal::WorkerPoolTask> task, bool signal_workers); | 67 void PostTask(scoped_ptr<internal::WorkerPoolTask> task); |
75 | 68 |
76 // Appends all completed tasks to worker pool's completed tasks queue | 69 // Appends all completed tasks to worker pool's completed tasks queue |
77 // and returns true if idle. | 70 // and returns true if idle. |
78 bool CollectCompletedTasks(); | 71 bool CollectCompletedTasks(); |
79 | 72 |
80 // Runs cheap tasks on caller thread until |time_limit| is reached | |
81 // and returns true if idle. | |
82 bool RunCheapTasksUntilTimeLimit(base::TimeTicks time_limit); | |
83 | |
84 private: | 73 private: |
85 // Appends all completed tasks to |completed_tasks|. Lock must | 74 // Appends all completed tasks to |completed_tasks|. Lock must |
86 // already be acquired before calling this function. | 75 // already be acquired before calling this function. |
87 bool AppendCompletedTasksWithLockAcquired( | 76 bool AppendCompletedTasksWithLockAcquired( |
88 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); | 77 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); |
89 | 78 |
90 // Schedule a OnTaskCompletedOnOriginThread callback if not already | |
91 // pending. Lock must already be acquired before calling this function. | |
92 void ScheduleOnTaskCompletedWithLockAcquired(); | |
93 void OnTaskCompletedOnOriginThread(); | |
94 | |
95 // Schedule an OnIdleOnOriginThread callback if not already pending. | 79 // Schedule an OnIdleOnOriginThread callback if not already pending. |
96 // Lock must already be acquired before calling this function. | 80 // Lock must already be acquired before calling this function. |
97 void ScheduleOnIdleWithLockAcquired(); | 81 void ScheduleOnIdleWithLockAcquired(); |
98 void OnIdleOnOriginThread(); | 82 void OnIdleOnOriginThread(); |
99 | 83 |
100 // Overridden from base::DelegateSimpleThread: | 84 // Overridden from base::DelegateSimpleThread: |
101 virtual void Run() OVERRIDE; | 85 virtual void Run() OVERRIDE; |
102 | 86 |
103 // Pointer to worker pool. Can only be used on origin thread. | 87 // Pointer to worker pool. Can only be used on origin thread. |
104 // Not guarded by |lock_|. | 88 // Not guarded by |lock_|. |
105 WorkerPool* worker_pool_on_origin_thread_; | 89 WorkerPool* worker_pool_on_origin_thread_; |
106 | 90 |
107 // This lock protects all members of this class except | 91 // This lock protects all members of this class except |
108 // |worker_pool_on_origin_thread_|. Do not read or modify anything | 92 // |worker_pool_on_origin_thread_|. Do not read or modify anything |
109 // without holding this lock. Do not block while holding this lock. | 93 // without holding this lock. Do not block while holding this lock. |
110 mutable base::Lock lock_; | 94 mutable base::Lock lock_; |
111 | 95 |
112 // Condition variable that is waited on by worker threads until new | 96 // Condition variable that is waited on by worker threads until new |
113 // tasks are posted or shutdown starts. | 97 // tasks are posted or shutdown starts. |
114 base::ConditionVariable has_pending_tasks_cv_; | 98 base::ConditionVariable has_pending_tasks_cv_; |
115 | 99 |
116 // Target message loop used for posting callbacks. | 100 // Target message loop used for posting callbacks. |
117 scoped_refptr<base::MessageLoopProxy> origin_loop_; | 101 scoped_refptr<base::MessageLoopProxy> origin_loop_; |
118 | 102 |
119 base::WeakPtrFactory<Inner> weak_ptr_factory_; | 103 base::WeakPtrFactory<Inner> weak_ptr_factory_; |
120 | 104 |
121 // Set to true when worker pool requires a callback for each | |
122 // completed task. | |
123 bool need_on_task_completed_callback_; | |
124 | |
125 const base::Closure on_task_completed_callback_; | |
126 // Set when a OnTaskCompletedOnOriginThread() callback is pending. | |
127 bool on_task_completed_pending_; | |
128 | |
129 const base::Closure on_idle_callback_; | 105 const base::Closure on_idle_callback_; |
130 // Set when a OnIdleOnOriginThread() callback is pending. | 106 // Set when a OnIdleOnOriginThread() callback is pending. |
131 bool on_idle_pending_; | 107 bool on_idle_pending_; |
132 | 108 |
133 // Provides each running thread loop with a unique index. First thread | 109 // Provides each running thread loop with a unique index. First thread |
134 // loop index is 0. | 110 // loop index is 0. |
135 unsigned next_thread_index_; | 111 unsigned next_thread_index_; |
136 | 112 |
137 // Number of tasks currently running. | 113 // Number of tasks currently running. |
138 unsigned running_task_count_; | 114 unsigned running_task_count_; |
139 | 115 |
140 // Set during shutdown. Tells workers to exit when no more tasks | 116 // Set during shutdown. Tells workers to exit when no more tasks |
141 // are pending. | 117 // are pending. |
142 bool shutdown_; | 118 bool shutdown_; |
143 | 119 |
144 typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; | 120 typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; |
145 TaskDeque pending_tasks_; | 121 TaskDeque pending_tasks_; |
146 TaskDeque completed_tasks_; | 122 TaskDeque completed_tasks_; |
147 | 123 |
148 ScopedPtrDeque<base::DelegateSimpleThread> workers_; | 124 ScopedPtrDeque<base::DelegateSimpleThread> workers_; |
149 | 125 |
150 DISALLOW_COPY_AND_ASSIGN(Inner); | 126 DISALLOW_COPY_AND_ASSIGN(Inner); |
151 }; | 127 }; |
152 | 128 |
153 WorkerPool::Inner::Inner(WorkerPool* worker_pool, | 129 WorkerPool::Inner::Inner(WorkerPool* worker_pool, |
154 size_t num_threads, | 130 size_t num_threads, |
155 const std::string& thread_name_prefix, | 131 const std::string& thread_name_prefix) |
156 bool need_on_task_completed_callback) | |
157 : worker_pool_on_origin_thread_(worker_pool), | 132 : worker_pool_on_origin_thread_(worker_pool), |
158 lock_(), | 133 lock_(), |
159 has_pending_tasks_cv_(&lock_), | 134 has_pending_tasks_cv_(&lock_), |
160 origin_loop_(base::MessageLoopProxy::current()), | 135 origin_loop_(base::MessageLoopProxy::current()), |
161 weak_ptr_factory_(this), | 136 weak_ptr_factory_(this), |
162 need_on_task_completed_callback_(need_on_task_completed_callback), | |
163 on_task_completed_callback_( | |
164 base::Bind(&WorkerPool::Inner::OnTaskCompletedOnOriginThread, | |
165 weak_ptr_factory_.GetWeakPtr())), | |
166 on_task_completed_pending_(false), | |
167 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, | 137 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, |
168 weak_ptr_factory_.GetWeakPtr())), | 138 weak_ptr_factory_.GetWeakPtr())), |
169 on_idle_pending_(false), | 139 on_idle_pending_(false), |
170 next_thread_index_(0), | 140 next_thread_index_(0), |
171 running_task_count_(0), | 141 running_task_count_(0), |
172 shutdown_(false) { | 142 shutdown_(false) { |
173 base::AutoLock lock(lock_); | 143 base::AutoLock lock(lock_); |
174 | 144 |
175 while (workers_.size() < num_threads) { | 145 while (workers_.size() < num_threads) { |
176 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( | 146 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( |
177 new base::DelegateSimpleThread( | 147 new base::DelegateSimpleThread( |
178 this, | 148 this, |
179 thread_name_prefix + | 149 thread_name_prefix + |
180 base::StringPrintf( | 150 base::StringPrintf( |
181 "Worker%lu", | 151 "Worker%u", |
182 static_cast<unsigned long>(workers_.size() + 1)).c_str())); | 152 static_cast<unsigned>(workers_.size() + 1)).c_str())); |
vmpstr
2013/04/26 22:51:35
nit: is the static_cast required here now?
reveman
2013/04/26 23:09:42
Done.
| |
183 worker->Start(); | 153 worker->Start(); |
184 workers_.push_back(worker.Pass()); | 154 workers_.push_back(worker.Pass()); |
185 } | 155 } |
186 } | 156 } |
187 | 157 |
188 WorkerPool::Inner::~Inner() { | 158 WorkerPool::Inner::~Inner() { |
189 base::AutoLock lock(lock_); | 159 base::AutoLock lock(lock_); |
190 | 160 |
191 DCHECK(shutdown_); | 161 DCHECK(shutdown_); |
192 | 162 |
(...skipping 16 matching lines...) Expand all Loading... | |
209 // to exit as each will wake up another worker before exiting. | 179 // to exit as each will wake up another worker before exiting. |
210 has_pending_tasks_cv_.Signal(); | 180 has_pending_tasks_cv_.Signal(); |
211 } | 181 } |
212 | 182 |
213 while (workers_.size()) { | 183 while (workers_.size()) { |
214 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); | 184 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); |
215 worker->Join(); | 185 worker->Join(); |
216 } | 186 } |
217 } | 187 } |
218 | 188 |
219 void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task, | 189 void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
220 bool signal_workers) { | |
221 base::AutoLock lock(lock_); | 190 base::AutoLock lock(lock_); |
222 | 191 |
223 pending_tasks_.push_back(task.Pass()); | 192 pending_tasks_.push_back(task.Pass()); |
224 | 193 |
225 // There is more work available, so wake up worker thread. | 194 // There is more work available, so wake up worker thread. |
226 if (signal_workers) | 195 has_pending_tasks_cv_.Signal(); |
227 has_pending_tasks_cv_.Signal(); | |
228 } | 196 } |
229 | 197 |
230 bool WorkerPool::Inner::CollectCompletedTasks() { | 198 bool WorkerPool::Inner::CollectCompletedTasks() { |
231 base::AutoLock lock(lock_); | 199 base::AutoLock lock(lock_); |
232 | 200 |
233 return AppendCompletedTasksWithLockAcquired( | 201 return AppendCompletedTasksWithLockAcquired( |
234 &worker_pool_on_origin_thread_->completed_tasks_); | 202 &worker_pool_on_origin_thread_->completed_tasks_); |
235 } | 203 } |
236 | 204 |
237 bool WorkerPool::Inner::RunCheapTasksUntilTimeLimit( | |
238 base::TimeTicks time_limit) { | |
239 base::AutoLock lock(lock_); | |
240 | |
241 while (base::TimeTicks::Now() < time_limit) { | |
242 scoped_ptr<internal::WorkerPoolTask> task; | |
243 | |
244 // Find next cheap task. | |
245 for (TaskDeque::iterator iter = pending_tasks_.begin(); | |
246 iter != pending_tasks_.end(); ++iter) { | |
247 if ((*iter)->IsCheap()) { | |
248 task = pending_tasks_.take(iter); | |
249 break; | |
250 } | |
251 } | |
252 | |
253 if (!task) { | |
254 // Schedule an idle callback if requested and not pending. | |
255 if (!running_task_count_ && pending_tasks_.empty()) | |
256 ScheduleOnIdleWithLockAcquired(); | |
257 | |
258 // Exit when no more cheap tasks are pending. | |
259 break; | |
260 } | |
261 | |
262 // Increment |running_task_count_| before starting to run task. | |
263 running_task_count_++; | |
264 | |
265 { | |
266 base::AutoUnlock unlock(lock_); | |
267 | |
268 task->Run(); | |
269 | |
270 // Append tasks directly to worker pool's completed tasks queue. | |
271 worker_pool_on_origin_thread_->completed_tasks_.push_back(task.Pass()); | |
272 if (need_on_task_completed_callback_) | |
273 worker_pool_on_origin_thread_->OnTaskCompleted(); | |
274 } | |
275 | |
276 // Decrement |running_task_count_| now that we are done running task. | |
277 running_task_count_--; | |
278 } | |
279 | |
280 if (!pending_tasks_.empty()) | |
281 has_pending_tasks_cv_.Signal(); | |
282 | |
283 // Append any other completed tasks before releasing lock. | |
284 return AppendCompletedTasksWithLockAcquired( | |
285 &worker_pool_on_origin_thread_->completed_tasks_); | |
286 } | |
287 | |
288 bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired( | 205 bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired( |
289 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) { | 206 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) { |
290 lock_.AssertAcquired(); | 207 lock_.AssertAcquired(); |
291 | 208 |
292 while (completed_tasks_.size()) | 209 while (completed_tasks_.size()) |
293 completed_tasks->push_back(completed_tasks_.take_front().Pass()); | 210 completed_tasks->push_back(completed_tasks_.take_front().Pass()); |
294 | 211 |
295 return !running_task_count_ && pending_tasks_.empty(); | 212 return !running_task_count_ && pending_tasks_.empty(); |
296 } | 213 } |
297 | 214 |
298 void WorkerPool::Inner::ScheduleOnTaskCompletedWithLockAcquired() { | |
299 lock_.AssertAcquired(); | |
300 | |
301 if (on_task_completed_pending_ || !need_on_task_completed_callback_) | |
302 return; | |
303 origin_loop_->PostTask(FROM_HERE, on_task_completed_callback_); | |
304 on_task_completed_pending_ = true; | |
305 } | |
306 | |
307 void WorkerPool::Inner::OnTaskCompletedOnOriginThread() { | |
308 { | |
309 base::AutoLock lock(lock_); | |
310 | |
311 DCHECK(on_task_completed_pending_); | |
312 on_task_completed_pending_ = false; | |
313 | |
314 AppendCompletedTasksWithLockAcquired( | |
315 &worker_pool_on_origin_thread_->completed_tasks_); | |
316 } | |
317 | |
318 worker_pool_on_origin_thread_->OnTaskCompleted(); | |
319 } | |
320 | |
321 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { | 215 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
322 lock_.AssertAcquired(); | 216 lock_.AssertAcquired(); |
323 | 217 |
324 if (on_idle_pending_) | 218 if (on_idle_pending_) |
325 return; | 219 return; |
326 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); | 220 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); |
327 on_idle_pending_ = true; | 221 on_idle_pending_ = true; |
328 } | 222 } |
329 | 223 |
330 void WorkerPool::Inner::OnIdleOnOriginThread() { | 224 void WorkerPool::Inner::OnIdleOnOriginThread() { |
(...skipping 13 matching lines...) Expand all Loading... | |
344 | 238 |
345 worker_pool_on_origin_thread_->OnIdle(); | 239 worker_pool_on_origin_thread_->OnIdle(); |
346 } | 240 } |
347 | 241 |
348 void WorkerPool::Inner::Run() { | 242 void WorkerPool::Inner::Run() { |
349 #if defined(OS_ANDROID) | 243 #if defined(OS_ANDROID) |
350 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) | 244 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) |
351 int nice_value = 10; // Idle priority. | 245 int nice_value = 10; // Idle priority. |
352 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value); | 246 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value); |
353 #endif | 247 #endif |
354 { | |
355 base::AutoLock lock(lock_); | |
356 | 248 |
357 // Get a unique thread index. | 249 base::AutoLock lock(lock_); |
358 int thread_index = next_thread_index_++; | |
359 | 250 |
360 while (true) { | 251 // Get a unique thread index. |
361 if (pending_tasks_.empty()) { | 252 int thread_index = next_thread_index_++; |
362 // Exit when shutdown is set and no more tasks are pending. | |
363 if (shutdown_) | |
364 break; | |
365 | 253 |
366 // Schedule an idle callback if requested and not pending. | 254 while (true) { |
367 if (!running_task_count_) | 255 if (pending_tasks_.empty()) { |
368 ScheduleOnIdleWithLockAcquired(); | 256 // Exit when shutdown is set and no more tasks are pending. |
257 if (shutdown_) | |
258 break; | |
369 | 259 |
370 // Wait for new pending tasks. | 260 // Schedule an idle callback if requested and not pending. |
371 has_pending_tasks_cv_.Wait(); | 261 if (!running_task_count_) |
372 continue; | 262 ScheduleOnIdleWithLockAcquired(); |
373 } | |
374 | 263 |
375 // Get next task. | 264 // Wait for new pending tasks. |
376 scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); | 265 has_pending_tasks_cv_.Wait(); |
377 | 266 continue; |
378 // Increment |running_task_count_| before starting to run task. | |
379 running_task_count_++; | |
380 | |
381 // There may be more work available, so wake up another | |
382 // worker thread. | |
383 has_pending_tasks_cv_.Signal(); | |
384 | |
385 { | |
386 base::AutoUnlock unlock(lock_); | |
387 | |
388 task->RunOnThread(thread_index); | |
389 } | |
390 | |
391 completed_tasks_.push_back(task.Pass()); | |
392 | |
393 // Decrement |running_task_count_| now that we are done running task. | |
394 running_task_count_--; | |
395 | |
396 // Schedule a task completed callback if requested and not pending. | |
397 ScheduleOnTaskCompletedWithLockAcquired(); | |
398 } | 267 } |
399 | 268 |
400 // We noticed we should exit. Wake up the next worker so it knows it should | 269 // Get next task. |
401 // exit as well (because the Shutdown() code only signals once). | 270 scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); |
271 | |
272 // Increment |running_task_count_| before starting to run task. | |
273 running_task_count_++; | |
274 | |
275 // There may be more work available, so wake up another | |
276 // worker thread. | |
402 has_pending_tasks_cv_.Signal(); | 277 has_pending_tasks_cv_.Signal(); |
278 | |
279 { | |
280 base::AutoUnlock unlock(lock_); | |
281 | |
282 task->RunOnThread(thread_index); | |
283 } | |
284 | |
285 completed_tasks_.push_back(task.Pass()); | |
286 | |
287 // Decrement |running_task_count_| now that we are done running task. | |
288 running_task_count_--; | |
403 } | 289 } |
290 | |
291 // We noticed we should exit. Wake up the next worker so it knows it should | |
292 // exit as well (because the Shutdown() code only signals once). | |
293 has_pending_tasks_cv_.Signal(); | |
404 } | 294 } |
405 | 295 |
406 WorkerPool::WorkerPool(WorkerPoolClient* client, | 296 WorkerPool::WorkerPool(WorkerPoolClient* client, |
407 size_t num_threads, | 297 size_t num_threads, |
408 base::TimeDelta check_for_completed_tasks_delay, | 298 base::TimeDelta check_for_completed_tasks_delay, |
409 const std::string& thread_name_prefix) | 299 const std::string& thread_name_prefix) |
410 : client_(client), | 300 : client_(client), |
411 origin_loop_(base::MessageLoopProxy::current()), | 301 origin_loop_(base::MessageLoopProxy::current()), |
412 weak_ptr_factory_(this), | 302 weak_ptr_factory_(this), |
413 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), | 303 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), |
414 check_for_completed_tasks_pending_(false), | 304 check_for_completed_tasks_pending_(false), |
415 run_cheap_tasks_callback_( | 305 inner_(make_scoped_ptr(new Inner(this, |
416 base::Bind(&WorkerPool::RunCheapTasks, | 306 num_threads, |
417 weak_ptr_factory_.GetWeakPtr())), | 307 thread_name_prefix))) { |
418 run_cheap_tasks_pending_(false), | |
419 inner_(make_scoped_ptr( | |
420 new Inner( | |
421 this, | |
422 num_threads, | |
423 thread_name_prefix, | |
424 // Request OnTaskCompleted() callback when check | |
425 // for completed tasks delay is 0. | |
426 check_for_completed_tasks_delay == base::TimeDelta()))) { | |
427 } | 308 } |
428 | 309 |
429 WorkerPool::~WorkerPool() { | 310 WorkerPool::~WorkerPool() { |
430 Shutdown(); | 311 Shutdown(); |
431 | 312 |
432 // Cancel all pending callbacks. | 313 // Cancel all pending callbacks. |
433 weak_ptr_factory_.InvalidateWeakPtrs(); | 314 weak_ptr_factory_.InvalidateWeakPtrs(); |
434 | 315 |
435 DCHECK_EQ(0u, completed_tasks_.size()); | 316 DCHECK_EQ(0u, completed_tasks_.size()); |
436 } | 317 } |
437 | 318 |
438 void WorkerPool::Shutdown() { | 319 void WorkerPool::Shutdown() { |
439 inner_->Shutdown(); | 320 inner_->Shutdown(); |
440 DispatchCompletionCallbacks(); | 321 DispatchCompletionCallbacks(); |
441 } | 322 } |
442 | 323 |
443 void WorkerPool::PostTaskAndReply( | 324 void WorkerPool::PostTaskAndReply( |
444 const Callback& task, const base::Closure& reply) { | 325 const Callback& task, const base::Closure& reply) { |
445 PostTask(make_scoped_ptr(new WorkerPoolTaskImpl( | 326 PostTask(make_scoped_ptr(new WorkerPoolTaskImpl( |
446 task, | 327 task, |
447 reply)).PassAs<internal::WorkerPoolTask>()); | 328 reply)).PassAs<internal::WorkerPoolTask>()); |
448 } | 329 } |
449 | 330 |
450 void WorkerPool::SetRunCheapTasksTimeLimit( | |
451 base::TimeTicks run_cheap_tasks_time_limit) { | |
452 run_cheap_tasks_time_limit_ = run_cheap_tasks_time_limit; | |
453 ScheduleRunCheapTasks(); | |
454 } | |
455 | |
456 void WorkerPool::OnIdle() { | 331 void WorkerPool::OnIdle() { |
457 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); | 332 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); |
458 | 333 |
459 DispatchCompletionCallbacks(); | 334 DispatchCompletionCallbacks(); |
460 } | 335 } |
461 | 336 |
462 void WorkerPool::OnTaskCompleted() { | |
463 TRACE_EVENT0("cc", "WorkerPool::OnTaskCompleted"); | |
464 | |
465 DispatchCompletionCallbacks(); | |
466 } | |
467 | |
468 void WorkerPool::ScheduleCheckForCompletedTasks() { | 337 void WorkerPool::ScheduleCheckForCompletedTasks() { |
469 if (check_for_completed_tasks_pending_ || | 338 if (check_for_completed_tasks_pending_) |
470 check_for_completed_tasks_delay_ == base::TimeDelta()) | |
471 return; | 339 return; |
472 check_for_completed_tasks_callback_.Reset( | |
473 base::Bind(&WorkerPool::CheckForCompletedTasks, | |
474 weak_ptr_factory_.GetWeakPtr())); | |
475 check_for_completed_tasks_time_ = base::TimeTicks::Now() + | |
476 check_for_completed_tasks_delay_; | |
477 origin_loop_->PostDelayedTask( | 340 origin_loop_->PostDelayedTask( |
478 FROM_HERE, | 341 FROM_HERE, |
479 check_for_completed_tasks_callback_.callback(), | 342 base::Bind(&WorkerPool::CheckForCompletedTasks, |
343 weak_ptr_factory_.GetWeakPtr()), | |
480 check_for_completed_tasks_delay_); | 344 check_for_completed_tasks_delay_); |
481 check_for_completed_tasks_pending_ = true; | 345 check_for_completed_tasks_pending_ = true; |
482 } | 346 } |
483 | 347 |
484 void WorkerPool::CheckForCompletedTasks() { | 348 void WorkerPool::CheckForCompletedTasks() { |
485 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); | 349 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); |
486 DCHECK(check_for_completed_tasks_pending_); | 350 DCHECK(check_for_completed_tasks_pending_); |
487 check_for_completed_tasks_pending_ = false; | 351 check_for_completed_tasks_pending_ = false; |
488 | 352 |
489 // Schedule another check for completed tasks if not idle. | 353 // Schedule another check for completed tasks if not idle. |
490 if (!inner_->CollectCompletedTasks()) | 354 if (!inner_->CollectCompletedTasks()) |
491 ScheduleCheckForCompletedTasks(); | 355 ScheduleCheckForCompletedTasks(); |
492 | 356 |
493 DispatchCompletionCallbacks(); | 357 DispatchCompletionCallbacks(); |
494 } | 358 } |
495 | 359 |
496 void WorkerPool::CancelCheckForCompletedTasks() { | |
497 if (!check_for_completed_tasks_pending_) | |
498 return; | |
499 | |
500 check_for_completed_tasks_callback_.Cancel(); | |
501 check_for_completed_tasks_pending_ = false; | |
502 } | |
503 | |
504 void WorkerPool::DispatchCompletionCallbacks() { | 360 void WorkerPool::DispatchCompletionCallbacks() { |
505 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); | 361 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); |
506 | 362 |
507 if (completed_tasks_.empty()) | 363 if (completed_tasks_.empty()) |
508 return; | 364 return; |
509 | 365 |
510 while (completed_tasks_.size()) { | 366 while (completed_tasks_.size()) { |
511 scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); | 367 scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); |
512 task->DidComplete(); | 368 task->DidComplete(); |
513 } | 369 } |
514 | 370 |
515 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); | 371 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); |
516 } | 372 } |
517 | 373 |
518 void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { | 374 void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
519 bool signal_workers = true; | |
520 if (task->IsCheap()) { | |
521 // To make cheap tasks more likely to run on the origin thread, don't wake | |
522 // workers when posting them. | |
523 signal_workers = false; | |
524 ScheduleRunCheapTasks(); | |
525 } | |
526 | |
527 // Schedule check for completed tasks if not pending. | 375 // Schedule check for completed tasks if not pending. |
528 ScheduleCheckForCompletedTasks(); | 376 ScheduleCheckForCompletedTasks(); |
529 | 377 |
530 inner_->PostTask(task.Pass(), signal_workers); | 378 inner_->PostTask(task.Pass()); |
531 } | |
532 | |
533 void WorkerPool::ScheduleRunCheapTasks() { | |
534 if (run_cheap_tasks_pending_) | |
535 return; | |
536 origin_loop_->PostTask(FROM_HERE, run_cheap_tasks_callback_); | |
537 run_cheap_tasks_pending_ = true; | |
538 } | |
539 | |
540 void WorkerPool::RunCheapTasks() { | |
541 TRACE_EVENT0("cc", "WorkerPool::RunCheapTasks"); | |
542 DCHECK(run_cheap_tasks_pending_); | |
543 run_cheap_tasks_pending_ = false; | |
544 | |
545 while (true) { | |
546 base::TimeTicks time_limit = run_cheap_tasks_time_limit_; | |
547 | |
548 if (!check_for_completed_tasks_time_.is_null()) | |
549 time_limit = std::min(time_limit, check_for_completed_tasks_time_); | |
550 | |
551 bool is_idle = inner_->RunCheapTasksUntilTimeLimit(time_limit); | |
552 | |
553 base::TimeTicks now = base::TimeTicks::Now(); | |
554 if (now >= run_cheap_tasks_time_limit_) { | |
555 TRACE_EVENT_INSTANT0("cc", "WorkerPool::RunCheapTasks out of time", | |
556 TRACE_EVENT_SCOPE_THREAD); | |
557 break; | |
558 } | |
559 | |
560 // We must be out of cheap tasks if this happens. | |
561 if (!check_for_completed_tasks_pending_ || | |
562 now < check_for_completed_tasks_time_) | |
563 break; | |
564 | |
565 TRACE_EVENT_INSTANT0("cc", "WorkerPool::RunCheapTasks check time", | |
566 TRACE_EVENT_SCOPE_THREAD); | |
567 CancelCheckForCompletedTasks(); | |
568 DispatchCompletionCallbacks(); | |
569 // Schedule another check for completed tasks if not idle. | |
570 if (!is_idle) | |
571 ScheduleCheckForCompletedTasks(); | |
572 } | |
573 } | 379 } |
574 | 380 |
575 } // namespace cc | 381 } // namespace cc |
OLD | NEW |