Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(85)

Side by Side Diff: cc/resources/task_graph_runner.cc

Issue 141163019: Re-land: cc: Remove WorkerPool class and instead use TaskGraphRunner directly. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: build fix Created 6 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "cc/resources/worker_pool.h" 5 #include "cc/resources/task_graph_runner.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 8
9 #include "base/bind.h"
10 #include "base/containers/hash_tables.h" 9 #include "base/containers/hash_tables.h"
11 #include "base/debug/trace_event.h" 10 #include "base/debug/trace_event.h"
12 #include "base/lazy_instance.h"
13 #include "base/memory/linked_ptr.h"
14 #include "base/strings/stringprintf.h" 11 #include "base/strings/stringprintf.h"
15 #include "base/synchronization/condition_variable.h"
16 #include "base/threading/simple_thread.h"
17 #include "base/threading/thread_restrictions.h" 12 #include "base/threading/thread_restrictions.h"
18 #include "cc/base/scoped_ptr_deque.h"
19 13
20 namespace cc { 14 namespace cc {
15 namespace internal {
21 16
22 namespace { 17 Task::Task()
18 : did_schedule_(false),
19 did_run_(false) {
20 }
23 21
24 // TaskGraphRunners can process task graphs from multiple 22 Task::~Task() {
25 // workerpool instances. All members are guarded by |lock_|. 23 DCHECK(!did_run_ || did_schedule_);
26 class TaskGraphRunner : public base::DelegateSimpleThread::Delegate { 24 }
27 public:
28 typedef WorkerPool::TaskGraph TaskGraph;
29 typedef WorkerPool::TaskVector TaskVector;
30 25
31 TaskGraphRunner(size_t num_threads, const std::string& thread_name_prefix); 26 void Task::DidSchedule() {
32 virtual ~TaskGraphRunner(); 27 did_schedule_ = true;
28 }
33 29
34 void Register(const WorkerPool* worker_pool); 30 void Task::WillRun() {
35 void Unregister(const WorkerPool* worker_pool); 31 DCHECK(did_schedule_);
36 // Schedule running of tasks in |graph|. Tasks previously scheduled but 32 DCHECK(!did_run_);
37 // no longer needed will be canceled unless already running. Canceled 33 }
38 // tasks are moved to |completed_tasks| without being run. The result
39 // is that once scheduled, a task is guaranteed to end up in the
40 // |completed_tasks| queue even if it later get canceled by another
41 // call to SetTaskGraph().
42 void SetTaskGraph(const WorkerPool* worker_pool, TaskGraph* graph);
43 34
44 // Wait for all scheduled tasks to finish running. 35 void Task::DidRun() {
45 void WaitForTasksToFinishRunning(const WorkerPool* worker_pool); 36 did_run_ = true;
37 }
46 38
47 // Collect all completed tasks in |completed_tasks|. 39 bool Task::HasFinishedRunning() const {
48 void CollectCompletedTasks(const WorkerPool* worker_pool, 40 return did_run_;
49 TaskVector* completed_tasks); 41 }
50 42
51 private: 43 GraphNode::GraphNode(Task* task, unsigned priority)
52 static bool CompareTaskPriority(const internal::GraphNode* a, 44 : task_(task),
53 const internal::GraphNode* b) { 45 priority_(priority),
54 // In this system, numerically lower priority is run first. 46 num_dependencies_(0) {
55 if (a->priority() != b->priority()) 47 }
56 return a->priority() > b->priority();
57 48
58 // Run task with most dependents first when priority is the same. 49 GraphNode::~GraphNode() {}
59 return a->dependents().size() < b->dependents().size();
60 }
61 50
62 struct TaskNamespace { 51 TaskGraphRunner::TaskNamespace::TaskNamespace() {}
63 // This set contains all pending tasks.
64 TaskGraph pending_tasks;
65 // This set contains all currently running tasks.
66 TaskGraph running_tasks;
67 // Completed tasks not yet collected by origin thread.
68 TaskVector completed_tasks;
69 // Ordered set of tasks that are ready to run.
70 internal::GraphNode::Vector ready_to_run_tasks;
71 };
72 52
73 static bool CompareTaskNamespacePriority(const TaskNamespace* a, 53 TaskGraphRunner::TaskNamespace::~TaskNamespace() {}
74 const TaskNamespace* b) {
75 DCHECK(!a->ready_to_run_tasks.empty());
76 DCHECK(!b->ready_to_run_tasks.empty());
77
78 // Compare based on task priority of the ready_to_run_tasks heap
79 // .front() will hold the max element of the heap,
80 // except after pop_heap, when max element is moved to .back().
81 return CompareTaskPriority(a->ready_to_run_tasks.front(),
82 b->ready_to_run_tasks.front());
83 }
84
85 typedef std::map<const WorkerPool*, linked_ptr<TaskNamespace> >
86 TaskNamespaceMap;
87
88 // Overridden from base::DelegateSimpleThread:
89 virtual void Run() OVERRIDE;
90
91 inline bool has_finished_running_tasks(TaskNamespace* task_namespace) {
92 return (task_namespace->pending_tasks.empty() &&
93 task_namespace->running_tasks.empty());
94 }
95
96 // This lock protects all members of this class except
97 // |worker_pool_on_origin_thread_|. Do not read or modify anything
98 // without holding this lock. Do not block while holding this lock.
99 mutable base::Lock lock_;
100
101 // Condition variable that is waited on by worker threads until new
102 // tasks are ready to run or shutdown starts.
103 base::ConditionVariable has_ready_to_run_tasks_cv_;
104
105 // Condition variable that is waited on by origin threads until a
106 // namespace has finished running all associated tasks.
107 base::ConditionVariable has_namespaces_with_finished_running_tasks_cv_;
108
109 // Provides each running thread loop with a unique index. First thread
110 // loop index is 0.
111 unsigned next_thread_index_;
112
113 // Set during shutdown. Tells workers to exit when no more tasks
114 // are pending.
115 bool shutdown_;
116
117 // This set contains all registered namespaces.
118 TaskNamespaceMap namespaces_;
119
120 // Ordered set of task namespaces that have ready to run tasks.
121 std::vector<TaskNamespace*> ready_to_run_namespaces_;
122
123 ScopedPtrDeque<base::DelegateSimpleThread> workers_;
124
125 DISALLOW_COPY_AND_ASSIGN(TaskGraphRunner);
126 };
127 54
128 TaskGraphRunner::TaskGraphRunner( 55 TaskGraphRunner::TaskGraphRunner(
129 size_t num_threads, const std::string& thread_name_prefix) 56 size_t num_threads, const std::string& thread_name_prefix)
130 : lock_(), 57 : lock_(),
131 has_ready_to_run_tasks_cv_(&lock_), 58 has_ready_to_run_tasks_cv_(&lock_),
132 has_namespaces_with_finished_running_tasks_cv_(&lock_), 59 has_namespaces_with_finished_running_tasks_cv_(&lock_),
133 next_thread_index_(0), 60 next_namespace_id_(1),
61 next_thread_index_(0u),
134 shutdown_(false) { 62 shutdown_(false) {
135 base::AutoLock lock(lock_); 63 base::AutoLock lock(lock_);
136 64
137 while (workers_.size() < num_threads) { 65 while (workers_.size() < num_threads) {
138 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( 66 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
139 new base::DelegateSimpleThread( 67 new base::DelegateSimpleThread(
140 this, 68 this,
141 thread_name_prefix + 69 thread_name_prefix +
142 base::StringPrintf( 70 base::StringPrintf(
143 "Worker%u", 71 "Worker%u",
(...skipping 16 matching lines...) Expand all
160 DCHECK(!shutdown_); 88 DCHECK(!shutdown_);
161 shutdown_ = true; 89 shutdown_ = true;
162 90
163 // Wake up a worker so it knows it should exit. This will cause all workers 91 // Wake up a worker so it knows it should exit. This will cause all workers
164 // to exit as each will wake up another worker before exiting. 92 // to exit as each will wake up another worker before exiting.
165 has_ready_to_run_tasks_cv_.Signal(); 93 has_ready_to_run_tasks_cv_.Signal();
166 } 94 }
167 95
168 while (workers_.size()) { 96 while (workers_.size()) {
169 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); 97 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
170 // http://crbug.com/240453 - Join() is considered IO and will block this 98 // Join() is considered IO and will block this thread.
171 // thread. See also http://crbug.com/239423 for further ideas.
172 base::ThreadRestrictions::ScopedAllowIO allow_io; 99 base::ThreadRestrictions::ScopedAllowIO allow_io;
173 worker->Join(); 100 worker->Join();
174 } 101 }
175 } 102 }
176 103
177 void TaskGraphRunner::Register(const WorkerPool* worker_pool) { 104 NamespaceToken TaskGraphRunner::GetNamespaceToken() {
178 base::AutoLock lock(lock_); 105 base::AutoLock lock(lock_);
179 106
180 DCHECK(namespaces_.find(worker_pool) == namespaces_.end()); 107 NamespaceToken token(next_namespace_id_++);
181 linked_ptr<TaskNamespace> task_set = make_linked_ptr(new TaskNamespace()); 108 DCHECK(namespaces_.find(token.id_) == namespaces_.end());
182 namespaces_[worker_pool] = task_set; 109 return token;
183 } 110 }
184 111
185 void TaskGraphRunner::Unregister(const WorkerPool* worker_pool) { 112 void TaskGraphRunner::WaitForTasksToFinishRunning(NamespaceToken token) {
186 base::AutoLock lock(lock_); 113 base::AutoLock lock(lock_);
187 114
188 DCHECK(namespaces_.find(worker_pool) != namespaces_.end()); 115 DCHECK(token.IsValid());
189 DCHECK_EQ(0u, namespaces_[worker_pool]->pending_tasks.size()); 116 TaskNamespaceMap::iterator it = namespaces_.find(token.id_);
190 DCHECK_EQ(0u, namespaces_[worker_pool]->ready_to_run_tasks.size()); 117 if (it == namespaces_.end())
191 DCHECK_EQ(0u, namespaces_[worker_pool]->running_tasks.size()); 118 return;
192 DCHECK_EQ(0u, namespaces_[worker_pool]->completed_tasks.size());
193 119
194 namespaces_.erase(worker_pool); 120 TaskNamespace* task_namespace = it->second;
195 } 121 while (!HasFinishedRunningTasksInNamespace(task_namespace))
196
197 void TaskGraphRunner::WaitForTasksToFinishRunning(
198 const WorkerPool* worker_pool) {
199 base::AutoLock lock(lock_);
200
201 DCHECK(namespaces_.find(worker_pool) != namespaces_.end());
202 TaskNamespace* task_namespace = namespaces_[worker_pool].get();
203
204 while (!has_finished_running_tasks(task_namespace))
205 has_namespaces_with_finished_running_tasks_cv_.Wait(); 122 has_namespaces_with_finished_running_tasks_cv_.Wait();
206 123
207 // There may be other namespaces that have finished running 124 // There may be other namespaces that have finished running
208 // tasks, so wake up another origin thread. 125 // tasks, so wake up another origin thread.
209 has_namespaces_with_finished_running_tasks_cv_.Signal(); 126 has_namespaces_with_finished_running_tasks_cv_.Signal();
210 } 127 }
211 128
212 void TaskGraphRunner::SetTaskGraph(const WorkerPool* worker_pool, 129 void TaskGraphRunner::SetTaskGraph(NamespaceToken token, TaskGraph* graph) {
213 TaskGraph* graph) { 130 DCHECK(token.IsValid());
131
214 TaskGraph new_pending_tasks; 132 TaskGraph new_pending_tasks;
215 TaskGraph new_running_tasks; 133 TaskGraph new_running_tasks;
216 134
217 new_pending_tasks.swap(*graph); 135 new_pending_tasks.swap(*graph);
218 136
219 { 137 {
220 base::AutoLock lock(lock_); 138 base::AutoLock lock(lock_);
221 139
222 DCHECK(!shutdown_); 140 DCHECK(!shutdown_);
223 DCHECK(namespaces_.find(worker_pool) != namespaces_.end()); 141
224 TaskNamespace* task_namespace = namespaces_[worker_pool].get(); 142 scoped_ptr<TaskNamespace> task_namespace = namespaces_.take_and_erase(
143 token.id_);
144
145 // Create task namespace if it doesn't exist.
146 if (!task_namespace)
147 task_namespace.reset(new TaskNamespace);
225 148
226 // First remove all completed tasks from |new_pending_tasks| and 149 // First remove all completed tasks from |new_pending_tasks| and
227 // adjust number of dependencies. 150 // adjust number of dependencies.
228 for (TaskVector::iterator it = task_namespace->completed_tasks.begin(); 151 for (Task::Vector::iterator it = task_namespace->completed_tasks.begin();
229 it != task_namespace->completed_tasks.end(); ++it) { 152 it != task_namespace->completed_tasks.end(); ++it) {
230 internal::WorkerPoolTask* task = it->get(); 153 Task* task = it->get();
231 154
232 scoped_ptr<internal::GraphNode> node = new_pending_tasks.take_and_erase( 155 scoped_ptr<GraphNode> node = new_pending_tasks.take_and_erase(task);
233 task);
234 if (node) { 156 if (node) {
235 for (internal::GraphNode::Vector::const_iterator it = 157 for (GraphNode::Vector::const_iterator it = node->dependents().begin();
236 node->dependents().begin();
237 it != node->dependents().end(); ++it) { 158 it != node->dependents().end(); ++it) {
238 internal::GraphNode* dependent_node = *it; 159 GraphNode* dependent_node = *it;
239 dependent_node->remove_dependency(); 160 dependent_node->remove_dependency();
240 } 161 }
241 } 162 }
242 } 163 }
243 164
244 // Build new running task set. 165 // Build new running task set.
245 for (TaskGraph::iterator it = task_namespace->running_tasks.begin(); 166 for (TaskGraph::iterator it = task_namespace->running_tasks.begin();
246 it != task_namespace->running_tasks.end(); ++it) { 167 it != task_namespace->running_tasks.end(); ++it) {
247 internal::WorkerPoolTask* task = it->first; 168 Task* task = it->first;
248 // Transfer scheduled task value from |new_pending_tasks| to 169 // Transfer scheduled task value from |new_pending_tasks| to
249 // |new_running_tasks| if currently running. Value must be set to 170 // |new_running_tasks| if currently running. Value must be set to
250 // NULL if |new_pending_tasks| doesn't contain task. This does 171 // NULL if |new_pending_tasks| doesn't contain task. This does
251 // the right in both cases. 172 // the right in both cases.
252 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task)); 173 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task));
253 } 174 }
254 175
255 // Build new "ready to run" tasks queue. 176 // Build new "ready to run" tasks queue.
256 task_namespace->ready_to_run_tasks.clear(); 177 task_namespace->ready_to_run_tasks.clear();
257 for (TaskGraph::iterator it = new_pending_tasks.begin(); 178 for (TaskGraph::iterator it = new_pending_tasks.begin();
258 it != new_pending_tasks.end(); ++it) { 179 it != new_pending_tasks.end(); ++it) {
259 internal::WorkerPoolTask* task = it->first; 180 Task* task = it->first;
260 DCHECK(task); 181 DCHECK(task);
261 internal::GraphNode* node = it->second; 182 GraphNode* node = it->second;
262 183
263 // Completed tasks should not exist in |new_pending_tasks|. 184 // Completed tasks should not exist in |new_pending_tasks|.
264 DCHECK(!task->HasFinishedRunning()); 185 DCHECK(!task->HasFinishedRunning());
265 186
266 // Call DidSchedule() to indicate that this task has been scheduled. 187 // Call DidSchedule() to indicate that this task has been scheduled.
267 // Note: This is only for debugging purposes. 188 // Note: This is only for debugging purposes.
268 task->DidSchedule(); 189 task->DidSchedule();
269 190
270 if (!node->num_dependencies()) 191 if (!node->num_dependencies())
271 task_namespace->ready_to_run_tasks.push_back(node); 192 task_namespace->ready_to_run_tasks.push_back(node);
(...skipping 22 matching lines...) Expand all
294 // Note: old tasks are intentionally destroyed after releasing |lock_|. 215 // Note: old tasks are intentionally destroyed after releasing |lock_|.
295 task_namespace->pending_tasks.swap(new_pending_tasks); 216 task_namespace->pending_tasks.swap(new_pending_tasks);
296 task_namespace->running_tasks.swap(new_running_tasks); 217 task_namespace->running_tasks.swap(new_running_tasks);
297 218
298 // If |ready_to_run_tasks| is empty, it means we either have 219 // If |ready_to_run_tasks| is empty, it means we either have
299 // running tasks, or we have no pending tasks. 220 // running tasks, or we have no pending tasks.
300 DCHECK(!task_namespace->ready_to_run_tasks.empty() || 221 DCHECK(!task_namespace->ready_to_run_tasks.empty() ||
301 (task_namespace->pending_tasks.empty() || 222 (task_namespace->pending_tasks.empty() ||
302 !task_namespace->running_tasks.empty())); 223 !task_namespace->running_tasks.empty()));
303 224
225 // Add task namespace if not empty.
226 if (!task_namespace->pending_tasks.empty() ||
227 !task_namespace->running_tasks.empty() ||
228 !task_namespace->completed_tasks.empty()) {
229 namespaces_.set(token.id_, task_namespace.Pass());
230 }
231
304 // Build new "ready to run" task namespaces queue. 232 // Build new "ready to run" task namespaces queue.
305 ready_to_run_namespaces_.clear(); 233 ready_to_run_namespaces_.clear();
306 for (TaskNamespaceMap::iterator it = namespaces_.begin(); 234 for (TaskNamespaceMap::iterator it = namespaces_.begin();
307 it != namespaces_.end(); ++it) { 235 it != namespaces_.end(); ++it) {
308 if (!it->second->ready_to_run_tasks.empty()) 236 if (!it->second->ready_to_run_tasks.empty())
309 ready_to_run_namespaces_.push_back(it->second.get()); 237 ready_to_run_namespaces_.push_back(it->second);
310 } 238 }
311 239
312 // Rearrange the task namespaces in |ready_to_run_namespaces_| 240 // Rearrange the task namespaces in |ready_to_run_namespaces_|
313 // in such a way that they form a heap. 241 // in such a way that they form a heap.
314 std::make_heap(ready_to_run_namespaces_.begin(), 242 std::make_heap(ready_to_run_namespaces_.begin(),
315 ready_to_run_namespaces_.end(), 243 ready_to_run_namespaces_.end(),
316 CompareTaskNamespacePriority); 244 CompareTaskNamespacePriority);
317 245
318 // If there is more work available, wake up worker thread. 246 // If there is more work available, wake up worker thread.
319 if (!ready_to_run_namespaces_.empty()) 247 if (!ready_to_run_namespaces_.empty())
320 has_ready_to_run_tasks_cv_.Signal(); 248 has_ready_to_run_tasks_cv_.Signal();
321 } 249 }
322 } 250 }
323 251
324 void TaskGraphRunner::CollectCompletedTasks( 252 void TaskGraphRunner::CollectCompletedTasks(
325 const WorkerPool* worker_pool, TaskVector* completed_tasks) { 253 NamespaceToken token, Task::Vector* completed_tasks) {
326 base::AutoLock lock(lock_); 254 base::AutoLock lock(lock_);
327 255
256 DCHECK(token.IsValid());
257 TaskNamespaceMap::iterator it = namespaces_.find(token.id_);
258 if (it == namespaces_.end())
259 return;
260
261 TaskNamespace* task_namespace = it->second;
262
328 DCHECK_EQ(0u, completed_tasks->size()); 263 DCHECK_EQ(0u, completed_tasks->size());
329 DCHECK(namespaces_.find(worker_pool) != namespaces_.end()); 264 completed_tasks->swap(task_namespace->completed_tasks);
330 completed_tasks->swap(namespaces_[worker_pool]->completed_tasks); 265 if (!HasFinishedRunningTasksInNamespace(task_namespace))
266 return;
267
268 // Remove namespace if finished running tasks.
269 DCHECK_EQ(0u, task_namespace->pending_tasks.size());
270 DCHECK_EQ(0u, task_namespace->running_tasks.size());
271 DCHECK_EQ(0u, task_namespace->completed_tasks.size());
272 DCHECK_EQ(0u, task_namespace->ready_to_run_tasks.size());
273 namespaces_.erase(it);
331 } 274 }
332 275
333 void TaskGraphRunner::Run() { 276 void TaskGraphRunner::Run() {
334 base::AutoLock lock(lock_); 277 base::AutoLock lock(lock_);
335 278
336 // Get a unique thread index. 279 // Get a unique thread index.
337 int thread_index = next_thread_index_++; 280 int thread_index = next_thread_index_++;
338 281
339 while (true) { 282 while (true) {
340 if (ready_to_run_namespaces_.empty()) { 283 if (ready_to_run_namespaces_.empty()) {
(...skipping 11 matching lines...) Expand all
352 ready_to_run_namespaces_.end(), 295 ready_to_run_namespaces_.end(),
353 CompareTaskNamespacePriority); 296 CompareTaskNamespacePriority);
354 TaskNamespace* task_namespace = ready_to_run_namespaces_.back(); 297 TaskNamespace* task_namespace = ready_to_run_namespaces_.back();
355 ready_to_run_namespaces_.pop_back(); 298 ready_to_run_namespaces_.pop_back();
356 DCHECK(!task_namespace->ready_to_run_tasks.empty()); 299 DCHECK(!task_namespace->ready_to_run_tasks.empty());
357 300
358 // Take top priority task from |ready_to_run_tasks|. 301 // Take top priority task from |ready_to_run_tasks|.
359 std::pop_heap(task_namespace->ready_to_run_tasks.begin(), 302 std::pop_heap(task_namespace->ready_to_run_tasks.begin(),
360 task_namespace->ready_to_run_tasks.end(), 303 task_namespace->ready_to_run_tasks.end(),
361 CompareTaskPriority); 304 CompareTaskPriority);
362 scoped_refptr<internal::WorkerPoolTask> task( 305 scoped_refptr<Task> task(
363 task_namespace->ready_to_run_tasks.back()->task()); 306 task_namespace->ready_to_run_tasks.back()->task());
364 task_namespace->ready_to_run_tasks.pop_back(); 307 task_namespace->ready_to_run_tasks.pop_back();
365 308
366 // Add task namespace back to |ready_to_run_namespaces_| if not 309 // Add task namespace back to |ready_to_run_namespaces_| if not
367 // empty after taking top priority task. 310 // empty after taking top priority task.
368 if (!task_namespace->ready_to_run_tasks.empty()) { 311 if (!task_namespace->ready_to_run_tasks.empty()) {
369 ready_to_run_namespaces_.push_back(task_namespace); 312 ready_to_run_namespaces_.push_back(task_namespace);
370 std::push_heap(ready_to_run_namespaces_.begin(), 313 std::push_heap(ready_to_run_namespaces_.begin(),
371 ready_to_run_namespaces_.end(), 314 ready_to_run_namespaces_.end(),
372 CompareTaskNamespacePriority); 315 CompareTaskNamespacePriority);
(...skipping 16 matching lines...) Expand all
389 base::AutoUnlock unlock(lock_); 332 base::AutoUnlock unlock(lock_);
390 333
391 task->RunOnWorkerThread(thread_index); 334 task->RunOnWorkerThread(thread_index);
392 } 335 }
393 336
394 // This will mark task as finished running. 337 // This will mark task as finished running.
395 task->DidRun(); 338 task->DidRun();
396 339
397 // Now iterate over all dependents to remove dependency and check 340 // Now iterate over all dependents to remove dependency and check
398 // if they are ready to run. 341 // if they are ready to run.
399 scoped_ptr<internal::GraphNode> node = 342 scoped_ptr<GraphNode> node =
400 task_namespace->running_tasks.take_and_erase(task.get()); 343 task_namespace->running_tasks.take_and_erase(task.get());
401 if (node) { 344 if (node) {
402 bool ready_to_run_namespaces_has_heap_properties = true; 345 bool ready_to_run_namespaces_has_heap_properties = true;
403 346
404 for (internal::GraphNode::Vector::const_iterator it = 347 for (GraphNode::Vector::const_iterator it = node->dependents().begin();
405 node->dependents().begin();
406 it != node->dependents().end(); ++it) { 348 it != node->dependents().end(); ++it) {
407 internal::GraphNode* dependent_node = *it; 349 GraphNode* dependent_node = *it;
408 350
409 dependent_node->remove_dependency(); 351 dependent_node->remove_dependency();
410 // Task is ready if it has no dependencies. Add it to 352 // Task is ready if it has no dependencies. Add it to
411 // |ready_to_run_tasks_|. 353 // |ready_to_run_tasks_|.
412 if (!dependent_node->num_dependencies()) { 354 if (!dependent_node->num_dependencies()) {
413 bool was_empty = task_namespace->ready_to_run_tasks.empty(); 355 bool was_empty = task_namespace->ready_to_run_tasks.empty();
414 task_namespace->ready_to_run_tasks.push_back(dependent_node); 356 task_namespace->ready_to_run_tasks.push_back(dependent_node);
415 std::push_heap(task_namespace->ready_to_run_tasks.begin(), 357 std::push_heap(task_namespace->ready_to_run_tasks.begin(),
416 task_namespace->ready_to_run_tasks.end(), 358 task_namespace->ready_to_run_tasks.end(),
417 CompareTaskPriority); 359 CompareTaskPriority);
(...skipping 17 matching lines...) Expand all
435 std::make_heap(ready_to_run_namespaces_.begin(), 377 std::make_heap(ready_to_run_namespaces_.begin(),
436 ready_to_run_namespaces_.end(), 378 ready_to_run_namespaces_.end(),
437 CompareTaskNamespacePriority); 379 CompareTaskNamespacePriority);
438 } 380 }
439 } 381 }
440 382
441 // Finally add task to |completed_tasks_|. 383 // Finally add task to |completed_tasks_|.
442 task_namespace->completed_tasks.push_back(task); 384 task_namespace->completed_tasks.push_back(task);
443 385
444 // If namespace has finished running all tasks, wake up origin thread. 386 // If namespace has finished running all tasks, wake up origin thread.
445 if (has_finished_running_tasks(task_namespace)) 387 if (HasFinishedRunningTasksInNamespace(task_namespace))
446 has_namespaces_with_finished_running_tasks_cv_.Signal(); 388 has_namespaces_with_finished_running_tasks_cv_.Signal();
447 } 389 }
448 390
449 // We noticed we should exit. Wake up the next worker so it knows it should 391 // We noticed we should exit. Wake up the next worker so it knows it should
450 // exit as well (because the Shutdown() code only signals once). 392 // exit as well (because the Shutdown() code only signals once).
451 has_ready_to_run_tasks_cv_.Signal(); 393 has_ready_to_run_tasks_cv_.Signal();
452 } 394 }
453 395
454 class CompositorRasterTaskGraphRunner
455 : public TaskGraphRunner {
456 public:
457 CompositorRasterTaskGraphRunner() : TaskGraphRunner(
458 WorkerPool::GetNumRasterThreads(), "CompositorRaster") {
459 }
460 };
461
462 base::LazyInstance<CompositorRasterTaskGraphRunner>
463 g_task_graph_runner = LAZY_INSTANCE_INITIALIZER;
464
465 const int kDefaultNumRasterThreads = 1;
466
467 int g_num_raster_threads = 0;
468
469 } // namespace
470
471 namespace internal {
472
473 WorkerPoolTask::WorkerPoolTask()
474 : did_schedule_(false),
475 did_run_(false),
476 did_complete_(false) {
477 }
478
479 WorkerPoolTask::~WorkerPoolTask() {
480 DCHECK_EQ(did_schedule_, did_complete_);
481 DCHECK(!did_run_ || did_schedule_);
482 DCHECK(!did_run_ || did_complete_);
483 }
484
485 void WorkerPoolTask::DidSchedule() {
486 DCHECK(!did_complete_);
487 did_schedule_ = true;
488 }
489
490 void WorkerPoolTask::WillRun() {
491 DCHECK(did_schedule_);
492 DCHECK(!did_complete_);
493 DCHECK(!did_run_);
494 }
495
496 void WorkerPoolTask::DidRun() {
497 did_run_ = true;
498 }
499
500 void WorkerPoolTask::WillComplete() {
501 DCHECK(!did_complete_);
502 }
503
504 void WorkerPoolTask::DidComplete() {
505 DCHECK(did_schedule_);
506 DCHECK(!did_complete_);
507 did_complete_ = true;
508 }
509
510 bool WorkerPoolTask::HasFinishedRunning() const {
511 return did_run_;
512 }
513
514 bool WorkerPoolTask::HasCompleted() const {
515 return did_complete_;
516 }
517
518 GraphNode::GraphNode(internal::WorkerPoolTask* task, unsigned priority)
519 : task_(task),
520 priority_(priority),
521 num_dependencies_(0) {
522 }
523
524 GraphNode::~GraphNode() {
525 }
526
527 } // namespace internal 396 } // namespace internal
528
529 // static
530 void WorkerPool::SetNumRasterThreads(int num_threads) {
531 DCHECK_LT(0, num_threads);
532 DCHECK_EQ(0, g_num_raster_threads);
533
534 g_num_raster_threads = num_threads;
535 }
536
537 // static
538 int WorkerPool::GetNumRasterThreads() {
539 if (!g_num_raster_threads)
540 g_num_raster_threads = kDefaultNumRasterThreads;
541
542 return g_num_raster_threads;
543 }
544
545 WorkerPool::WorkerPool() : in_dispatch_completion_callbacks_(false) {
546 g_task_graph_runner.Pointer()->Register(this);
547 }
548
549 WorkerPool::~WorkerPool() {
550 g_task_graph_runner.Pointer()->Unregister(this);
551 }
552
553 void WorkerPool::Shutdown() {
554 TRACE_EVENT0("cc", "WorkerPool::Shutdown");
555
556 DCHECK(!in_dispatch_completion_callbacks_);
557
558 g_task_graph_runner.Pointer()->WaitForTasksToFinishRunning(this);
559 }
560
561 void WorkerPool::SetTaskGraph(TaskGraph* graph) {
562 TRACE_EVENT1("cc", "WorkerPool::SetTaskGraph",
563 "num_tasks", graph->size());
564
565 DCHECK(!in_dispatch_completion_callbacks_);
566
567 g_task_graph_runner.Pointer()->SetTaskGraph(this, graph);
568 }
569
570 void WorkerPool::CheckForCompletedWorkerTasks() {
571 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedWorkerTasks");
572
573 DCHECK(!in_dispatch_completion_callbacks_);
574
575 TaskVector completed_tasks;
576 g_task_graph_runner.Pointer()->CollectCompletedTasks(this, &completed_tasks);
577 ProcessCompletedTasks(completed_tasks);
578 }
579
580 void WorkerPool::ProcessCompletedTasks(
581 const TaskVector& completed_tasks) {
582 TRACE_EVENT1("cc", "WorkerPool::ProcessCompletedTasks",
583 "completed_task_count", completed_tasks.size());
584
585 // Worker pool instance is not reentrant while processing completed tasks.
586 in_dispatch_completion_callbacks_ = true;
587
588 for (TaskVector::const_iterator it = completed_tasks.begin();
589 it != completed_tasks.end();
590 ++it) {
591 internal::WorkerPoolTask* task = it->get();
592
593 task->WillComplete();
594 task->CompleteOnOriginThread();
595 task->DidComplete();
596 }
597
598 in_dispatch_completion_callbacks_ = false;
599 }
600
601 } // namespace cc 397 } // namespace cc
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698