OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "cc/base/worker_pool.h" | 5 #include "cc/base/worker_pool.h" |
6 | 6 |
7 #if defined(OS_ANDROID) | 7 #if defined(OS_ANDROID) |
8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) | 8 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) |
9 #include <sys/resource.h> | 9 #include <sys/resource.h> |
10 #endif | 10 #endif |
11 | 11 |
12 #include <algorithm> | 12 #include <set> |
13 | 13 |
14 #include "base/bind.h" | 14 #include "base/bind.h" |
15 #include "base/debug/trace_event.h" | 15 #include "base/debug/trace_event.h" |
16 #include "base/stringprintf.h" | 16 #include "base/stringprintf.h" |
17 #include "base/synchronization/condition_variable.h" | 17 #include "base/synchronization/condition_variable.h" |
18 #include "base/threading/simple_thread.h" | 18 #include "base/threading/simple_thread.h" |
| 19 #include "cc/base/scoped_ptr_deque.h" |
19 | 20 |
20 namespace cc { | 21 namespace cc { |
21 | 22 |
22 namespace { | 23 namespace { |
23 | 24 |
24 class WorkerPoolTaskImpl : public internal::WorkerPoolTask { | 25 class WorkerPoolTaskGraphImpl : public internal::WorkerPoolTaskGraph { |
25 public: | 26 public: |
26 WorkerPoolTaskImpl(const WorkerPool::Callback& task, | 27 WorkerPoolTaskGraphImpl() {} |
27 const base::Closure& reply) | |
28 : internal::WorkerPoolTask(reply), | |
29 task_(task) {} | |
30 | 28 |
31 virtual void RunOnThread(unsigned thread_index) OVERRIDE { | 29 // Overridden from internal::WorkerPoolTaskGraph: |
32 task_.Run(); | 30 virtual bool HasMoreTasks() OVERRIDE { return false; } |
| 31 virtual bool HasTask(internal::WorkerPoolTask* task) OVERRIDE { |
| 32 return false; |
| 33 } |
| 34 virtual internal::WorkerPoolTask* TopTask() OVERRIDE { |
| 35 NOTREACHED(); |
| 36 return NULL; |
| 37 } |
| 38 virtual scoped_refptr<internal::WorkerPoolTask> TakeTask( |
| 39 internal::WorkerPoolTask* task) OVERRIDE { |
| 40 return make_scoped_refptr(task); |
33 } | 41 } |
34 | 42 |
35 private: | 43 private: |
36 WorkerPool::Callback task_; | 44 virtual ~WorkerPoolTaskGraphImpl() {} |
37 }; | 45 }; |
38 | 46 |
39 } // namespace | 47 } // namespace |
40 | 48 |
41 namespace internal { | |
42 | |
43 WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) : reply_(reply) { | |
44 } | |
45 | |
46 WorkerPoolTask::~WorkerPoolTask() { | |
47 } | |
48 | |
49 void WorkerPoolTask::DidComplete() { | |
50 reply_.Run(); | |
51 } | |
52 | |
53 } // namespace internal | |
54 | |
55 // Internal to the worker pool. Any data or logic that needs to be | 49 // Internal to the worker pool. Any data or logic that needs to be |
56 // shared between threads lives in this class. All members are guarded | 50 // shared between threads lives in this class. All members are guarded |
57 // by |lock_|. | 51 // by |lock_|. |
58 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { | 52 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { |
59 public: | 53 public: |
60 Inner(WorkerPool* worker_pool, | 54 Inner(WorkerPool* worker_pool, |
61 size_t num_threads, | 55 size_t num_threads, |
62 const std::string& thread_name_prefix); | 56 const std::string& thread_name_prefix); |
63 virtual ~Inner(); | 57 virtual ~Inner(); |
64 | 58 |
65 void Shutdown(); | 59 void Shutdown(); |
66 | 60 |
67 void PostTask(scoped_ptr<internal::WorkerPoolTask> task); | 61 // Schedule running of tasks in |task_graph|. All tasks previously |
| 62 // scheduled but not present in |task_graph| will be canceled unless |
| 63 // already running. Canceled tasks are moved to |completed_tasks_| |
| 64 // without being run. The result is that once scheduled, a task is |
| 65 // guaranteed to end up in the |completed_tasks_| queue even if they |
| 66 // later get canceled by another call to ScheduleTasks(). |
| 67 void ScheduleTasks(scoped_ptr<internal::WorkerPoolTaskGraph> task_graph); |
68 | 68 |
69 // Appends all completed tasks to worker pool's completed tasks queue | 69 // Collect all completed tasks in |completed_tasks|. Returns true if idle. |
70 // and returns true if idle. | 70 bool CollectCompletedTasks(TaskDeque* completed_tasks); |
71 bool CollectCompletedTasks(); | |
72 | 71 |
73 private: | 72 private: |
74 // Appends all completed tasks to |completed_tasks|. Lock must | 73 // Collect all completed tasks by swapping the contents of |
75 // already be acquired before calling this function. | 74 // |completed_tasks| and |completed_tasks_|. Lock must be acquired |
76 bool AppendCompletedTasksWithLockAcquired( | 75 // before calling this function. Returns true if idle. |
77 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks); | 76 bool CollectCompletedTasksWithLockAcquired(TaskDeque* completed_tasks); |
78 | 77 |
79 // Schedule an OnIdleOnOriginThread callback if not already pending. | 78 // Schedule an OnIdleOnOriginThread callback if not already pending. |
80 // Lock must already be acquired before calling this function. | 79 // Lock must already be acquired before calling this function. |
81 void ScheduleOnIdleWithLockAcquired(); | 80 void ScheduleOnIdleWithLockAcquired(); |
82 void OnIdleOnOriginThread(); | 81 void OnIdleOnOriginThread(); |
83 | 82 |
84 // Overridden from base::DelegateSimpleThread: | 83 // Overridden from base::DelegateSimpleThread: |
85 virtual void Run() OVERRIDE; | 84 virtual void Run() OVERRIDE; |
86 | 85 |
87 // Pointer to worker pool. Can only be used on origin thread. | 86 // Pointer to worker pool. Can only be used on origin thread. |
(...skipping 15 matching lines...) Expand all Loading... |
103 base::WeakPtrFactory<Inner> weak_ptr_factory_; | 102 base::WeakPtrFactory<Inner> weak_ptr_factory_; |
104 | 103 |
105 const base::Closure on_idle_callback_; | 104 const base::Closure on_idle_callback_; |
106 // Set when a OnIdleOnOriginThread() callback is pending. | 105 // Set when a OnIdleOnOriginThread() callback is pending. |
107 bool on_idle_pending_; | 106 bool on_idle_pending_; |
108 | 107 |
109 // Provides each running thread loop with a unique index. First thread | 108 // Provides each running thread loop with a unique index. First thread |
110 // loop index is 0. | 109 // loop index is 0. |
111 unsigned next_thread_index_; | 110 unsigned next_thread_index_; |
112 | 111 |
113 // Number of tasks currently running. | |
114 unsigned running_task_count_; | |
115 | |
116 // Set during shutdown. Tells workers to exit when no more tasks | 112 // Set during shutdown. Tells workers to exit when no more tasks |
117 // are pending. | 113 // are pending. |
118 bool shutdown_; | 114 bool shutdown_; |
119 | 115 |
120 typedef ScopedPtrDeque<internal::WorkerPoolTask> TaskDeque; | 116 // The task graph. Provides tasks in order of priority. |
121 TaskDeque pending_tasks_; | 117 scoped_ptr<internal::WorkerPoolTaskGraph> task_graph_; |
| 118 |
| 119 // This set contains all currently running tasks. |
| 120 typedef std::set<internal::WorkerPoolTask*> TaskSet; |
| 121 TaskSet running_tasks_; |
| 122 |
| 123 // Completed tasks not yet collected by origin thread. |
122 TaskDeque completed_tasks_; | 124 TaskDeque completed_tasks_; |
123 | 125 |
124 ScopedPtrDeque<base::DelegateSimpleThread> workers_; | 126 ScopedPtrDeque<base::DelegateSimpleThread> workers_; |
125 | 127 |
126 DISALLOW_COPY_AND_ASSIGN(Inner); | 128 DISALLOW_COPY_AND_ASSIGN(Inner); |
127 }; | 129 }; |
128 | 130 |
129 WorkerPool::Inner::Inner(WorkerPool* worker_pool, | 131 WorkerPool::Inner::Inner(WorkerPool* worker_pool, |
130 size_t num_threads, | 132 size_t num_threads, |
131 const std::string& thread_name_prefix) | 133 const std::string& thread_name_prefix) |
132 : worker_pool_on_origin_thread_(worker_pool), | 134 : worker_pool_on_origin_thread_(worker_pool), |
133 lock_(), | 135 lock_(), |
134 has_pending_tasks_cv_(&lock_), | 136 has_pending_tasks_cv_(&lock_), |
135 origin_loop_(base::MessageLoopProxy::current()), | 137 origin_loop_(base::MessageLoopProxy::current()), |
136 weak_ptr_factory_(this), | 138 weak_ptr_factory_(this), |
137 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, | 139 on_idle_callback_(base::Bind(&WorkerPool::Inner::OnIdleOnOriginThread, |
138 weak_ptr_factory_.GetWeakPtr())), | 140 weak_ptr_factory_.GetWeakPtr())), |
139 on_idle_pending_(false), | 141 on_idle_pending_(false), |
140 next_thread_index_(0), | 142 next_thread_index_(0), |
141 running_task_count_(0), | 143 shutdown_(false), |
142 shutdown_(false) { | 144 task_graph_(new WorkerPoolTaskGraphImpl) { |
143 base::AutoLock lock(lock_); | 145 base::AutoLock lock(lock_); |
144 | 146 |
145 while (workers_.size() < num_threads) { | 147 while (workers_.size() < num_threads) { |
146 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( | 148 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( |
147 new base::DelegateSimpleThread( | 149 new base::DelegateSimpleThread( |
148 this, | 150 this, |
149 thread_name_prefix + | 151 thread_name_prefix + |
150 base::StringPrintf( | 152 base::StringPrintf( |
151 "Worker%u", | 153 "Worker%u", |
152 static_cast<unsigned>(workers_.size() + 1)).c_str())); | 154 static_cast<unsigned>(workers_.size() + 1)).c_str())); |
153 worker->Start(); | 155 worker->Start(); |
154 workers_.push_back(worker.Pass()); | 156 workers_.push_back(worker.Pass()); |
155 } | 157 } |
156 } | 158 } |
157 | 159 |
158 WorkerPool::Inner::~Inner() { | 160 WorkerPool::Inner::~Inner() { |
159 base::AutoLock lock(lock_); | 161 base::AutoLock lock(lock_); |
160 | 162 |
161 DCHECK(shutdown_); | 163 DCHECK(shutdown_); |
162 | 164 |
163 // Cancel all pending callbacks. | 165 DCHECK(!task_graph_->HasMoreTasks()); |
164 weak_ptr_factory_.InvalidateWeakPtrs(); | 166 DCHECK_EQ(0u, running_tasks_.size()); |
165 | |
166 DCHECK_EQ(0u, pending_tasks_.size()); | |
167 DCHECK_EQ(0u, completed_tasks_.size()); | 167 DCHECK_EQ(0u, completed_tasks_.size()); |
168 DCHECK_EQ(0u, running_task_count_); | |
169 } | 168 } |
170 | 169 |
171 void WorkerPool::Inner::Shutdown() { | 170 void WorkerPool::Inner::Shutdown() { |
172 { | 171 { |
173 base::AutoLock lock(lock_); | 172 base::AutoLock lock(lock_); |
174 | 173 |
175 DCHECK(!shutdown_); | 174 DCHECK(!shutdown_); |
176 shutdown_ = true; | 175 shutdown_ = true; |
177 | 176 |
178 // Wake up a worker so it knows it should exit. This will cause all workers | 177 // Wake up a worker so it knows it should exit. This will cause all workers |
179 // to exit as each will wake up another worker before exiting. | 178 // to exit as each will wake up another worker before exiting. |
180 has_pending_tasks_cv_.Signal(); | 179 has_pending_tasks_cv_.Signal(); |
181 } | 180 } |
182 | 181 |
183 while (workers_.size()) { | 182 while (workers_.size()) { |
184 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); | 183 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); |
185 worker->Join(); | 184 worker->Join(); |
186 } | 185 } |
| 186 |
| 187 // Cancel any pending OnIdle callback. |
| 188 weak_ptr_factory_.InvalidateWeakPtrs(); |
187 } | 189 } |
188 | 190 |
189 void WorkerPool::Inner::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { | 191 void WorkerPool::Inner::ScheduleTasks( |
| 192 scoped_ptr<internal::WorkerPoolTaskGraph> task_graph) { |
190 base::AutoLock lock(lock_); | 193 base::AutoLock lock(lock_); |
191 | 194 |
192 pending_tasks_.push_back(task.Pass()); | 195 // Move tasks not present in new graph to |completed_tasks_|. |
| 196 while (task_graph_->HasMoreTasks()) { |
| 197 scoped_refptr<internal::WorkerPoolTask> task = task_graph_->TakeTask( |
| 198 task_graph_->TopTask()); |
| 199 |
| 200 // Task has not completed if present in new graph. |
| 201 if (task_graph->HasTask(task)) |
| 202 continue; |
| 203 |
| 204 completed_tasks_.push_back(task); |
| 205 } |
| 206 |
| 207 // Take any running tasks from new graph. |
| 208 for (TaskSet::iterator it = running_tasks_.begin(); |
| 209 it != running_tasks_.end(); ++it) |
| 210 task_graph->TakeTask(*it); |
| 211 |
| 212 // And take any completed tasks from new graph. |
| 213 for (TaskDeque::iterator it = completed_tasks_.begin(); |
| 214 it != completed_tasks_.end(); ++it) |
| 215 task_graph->TakeTask(*it); |
| 216 |
| 217 // Finally switch to the new graph. |
| 218 task_graph_ = task_graph.Pass(); |
193 | 219 |
194 // There is more work available, so wake up worker thread. | 220 // There is more work available, so wake up worker thread. |
195 has_pending_tasks_cv_.Signal(); | 221 has_pending_tasks_cv_.Signal(); |
196 } | 222 } |
197 | 223 |
198 bool WorkerPool::Inner::CollectCompletedTasks() { | 224 bool WorkerPool::Inner::CollectCompletedTasks(TaskDeque* completed_tasks) { |
199 base::AutoLock lock(lock_); | 225 base::AutoLock lock(lock_); |
200 | 226 |
201 return AppendCompletedTasksWithLockAcquired( | 227 return CollectCompletedTasksWithLockAcquired(completed_tasks); |
202 &worker_pool_on_origin_thread_->completed_tasks_); | |
203 } | 228 } |
204 | 229 |
205 bool WorkerPool::Inner::AppendCompletedTasksWithLockAcquired( | 230 bool WorkerPool::Inner::CollectCompletedTasksWithLockAcquired( |
206 ScopedPtrDeque<internal::WorkerPoolTask>* completed_tasks) { | 231 TaskDeque* completed_tasks) { |
207 lock_.AssertAcquired(); | 232 lock_.AssertAcquired(); |
208 | 233 |
209 while (completed_tasks_.size()) | 234 DCHECK_EQ(0u, completed_tasks->size()); |
210 completed_tasks->push_back(completed_tasks_.take_front().Pass()); | 235 completed_tasks->swap(completed_tasks_); |
211 | 236 |
212 return !running_task_count_ && pending_tasks_.empty(); | 237 return running_tasks_.empty() && !task_graph_->HasMoreTasks(); |
213 } | 238 } |
214 | 239 |
215 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { | 240 void WorkerPool::Inner::ScheduleOnIdleWithLockAcquired() { |
216 lock_.AssertAcquired(); | 241 lock_.AssertAcquired(); |
217 | 242 |
218 if (on_idle_pending_) | 243 if (on_idle_pending_) |
219 return; | 244 return; |
220 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); | 245 origin_loop_->PostTask(FROM_HERE, on_idle_callback_); |
221 on_idle_pending_ = true; | 246 on_idle_pending_ = true; |
222 } | 247 } |
223 | 248 |
224 void WorkerPool::Inner::OnIdleOnOriginThread() { | 249 void WorkerPool::Inner::OnIdleOnOriginThread() { |
| 250 TaskDeque completed_tasks; |
| 251 |
225 { | 252 { |
226 base::AutoLock lock(lock_); | 253 base::AutoLock lock(lock_); |
227 | 254 |
228 DCHECK(on_idle_pending_); | 255 DCHECK(on_idle_pending_); |
229 on_idle_pending_ = false; | 256 on_idle_pending_ = false; |
230 | 257 |
231 // Early out if no longer idle. | 258 // Early out if no longer idle. |
232 if (running_task_count_ || !pending_tasks_.empty()) | 259 if (!running_tasks_.empty() || task_graph_->HasMoreTasks()) |
233 return; | 260 return; |
234 | 261 |
235 AppendCompletedTasksWithLockAcquired( | 262 CollectCompletedTasksWithLockAcquired(&completed_tasks); |
236 &worker_pool_on_origin_thread_->completed_tasks_); | |
237 } | 263 } |
238 | 264 |
239 worker_pool_on_origin_thread_->OnIdle(); | 265 worker_pool_on_origin_thread_->OnIdle(&completed_tasks); |
240 } | 266 } |
241 | 267 |
242 void WorkerPool::Inner::Run() { | 268 void WorkerPool::Inner::Run() { |
243 #if defined(OS_ANDROID) | 269 #if defined(OS_ANDROID) |
244 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) | 270 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) |
245 int nice_value = 10; // Idle priority. | 271 int nice_value = 10; // Idle priority. |
246 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value); | 272 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value); |
247 #endif | 273 #endif |
248 | 274 |
249 base::AutoLock lock(lock_); | 275 base::AutoLock lock(lock_); |
250 | 276 |
251 // Get a unique thread index. | 277 // Get a unique thread index. |
252 int thread_index = next_thread_index_++; | 278 int thread_index = next_thread_index_++; |
253 | 279 |
254 while (true) { | 280 while (true) { |
255 if (pending_tasks_.empty()) { | 281 if (!task_graph_->HasMoreTasks()) { |
256 // Exit when shutdown is set and no more tasks are pending. | 282 // Exit when shutdown is set and no more tasks are pending. |
257 if (shutdown_) | 283 if (shutdown_) |
258 break; | 284 break; |
259 | 285 |
260 // Schedule an idle callback if requested and not pending. | 286 // Schedule an idle callback if not tasks are running. |
261 if (!running_task_count_) | 287 if (running_tasks_.empty()) |
262 ScheduleOnIdleWithLockAcquired(); | 288 ScheduleOnIdleWithLockAcquired(); |
263 | 289 |
264 // Wait for new pending tasks. | 290 // Wait for more tasks. |
265 has_pending_tasks_cv_.Wait(); | 291 has_pending_tasks_cv_.Wait(); |
266 continue; | 292 continue; |
267 } | 293 } |
268 | 294 |
269 // Get next task. | 295 // Take the highest priority task from the task graph. |
270 scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); | 296 scoped_refptr<internal::WorkerPoolTask> task = task_graph_->TakeTask( |
| 297 task_graph_->TopTask()); |
271 | 298 |
272 // Increment |running_task_count_| before starting to run task. | 299 // Insert task in |running_tasks_| before starting to run it. |
273 running_task_count_++; | 300 running_tasks_.insert(task); |
274 | 301 |
275 // There may be more work available, so wake up another | 302 // There may be more work available, so wake up another worker thread. |
276 // worker thread. | |
277 has_pending_tasks_cv_.Signal(); | 303 has_pending_tasks_cv_.Signal(); |
278 | 304 |
279 { | 305 { |
280 base::AutoUnlock unlock(lock_); | 306 base::AutoUnlock unlock(lock_); |
281 | 307 |
282 task->RunOnThread(thread_index); | 308 task->RunOnThread(thread_index); |
283 } | 309 } |
284 | 310 |
285 completed_tasks_.push_back(task.Pass()); | 311 // Remove task from |running_tasks_| now that we are done running it. |
| 312 running_tasks_.erase(task); |
286 | 313 |
287 // Decrement |running_task_count_| now that we are done running task. | 314 // Finally add task to |completed_tasks_|. |
288 running_task_count_--; | 315 completed_tasks_.push_back(task); |
289 } | 316 } |
290 | 317 |
291 // We noticed we should exit. Wake up the next worker so it knows it should | 318 // We noticed we should exit. Wake up the next worker so it knows it should |
292 // exit as well (because the Shutdown() code only signals once). | 319 // exit as well (because the Shutdown() code only signals once). |
293 has_pending_tasks_cv_.Signal(); | 320 has_pending_tasks_cv_.Signal(); |
294 } | 321 } |
295 | 322 |
296 WorkerPool::WorkerPool(WorkerPoolClient* client, | 323 WorkerPool::WorkerPool(WorkerPoolClient* client, |
297 size_t num_threads, | 324 size_t num_threads, |
298 base::TimeDelta check_for_completed_tasks_delay, | 325 base::TimeDelta check_for_completed_tasks_delay, |
299 const std::string& thread_name_prefix) | 326 const std::string& thread_name_prefix) |
300 : client_(client), | 327 : client_(client), |
301 origin_loop_(base::MessageLoopProxy::current()), | 328 origin_loop_(base::MessageLoopProxy::current()), |
302 weak_ptr_factory_(this), | |
303 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), | 329 check_for_completed_tasks_delay_(check_for_completed_tasks_delay), |
304 check_for_completed_tasks_pending_(false), | 330 check_for_completed_tasks_pending_(false), |
305 inner_(make_scoped_ptr(new Inner(this, | 331 inner_(make_scoped_ptr(new Inner(this, |
306 num_threads, | 332 num_threads, |
307 thread_name_prefix))) { | 333 thread_name_prefix))) { |
308 } | 334 } |
309 | 335 |
310 WorkerPool::~WorkerPool() { | 336 WorkerPool::~WorkerPool() { |
311 // Cancel all pending callbacks. | |
312 weak_ptr_factory_.InvalidateWeakPtrs(); | |
313 | |
314 DCHECK_EQ(0u, completed_tasks_.size()); | |
315 } | 337 } |
316 | 338 |
317 void WorkerPool::Shutdown() { | 339 void WorkerPool::Shutdown() { |
318 inner_->Shutdown(); | 340 inner_->Shutdown(); |
319 inner_->CollectCompletedTasks(); | 341 |
320 DispatchCompletionCallbacks(); | 342 TaskDeque completed_tasks; |
| 343 inner_->CollectCompletedTasks(&completed_tasks); |
| 344 DispatchCompletionCallbacks(&completed_tasks); |
321 } | 345 } |
322 | 346 |
323 void WorkerPool::PostTaskAndReply( | 347 void WorkerPool::OnIdle(TaskDeque* completed_tasks) { |
324 const Callback& task, const base::Closure& reply) { | |
325 PostTask(make_scoped_ptr(new WorkerPoolTaskImpl( | |
326 task, | |
327 reply)).PassAs<internal::WorkerPoolTask>()); | |
328 } | |
329 | |
330 void WorkerPool::OnIdle() { | |
331 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); | 348 TRACE_EVENT0("cc", "WorkerPool::OnIdle"); |
332 | 349 |
333 DispatchCompletionCallbacks(); | 350 DispatchCompletionCallbacks(completed_tasks); |
| 351 |
| 352 // Cancel any pending check for completed tasks. |
| 353 check_for_completed_tasks_callback_.Cancel(); |
| 354 check_for_completed_tasks_pending_ = false; |
334 } | 355 } |
335 | 356 |
336 void WorkerPool::ScheduleCheckForCompletedTasks() { | 357 void WorkerPool::ScheduleCheckForCompletedTasks() { |
337 if (check_for_completed_tasks_pending_) | 358 if (check_for_completed_tasks_pending_) |
338 return; | 359 return; |
| 360 check_for_completed_tasks_callback_.Reset( |
| 361 base::Bind(&WorkerPool::CheckForCompletedTasks, |
| 362 base::Unretained(this))); |
339 origin_loop_->PostDelayedTask( | 363 origin_loop_->PostDelayedTask( |
340 FROM_HERE, | 364 FROM_HERE, |
341 base::Bind(&WorkerPool::CheckForCompletedTasks, | 365 check_for_completed_tasks_callback_.callback(), |
342 weak_ptr_factory_.GetWeakPtr()), | |
343 check_for_completed_tasks_delay_); | 366 check_for_completed_tasks_delay_); |
344 check_for_completed_tasks_pending_ = true; | 367 check_for_completed_tasks_pending_ = true; |
345 } | 368 } |
346 | 369 |
347 void WorkerPool::CheckForCompletedTasks() { | 370 void WorkerPool::CheckForCompletedTasks() { |
348 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); | 371 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); |
349 DCHECK(check_for_completed_tasks_pending_); | 372 DCHECK(check_for_completed_tasks_pending_); |
350 check_for_completed_tasks_pending_ = false; | 373 check_for_completed_tasks_pending_ = false; |
351 | 374 |
| 375 TaskDeque completed_tasks; |
| 376 |
352 // Schedule another check for completed tasks if not idle. | 377 // Schedule another check for completed tasks if not idle. |
353 if (!inner_->CollectCompletedTasks()) | 378 if (!inner_->CollectCompletedTasks(&completed_tasks)) |
354 ScheduleCheckForCompletedTasks(); | 379 ScheduleCheckForCompletedTasks(); |
355 | 380 |
356 DispatchCompletionCallbacks(); | 381 DispatchCompletionCallbacks(&completed_tasks); |
357 } | 382 } |
358 | 383 |
359 void WorkerPool::DispatchCompletionCallbacks() { | 384 void WorkerPool::DispatchCompletionCallbacks(TaskDeque* completed_tasks) { |
360 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); | 385 TRACE_EVENT0("cc", "WorkerPool::DispatchCompletionCallbacks"); |
361 | 386 |
362 if (completed_tasks_.empty()) | 387 // Early out when |completed_tasks| is empty to prevent unnecessary |
| 388 // call to DidFinishDispatchingWorkerPoolCompletionCallbacks(). |
| 389 if (completed_tasks->empty()) |
363 return; | 390 return; |
364 | 391 |
365 while (completed_tasks_.size()) { | 392 while (completed_tasks->size()) { |
366 scoped_ptr<internal::WorkerPoolTask> task = completed_tasks_.take_front(); | 393 scoped_refptr<internal::WorkerPoolTask> task = completed_tasks->front(); |
367 task->DidComplete(); | 394 completed_tasks->pop_front(); |
| 395 task->DispatchCompletionCallback(); |
368 } | 396 } |
369 | 397 |
370 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); | 398 client_->DidFinishDispatchingWorkerPoolCompletionCallbacks(); |
371 } | 399 } |
372 | 400 |
373 void WorkerPool::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { | 401 void WorkerPool::ScheduleTasks( |
374 // Schedule check for completed tasks if not pending. | 402 scoped_ptr<internal::WorkerPoolTaskGraph> task_graph) { |
375 ScheduleCheckForCompletedTasks(); | 403 if (task_graph->HasMoreTasks()) |
376 | 404 ScheduleCheckForCompletedTasks(); |
377 inner_->PostTask(task.Pass()); | 405 inner_->ScheduleTasks(task_graph.Pass()); |
378 } | 406 } |
379 | 407 |
380 } // namespace cc | 408 } // namespace cc |
OLD | NEW |