Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(471)

Side by Side Diff: cc/resources/task_graph_runner.cc

Issue 143003012: Revert of cc: Remove WorkerPool class and instead use TaskGraphRunner directly. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 6 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « cc/resources/task_graph_runner.h ('k') | cc/resources/task_graph_runner_perftest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "cc/resources/task_graph_runner.h"
6
7 #include <algorithm>
8
9 #include "base/containers/hash_tables.h"
10 #include "base/debug/trace_event.h"
11 #include "base/strings/stringprintf.h"
12 #include "base/threading/thread_restrictions.h"
13
14 namespace cc {
15 namespace internal {
16
17 Task::Task()
18 : did_schedule_(false),
19 did_run_(false) {
20 }
21
22 Task::~Task() {
23 DCHECK(!did_run_ || did_schedule_);
24 }
25
26 void Task::DidSchedule() {
27 did_schedule_ = true;
28 }
29
30 void Task::WillRun() {
31 DCHECK(did_schedule_);
32 DCHECK(!did_run_);
33 }
34
35 void Task::DidRun() {
36 did_run_ = true;
37 }
38
39 bool Task::HasFinishedRunning() const {
40 return did_run_;
41 }
42
43 GraphNode::GraphNode(Task* task, unsigned priority)
44 : task_(task),
45 priority_(priority),
46 num_dependencies_(0) {
47 }
48
49 GraphNode::~GraphNode() {}
50
51 TaskGraphRunner::TaskNamespace::TaskNamespace() {}
52
53 TaskGraphRunner::TaskNamespace::~TaskNamespace() {}
54
55 TaskGraphRunner::TaskGraphRunner(
56 size_t num_threads, const std::string& thread_name_prefix)
57 : lock_(),
58 has_ready_to_run_tasks_cv_(&lock_),
59 has_namespaces_with_finished_running_tasks_cv_(&lock_),
60 next_namespace_id_(1),
61 next_thread_index_(0u),
62 shutdown_(false) {
63 base::AutoLock lock(lock_);
64
65 while (workers_.size() < num_threads) {
66 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
67 new base::DelegateSimpleThread(
68 this,
69 thread_name_prefix +
70 base::StringPrintf(
71 "Worker%u",
72 static_cast<unsigned>(workers_.size() + 1)).c_str()));
73 worker->Start();
74 #if defined(OS_ANDROID) || defined(OS_LINUX)
75 worker->SetThreadPriority(base::kThreadPriority_Background);
76 #endif
77 workers_.push_back(worker.Pass());
78 }
79 }
80
81 TaskGraphRunner::~TaskGraphRunner() {
82 {
83 base::AutoLock lock(lock_);
84
85 DCHECK_EQ(0u, ready_to_run_namespaces_.size());
86 DCHECK_EQ(0u, namespaces_.size());
87
88 DCHECK(!shutdown_);
89 shutdown_ = true;
90
91 // Wake up a worker so it knows it should exit. This will cause all workers
92 // to exit as each will wake up another worker before exiting.
93 has_ready_to_run_tasks_cv_.Signal();
94 }
95
96 while (workers_.size()) {
97 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
98 // Join() is considered IO and will block this thread.
99 base::ThreadRestrictions::ScopedAllowIO allow_io;
100 worker->Join();
101 }
102 }
103
104 NamespaceToken TaskGraphRunner::GetNamespaceToken() {
105 base::AutoLock lock(lock_);
106
107 NamespaceToken token(next_namespace_id_++);
108 DCHECK(namespaces_.find(token.id_) == namespaces_.end());
109 return token;
110 }
111
112 void TaskGraphRunner::WaitForTasksToFinishRunning(NamespaceToken token) {
113 base::AutoLock lock(lock_);
114
115 DCHECK(token.IsValid());
116 TaskNamespaceMap::iterator it = namespaces_.find(token.id_);
117 if (it == namespaces_.end())
118 return;
119
120 TaskNamespace* task_namespace = it->second;
121 while (!HasFinishedRunningTasksInNamespace(task_namespace))
122 has_namespaces_with_finished_running_tasks_cv_.Wait();
123
124 // There may be other namespaces that have finished running
125 // tasks, so wake up another origin thread.
126 has_namespaces_with_finished_running_tasks_cv_.Signal();
127 }
128
129 void TaskGraphRunner::SetTaskGraph(NamespaceToken token, TaskGraph* graph) {
130 DCHECK(token.IsValid());
131
132 TaskGraph new_pending_tasks;
133 TaskGraph new_running_tasks;
134
135 new_pending_tasks.swap(*graph);
136
137 {
138 base::AutoLock lock(lock_);
139
140 DCHECK(!shutdown_);
141
142 scoped_ptr<TaskNamespace> task_namespace = namespaces_.take_and_erase(
143 token.id_);
144
145 // Create task namespace if it doesn't exist.
146 if (!task_namespace)
147 task_namespace.reset(new TaskNamespace);
148
149 // First remove all completed tasks from |new_pending_tasks| and
150 // adjust number of dependencies.
151 for (Task::Vector::iterator it = task_namespace->completed_tasks.begin();
152 it != task_namespace->completed_tasks.end(); ++it) {
153 Task* task = it->get();
154
155 scoped_ptr<GraphNode> node = new_pending_tasks.take_and_erase(task);
156 if (node) {
157 for (GraphNode::Vector::const_iterator it = node->dependents().begin();
158 it != node->dependents().end(); ++it) {
159 GraphNode* dependent_node = *it;
160 dependent_node->remove_dependency();
161 }
162 }
163 }
164
165 // Build new running task set.
166 for (TaskGraph::iterator it = task_namespace->running_tasks.begin();
167 it != task_namespace->running_tasks.end(); ++it) {
168 Task* task = it->first;
169 // Transfer scheduled task value from |new_pending_tasks| to
170 // |new_running_tasks| if currently running. Value must be set to
171 // NULL if |new_pending_tasks| doesn't contain task. This does
172 // the right in both cases.
173 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task));
174 }
175
176 // Build new "ready to run" tasks queue.
177 task_namespace->ready_to_run_tasks.clear();
178 for (TaskGraph::iterator it = new_pending_tasks.begin();
179 it != new_pending_tasks.end(); ++it) {
180 Task* task = it->first;
181 DCHECK(task);
182 GraphNode* node = it->second;
183
184 // Completed tasks should not exist in |new_pending_tasks|.
185 DCHECK(!task->HasFinishedRunning());
186
187 // Call DidSchedule() to indicate that this task has been scheduled.
188 // Note: This is only for debugging purposes.
189 task->DidSchedule();
190
191 if (!node->num_dependencies())
192 task_namespace->ready_to_run_tasks.push_back(node);
193
194 // Erase the task from old pending tasks.
195 task_namespace->pending_tasks.erase(task);
196 }
197
198 // Rearrange the elements in |ready_to_run_tasks| in such a way that
199 // they form a heap.
200 std::make_heap(task_namespace->ready_to_run_tasks.begin(),
201 task_namespace->ready_to_run_tasks.end(),
202 CompareTaskPriority);
203
204 task_namespace->completed_tasks.reserve(
205 task_namespace->completed_tasks.size() +
206 task_namespace->pending_tasks.size());
207
208 // The items left in |pending_tasks| need to be canceled.
209 for (TaskGraph::const_iterator it = task_namespace->pending_tasks.begin();
210 it != task_namespace->pending_tasks.end(); ++it) {
211 task_namespace->completed_tasks.push_back(it->first);
212 }
213
214 // Swap task sets.
215 // Note: old tasks are intentionally destroyed after releasing |lock_|.
216 task_namespace->pending_tasks.swap(new_pending_tasks);
217 task_namespace->running_tasks.swap(new_running_tasks);
218
219 // If |ready_to_run_tasks| is empty, it means we either have
220 // running tasks, or we have no pending tasks.
221 DCHECK(!task_namespace->ready_to_run_tasks.empty() ||
222 (task_namespace->pending_tasks.empty() ||
223 !task_namespace->running_tasks.empty()));
224
225 // Add task namespace if not empty.
226 if (!task_namespace->pending_tasks.empty() ||
227 !task_namespace->running_tasks.empty() ||
228 !task_namespace->completed_tasks.empty()) {
229 namespaces_.set(token.id_, task_namespace.Pass());
230 }
231
232 // Build new "ready to run" task namespaces queue.
233 ready_to_run_namespaces_.clear();
234 for (TaskNamespaceMap::iterator it = namespaces_.begin();
235 it != namespaces_.end(); ++it) {
236 if (!it->second->ready_to_run_tasks.empty())
237 ready_to_run_namespaces_.push_back(it->second);
238 }
239
240 // Rearrange the task namespaces in |ready_to_run_namespaces_|
241 // in such a way that they form a heap.
242 std::make_heap(ready_to_run_namespaces_.begin(),
243 ready_to_run_namespaces_.end(),
244 CompareTaskNamespacePriority);
245
246 // If there is more work available, wake up worker thread.
247 if (!ready_to_run_namespaces_.empty())
248 has_ready_to_run_tasks_cv_.Signal();
249 }
250 }
251
252 void TaskGraphRunner::CollectCompletedTasks(
253 NamespaceToken token, Task::Vector* completed_tasks) {
254 base::AutoLock lock(lock_);
255
256 DCHECK(token.IsValid());
257 TaskNamespaceMap::iterator it = namespaces_.find(token.id_);
258 if (it == namespaces_.end())
259 return;
260
261 TaskNamespace* task_namespace = it->second;
262
263 DCHECK_EQ(0u, completed_tasks->size());
264 completed_tasks->swap(task_namespace->completed_tasks);
265 if (!HasFinishedRunningTasksInNamespace(task_namespace))
266 return;
267
268 // Remove namespace if finished running tasks.
269 DCHECK_EQ(0u, task_namespace->pending_tasks.size());
270 DCHECK_EQ(0u, task_namespace->running_tasks.size());
271 DCHECK_EQ(0u, task_namespace->completed_tasks.size());
272 DCHECK_EQ(0u, task_namespace->ready_to_run_tasks.size());
273 namespaces_.erase(it);
274 }
275
276 void TaskGraphRunner::Run() {
277 base::AutoLock lock(lock_);
278
279 // Get a unique thread index.
280 int thread_index = next_thread_index_++;
281
282 while (true) {
283 if (ready_to_run_namespaces_.empty()) {
284 // Exit when shutdown is set and no more tasks are pending.
285 if (shutdown_)
286 break;
287
288 // Wait for more tasks.
289 has_ready_to_run_tasks_cv_.Wait();
290 continue;
291 }
292
293 // Take top priority TaskNamespace from |ready_to_run_namespaces_|.
294 std::pop_heap(ready_to_run_namespaces_.begin(),
295 ready_to_run_namespaces_.end(),
296 CompareTaskNamespacePriority);
297 TaskNamespace* task_namespace = ready_to_run_namespaces_.back();
298 ready_to_run_namespaces_.pop_back();
299 DCHECK(!task_namespace->ready_to_run_tasks.empty());
300
301 // Take top priority task from |ready_to_run_tasks|.
302 std::pop_heap(task_namespace->ready_to_run_tasks.begin(),
303 task_namespace->ready_to_run_tasks.end(),
304 CompareTaskPriority);
305 scoped_refptr<Task> task(
306 task_namespace->ready_to_run_tasks.back()->task());
307 task_namespace->ready_to_run_tasks.pop_back();
308
309 // Add task namespace back to |ready_to_run_namespaces_| if not
310 // empty after taking top priority task.
311 if (!task_namespace->ready_to_run_tasks.empty()) {
312 ready_to_run_namespaces_.push_back(task_namespace);
313 std::push_heap(ready_to_run_namespaces_.begin(),
314 ready_to_run_namespaces_.end(),
315 CompareTaskNamespacePriority);
316 }
317
318 // Move task from |pending_tasks| to |running_tasks|.
319 DCHECK(task_namespace->pending_tasks.contains(task.get()));
320 DCHECK(!task_namespace->running_tasks.contains(task.get()));
321 task_namespace->running_tasks.set(
322 task.get(),
323 task_namespace->pending_tasks.take_and_erase(task.get()));
324
325 // There may be more work available, so wake up another worker thread.
326 has_ready_to_run_tasks_cv_.Signal();
327
328 // Call WillRun() before releasing |lock_| and running task.
329 task->WillRun();
330
331 {
332 base::AutoUnlock unlock(lock_);
333
334 task->RunOnWorkerThread(thread_index);
335 }
336
337 // This will mark task as finished running.
338 task->DidRun();
339
340 // Now iterate over all dependents to remove dependency and check
341 // if they are ready to run.
342 scoped_ptr<GraphNode> node =
343 task_namespace->running_tasks.take_and_erase(task.get());
344 if (node) {
345 bool ready_to_run_namespaces_has_heap_properties = true;
346
347 for (GraphNode::Vector::const_iterator it = node->dependents().begin();
348 it != node->dependents().end(); ++it) {
349 GraphNode* dependent_node = *it;
350
351 dependent_node->remove_dependency();
352 // Task is ready if it has no dependencies. Add it to
353 // |ready_to_run_tasks_|.
354 if (!dependent_node->num_dependencies()) {
355 bool was_empty = task_namespace->ready_to_run_tasks.empty();
356 task_namespace->ready_to_run_tasks.push_back(dependent_node);
357 std::push_heap(task_namespace->ready_to_run_tasks.begin(),
358 task_namespace->ready_to_run_tasks.end(),
359 CompareTaskPriority);
360 // Task namespace is ready if it has at least one ready
361 // to run task. Add it to |ready_to_run_namespaces_| if
362 // it just become ready.
363 if (was_empty) {
364 DCHECK(std::find(ready_to_run_namespaces_.begin(),
365 ready_to_run_namespaces_.end(),
366 task_namespace) ==
367 ready_to_run_namespaces_.end());
368 ready_to_run_namespaces_.push_back(task_namespace);
369 }
370 ready_to_run_namespaces_has_heap_properties = false;
371 }
372 }
373
374 // Rearrange the task namespaces in |ready_to_run_namespaces_|
375 // in such a way that they yet again form a heap.
376 if (!ready_to_run_namespaces_has_heap_properties) {
377 std::make_heap(ready_to_run_namespaces_.begin(),
378 ready_to_run_namespaces_.end(),
379 CompareTaskNamespacePriority);
380 }
381 }
382
383 // Finally add task to |completed_tasks_|.
384 task_namespace->completed_tasks.push_back(task);
385
386 // If namespace has finished running all tasks, wake up origin thread.
387 if (HasFinishedRunningTasksInNamespace(task_namespace))
388 has_namespaces_with_finished_running_tasks_cv_.Signal();
389 }
390
391 // We noticed we should exit. Wake up the next worker so it knows it should
392 // exit as well (because the Shutdown() code only signals once).
393 has_ready_to_run_tasks_cv_.Signal();
394 }
395
396 } // namespace internal
397 } // namespace cc
OLDNEW
« no previous file with comments | « cc/resources/task_graph_runner.h ('k') | cc/resources/task_graph_runner_perftest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698