Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(332)

Side by Side Diff: cc/resources/worker_pool.cc

Issue 73923003: Shared Raster Worker Threads (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Code cleanup and pre-submit issue changes Created 7 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « cc/resources/worker_pool.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "cc/resources/worker_pool.h" 5 #include "cc/resources/worker_pool.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <queue> 8 #include <queue>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
11 #include "base/command_line.h"
11 #include "base/containers/hash_tables.h" 12 #include "base/containers/hash_tables.h"
12 #include "base/debug/trace_event.h" 13 #include "base/debug/trace_event.h"
14 #include "base/lazy_instance.h"
13 #include "base/strings/stringprintf.h" 15 #include "base/strings/stringprintf.h"
14 #include "base/synchronization/condition_variable.h" 16 #include "base/synchronization/condition_variable.h"
15 #include "base/threading/simple_thread.h" 17 #include "base/threading/simple_thread.h"
16 #include "base/threading/thread_restrictions.h" 18 #include "base/threading/thread_restrictions.h"
17 #include "cc/base/scoped_ptr_deque.h" 19 #include "cc/base/scoped_ptr_deque.h"
20 #include "cc/base/switches.h"
18 21
19 namespace cc { 22 namespace cc {
20 23
21 namespace internal { 24 namespace {
22
23 WorkerPoolTask::WorkerPoolTask()
24 : did_schedule_(false),
25 did_run_(false),
26 did_complete_(false) {
27 }
28
29 WorkerPoolTask::~WorkerPoolTask() {
30 DCHECK_EQ(did_schedule_, did_complete_);
31 DCHECK(!did_run_ || did_schedule_);
32 DCHECK(!did_run_ || did_complete_);
33 }
34
35 void WorkerPoolTask::DidSchedule() {
36 DCHECK(!did_complete_);
37 did_schedule_ = true;
38 }
39
40 void WorkerPoolTask::WillRun() {
41 DCHECK(did_schedule_);
42 DCHECK(!did_complete_);
43 DCHECK(!did_run_);
44 }
45
46 void WorkerPoolTask::DidRun() {
47 did_run_ = true;
48 }
49
50 void WorkerPoolTask::WillComplete() {
51 DCHECK(!did_complete_);
52 }
53
54 void WorkerPoolTask::DidComplete() {
55 DCHECK(did_schedule_);
56 DCHECK(!did_complete_);
57 did_complete_ = true;
58 }
59
60 bool WorkerPoolTask::HasFinishedRunning() const {
61 return did_run_;
62 }
63
64 bool WorkerPoolTask::HasCompleted() const {
65 return did_complete_;
66 }
67
68 GraphNode::GraphNode(internal::WorkerPoolTask* task, unsigned priority)
69 : task_(task),
70 priority_(priority),
71 num_dependencies_(0) {
72 }
73
74 GraphNode::~GraphNode() {
75 }
76
77 } // namespace internal
78 25
79 // Internal to the worker pool. Any data or logic that needs to be 26 // Internal to the worker pool. Any data or logic that needs to be
80 // shared between threads lives in this class. All members are guarded 27 // shared between threads lives in this class. All members are guarded
81 // by |lock_|. 28 // by |lock_|.
82 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { 29 class WorkerInner : public base::DelegateSimpleThread::Delegate {
83 public: 30 public:
84 Inner(size_t num_threads, const std::string& thread_name_prefix); 31 WorkerInner(size_t num_threads, const std::string& thread_name_prefix);
85 virtual ~Inner(); 32 virtual ~WorkerInner();
86 33
87 void Shutdown(); 34 void Shutdown();
88 35
36 typedef base::ScopedPtrHashMap<internal::WorkerPoolTask*, internal::GraphNode>
37 GraphNodeMap;
38 typedef GraphNodeMap TaskGraph;
39 typedef base::ScopedPtrHashMap<WorkerPool*, GraphNodeMap>
40 TaskMap;
41 typedef std::vector<scoped_refptr<internal::WorkerPoolTask> > TaskVector;
42
89 // Schedule running of tasks in |graph|. Tasks previously scheduled but 43 // Schedule running of tasks in |graph|. Tasks previously scheduled but
90 // no longer needed will be canceled unless already running. Canceled 44 // no longer needed will be canceled unless already running. Canceled
91 // tasks are moved to |completed_tasks_| without being run. The result 45 // tasks are moved to |completed_tasks_| without being run. The result
92 // is that once scheduled, a task is guaranteed to end up in the 46 // is that once scheduled, a task is guaranteed to end up in the
93 // |completed_tasks_| queue even if they later get canceled by another 47 // |completed_tasks_| queue even if they later get canceled by another
94 // call to SetTaskGraph(). 48 // call to SetTaskGraph().
95 void SetTaskGraph(TaskGraph* graph); 49 void SetTaskGraph(TaskGraph* graph, WorkerPool* worker_pool);
96 50
97 // Collect all completed tasks in |completed_tasks|. 51 // Collect all completed tasks in |completed_tasks|.
98 void CollectCompletedTasks(TaskVector* completed_tasks); 52 void CollectCompletedTasks(TaskVector* completed_tasks,
53 WorkerPool* worker_pool);
54
99 55
100 private: 56 private:
101 class PriorityComparator { 57 class PriorityComparator {
102 public: 58 public:
103 bool operator()(const internal::GraphNode* a, 59 bool operator()(const internal::GraphNode* a,
104 const internal::GraphNode* b) { 60 const internal::GraphNode* b) {
105 // In this system, numerically lower priority is run first. 61 // In this system, numerically lower priority is run first.
106 if (a->priority() != b->priority()) 62 if (a->priority() != b->priority())
107 return a->priority() > b->priority(); 63 return a->priority() > b->priority();
108 64
109 // Run task with most dependents first when priority is the same. 65 // Run task with most dependents first when priority is the same.
110 return a->dependents().size() < b->dependents().size(); 66 return a->dependents().size() < b->dependents().size();
111 } 67 }
112 }; 68 };
113 69
70 // Ordered set of tasks that are ready to run.
71 typedef std::priority_queue<internal::GraphNode*,
72 std::vector<internal::GraphNode*>,
73 PriorityComparator> TaskQueue;
74
75 class TaskSet {
76 public:
77 GraphNodeMap pending_tasks_;
78 GraphNodeMap running_tasks_;
79 TaskVector completed_tasks_;
80 TaskQueue ready_to_run_tasks_;
81 };
82
83 class TaskComparator {
84 public:
85 bool operator()(const TaskSet* a,
86 const TaskSet* b) {
87 if (a->ready_to_run_tasks_.top()->priority()
88 != b->ready_to_run_tasks_.top()->priority())
89 return a->ready_to_run_tasks_.top()->priority() >
90 b->ready_to_run_tasks_.top()->priority();
91
92 return a->ready_to_run_tasks_.top()->dependents().size() >
93 b->ready_to_run_tasks_.top()->dependents().size();
94 }
95 };
96
97 typedef std::map<const WorkerPool*, TaskSet*> TaskMapper;
98
99 TaskMapper tasks_;
100
101 typedef std::priority_queue<TaskSet*,
102 std::vector<TaskSet*>,
103 TaskComparator> TaskPriorityQueue;
104
105 TaskPriorityQueue shared_ready_to_run_tasks_;
114 // Overridden from base::DelegateSimpleThread: 106 // Overridden from base::DelegateSimpleThread:
115 virtual void Run() OVERRIDE; 107 virtual void Run() OVERRIDE;
116 108
117 // This lock protects all members of this class except 109 // This lock protects all members of this class except
118 // |worker_pool_on_origin_thread_|. Do not read or modify anything 110 // |worker_pool_on_origin_thread_|. Do not read or modify anything
119 // without holding this lock. Do not block while holding this lock. 111 // without holding this lock. Do not block while holding this lock.
120 mutable base::Lock lock_; 112 mutable base::Lock lock_;
121 113
122 // Condition variable that is waited on by worker threads until new 114 // Condition variable that is waited on by worker threads until new
123 // tasks are ready to run or shutdown starts. 115 // tasks are ready to run or shutdown starts.
124 base::ConditionVariable has_ready_to_run_tasks_cv_; 116 base::ConditionVariable has_ready_to_run_tasks_cv_;
125 117
126 // Provides each running thread loop with a unique index. First thread 118 // Provides each running thread loop with a unique index. First thread
127 // loop index is 0. 119 // loop index is 0.
128 unsigned next_thread_index_; 120 unsigned next_thread_index_;
129 121
130 // Set during shutdown. Tells workers to exit when no more tasks 122 // Set during shutdown. Tells workers to exit when no more tasks
131 // are pending. 123 // are pending.
132 bool shutdown_; 124 bool shutdown_;
133 125
134 // This set contains all pending tasks.
135 GraphNodeMap pending_tasks_;
136
137 // Ordered set of tasks that are ready to run.
138 typedef std::priority_queue<internal::GraphNode*,
139 std::vector<internal::GraphNode*>,
140 PriorityComparator> TaskQueue;
141 TaskQueue ready_to_run_tasks_;
142
143 // This set contains all currently running tasks.
144 GraphNodeMap running_tasks_;
145
146 // Completed tasks not yet collected by origin thread.
147 TaskVector completed_tasks_;
148
149 ScopedPtrDeque<base::DelegateSimpleThread> workers_; 126 ScopedPtrDeque<base::DelegateSimpleThread> workers_;
150 127
151 DISALLOW_COPY_AND_ASSIGN(Inner); 128 DISALLOW_COPY_AND_ASSIGN(WorkerInner);
152 }; 129 };
153 130
154 WorkerPool::Inner::Inner( 131 class CC_EXPORT DerivedInner : public WorkerInner {
132 public:
133 DerivedInner();
134 };
135
136 base::LazyInstance<DerivedInner> g_workerpool_inner;
137
138
139 WorkerInner::WorkerInner(
155 size_t num_threads, const std::string& thread_name_prefix) 140 size_t num_threads, const std::string& thread_name_prefix)
156 : lock_(), 141 : lock_(),
157 has_ready_to_run_tasks_cv_(&lock_), 142 has_ready_to_run_tasks_cv_(&lock_),
158 next_thread_index_(0), 143 next_thread_index_(0),
159 shutdown_(false) { 144 shutdown_(false) {
160 base::AutoLock lock(lock_); 145 base::AutoLock lock(lock_);
161 146
162 while (workers_.size() < num_threads) { 147 while (workers_.size() < num_threads) {
163 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( 148 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
164 new base::DelegateSimpleThread( 149 new base::DelegateSimpleThread(
165 this, 150 this,
166 thread_name_prefix + 151 thread_name_prefix +
167 base::StringPrintf( 152 base::StringPrintf(
168 "Worker%u", 153 "Worker%u",
169 static_cast<unsigned>(workers_.size() + 1)).c_str())); 154 static_cast<unsigned>(workers_.size() + 1)).c_str()));
170 worker->Start(); 155 worker->Start();
171 #if defined(OS_ANDROID) || defined(OS_LINUX) 156 #if defined(OS_ANDROID) || defined(OS_LINUX)
172 worker->SetThreadPriority(base::kThreadPriority_Background); 157 worker->SetThreadPriority(base::kThreadPriority_Background);
173 #endif 158 #endif
174 workers_.push_back(worker.Pass()); 159 workers_.push_back(worker.Pass());
175 } 160 }
176 } 161 }
177 162
178 WorkerPool::Inner::~Inner() { 163 WorkerInner::~WorkerInner() {
179 base::AutoLock lock(lock_); 164 base::AutoLock lock(lock_);
180 165
181 DCHECK(shutdown_); 166 DCHECK(shutdown_);
182 167 DCHECK_EQ(0u, shared_ready_to_run_tasks_.size());
183 DCHECK_EQ(0u, pending_tasks_.size());
184 DCHECK_EQ(0u, ready_to_run_tasks_.size());
185 DCHECK_EQ(0u, running_tasks_.size());
186 DCHECK_EQ(0u, completed_tasks_.size());
187 } 168 }
188 169
189 void WorkerPool::Inner::Shutdown() { 170 void WorkerInner::Shutdown() {
190 { 171 {
191 base::AutoLock lock(lock_); 172 base::AutoLock lock(lock_);
192 173
193 DCHECK(!shutdown_); 174 DCHECK(!shutdown_);
194 shutdown_ = true; 175 shutdown_ = true;
195
196 // Wake up a worker so it knows it should exit. This will cause all workers 176 // Wake up a worker so it knows it should exit. This will cause all workers
197 // to exit as each will wake up another worker before exiting. 177 // to exit as each will wake up another worker before exiting.
198 has_ready_to_run_tasks_cv_.Signal(); 178 has_ready_to_run_tasks_cv_.Signal();
199 } 179 }
200 180
201 while (workers_.size()) { 181 while (workers_.size()) {
202 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); 182 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
203 // http://crbug.com/240453 - Join() is considered IO and will block this 183 // http://crbug.com/240453 - Join() is considered IO and will block this
204 // thread. See also http://crbug.com/239423 for further ideas. 184 // thread. See also http://crbug.com/239423 for further ideas.
205 base::ThreadRestrictions::ScopedAllowIO allow_io; 185 base::ThreadRestrictions::ScopedAllowIO allow_io;
206 worker->Join(); 186 worker->Join();
207 } 187 }
208 } 188 }
209 189
210 void WorkerPool::Inner::SetTaskGraph(TaskGraph* graph) { 190 void WorkerInner::SetTaskGraph(TaskGraph* graph, WorkerPool* worker_pool) {
reveman 2013/12/14 05:22:40 Let's focus on getting this function right first.
211 // It is OK to call SetTaskGraph() after shutdown if |graph| is empty. 191 // It is OK to call SetTaskGraph() after shutdown if |graph| is empty.
212 DCHECK(graph->empty() || !shutdown_); 192 DCHECK(graph->empty() || !shutdown_);
213 193
214 GraphNodeMap new_pending_tasks; 194 GraphNodeMap new_pending_tasks;
215 GraphNodeMap new_running_tasks; 195 GraphNodeMap new_running_tasks;
216 TaskQueue new_ready_to_run_tasks; 196 TaskQueue new_ready_to_run_tasks;
197 TaskVector new_completed_tasks;
reveman 2013/12/14 05:22:40 why do you need this?
sohanjg 2013/12/16 10:23:36 Done. This and below temp variables were added fo
198 GraphNodeMap temp_pending_tasks;
reveman 2013/12/14 05:22:40 This doesn't seem useful either.
sohanjg 2013/12/16 10:23:36 Done.
199
217 200
218 new_pending_tasks.swap(*graph); 201 new_pending_tasks.swap(*graph);
219 202
220 { 203 {
221 base::AutoLock lock(lock_); 204 base::AutoLock lock(lock_);
222 205
206 // Create Task Set
207 if (tasks_.count(worker_pool) == 0) {
reveman 2013/12/14 05:22:40 I think we should add ::Register/Unregister(const
sohanjg 2013/12/16 10:23:36 Done.
208
209 TaskSet* task_set= new TaskSet();
210 task_set->completed_tasks_ = new_completed_tasks;
211 task_set->ready_to_run_tasks_ = new_ready_to_run_tasks;
212 task_set->running_tasks_.swap(new_running_tasks);
213 task_set->pending_tasks_.swap(temp_pending_tasks);
214 tasks_.insert(std::pair<const WorkerPool*,
215 TaskSet*>(worker_pool, task_set));
216
217 }
218
219 new_completed_tasks = tasks_[worker_pool]->completed_tasks_;
reveman 2013/12/14 05:22:40 This is making a complete copy of the vector, whic
sohanjg 2013/12/16 10:23:36 Done. This i had added because i had wrongly inte
220
223 // First remove all completed tasks from |new_pending_tasks| and 221 // First remove all completed tasks from |new_pending_tasks| and
224 // adjust number of dependencies. 222 // adjust number of dependencies.
225 for (TaskVector::iterator it = completed_tasks_.begin(); 223 for (TaskVector::iterator it = new_completed_tasks.begin();
226 it != completed_tasks_.end(); ++it) { 224 it != new_completed_tasks.end(); ++it) {
227 internal::WorkerPoolTask* task = it->get(); 225 internal::WorkerPoolTask* task = it->get();
228
229 scoped_ptr<internal::GraphNode> node = new_pending_tasks.take_and_erase( 226 scoped_ptr<internal::GraphNode> node = new_pending_tasks.take_and_erase(
230 task); 227 task);
231 if (node) { 228 if (node) {
232 for (internal::GraphNode::Vector::const_iterator it = 229 for (internal::GraphNode::Vector::const_iterator it =
233 node->dependents().begin(); 230 node->dependents().begin();
234 it != node->dependents().end(); ++it) { 231 it != node->dependents().end(); ++it) {
235 internal::GraphNode* dependent_node = *it; 232 internal::GraphNode* dependent_node = *it;
236 dependent_node->remove_dependency(); 233 dependent_node->remove_dependency();
237 } 234 }
238 } 235 }
239 } 236 }
240 237
241 // Build new running task set. 238 // Build new running task set.
242 for (GraphNodeMap::iterator it = running_tasks_.begin(); 239 for (GraphNodeMap::iterator it =
243 it != running_tasks_.end(); ++it) { 240 tasks_[worker_pool]->running_tasks_.begin();
241 it != tasks_[worker_pool]->running_tasks_.end(); ++it) {
244 internal::WorkerPoolTask* task = it->first; 242 internal::WorkerPoolTask* task = it->first;
245 // Transfer scheduled task value from |new_pending_tasks| to 243 // Transfer scheduled task value from |new_pending_tasks| to
246 // |new_running_tasks| if currently running. Value must be set to 244 // |new_running_tasks| if currently running. Value must be set to
247 // NULL if |new_pending_tasks| doesn't contain task. This does 245 // NULL if |new_pending_tasks| doesn't contain task. This does
248 // the right in both cases. 246 // the right in both cases.
249 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task)); 247 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task));
250 } 248 }
251 249
252 // Build new "ready to run" tasks queue. 250 // Build new "ready to run" tasks queue.
253 // TODO(reveman): Create this queue when building the task graph instead. 251 // TODO(reveman): Create this queue when building the task graph instead.
254 for (GraphNodeMap::iterator it = new_pending_tasks.begin(); 252 for (GraphNodeMap::iterator it = new_pending_tasks.begin();
255 it != new_pending_tasks.end(); ++it) { 253 it != new_pending_tasks.end(); ++it) {
256 internal::WorkerPoolTask* task = it->first; 254 internal::WorkerPoolTask* task = it->first;
257 DCHECK(task); 255 DCHECK(task);
258 internal::GraphNode* node = it->second; 256 internal::GraphNode* node = it->second;
259 257
260 // Completed tasks should not exist in |new_pending_tasks|. 258 // Completed tasks should not exist in |new_pending_tasks|.
261 DCHECK(!task->HasFinishedRunning()); 259 DCHECK(!task->HasFinishedRunning());
262 260
263 // Call DidSchedule() to indicate that this task has been scheduled. 261 // Call DidSchedule() to indicate that this task has been scheduled.
264 // Note: This is only for debugging purposes. 262 // Note: This is only for debugging purposes.
265 task->DidSchedule(); 263 task->DidSchedule();
266 264
267 if (!node->num_dependencies()) 265 if (!node->num_dependencies()) {
reveman 2013/12/14 05:22:40 please avoid small changes like this as it makes i
sohanjg 2013/12/16 10:23:36 Done.
268 new_ready_to_run_tasks.push(node); 266 new_ready_to_run_tasks.push(node);
267 }
268 // Erase the task from old pending tasks.
269 tasks_[worker_pool]->pending_tasks_.erase(task);
269 270
270 // Erase the task from old pending tasks.
271 pending_tasks_.erase(task);
272 } 271 }
273 272
274 completed_tasks_.reserve(completed_tasks_.size() + pending_tasks_.size()); 273 new_completed_tasks.reserve(
274 new_completed_tasks.size() +
275 tasks_[worker_pool]->pending_tasks_.size());
275 276
276 // The items left in |pending_tasks_| need to be canceled. 277 // The items left in |pending_tasks_| need to be canceled.
277 for (GraphNodeMap::const_iterator it = pending_tasks_.begin(); 278 for (GraphNodeMap::const_iterator it =
278 it != pending_tasks_.end(); 279 tasks_[worker_pool]->pending_tasks_.begin();
279 ++it) { 280 it != tasks_[worker_pool]->pending_tasks_.end();
280 completed_tasks_.push_back(it->first); 281 ++it) {
282 new_completed_tasks.push_back(it->first);
281 } 283 }
282 284
285
283 // Swap task sets. 286 // Swap task sets.
284 // Note: old tasks are intentionally destroyed after releasing |lock_|. 287 // Note: old tasks are intentionally destroyed after releasing |lock_|.
285 pending_tasks_.swap(new_pending_tasks); 288 tasks_[worker_pool]->pending_tasks_.swap(new_pending_tasks);
286 running_tasks_.swap(new_running_tasks); 289 tasks_[worker_pool]->running_tasks_.swap(new_running_tasks);
287 std::swap(ready_to_run_tasks_, new_ready_to_run_tasks); 290 std::swap(tasks_[worker_pool]->ready_to_run_tasks_, new_ready_to_run_tasks);
291
292 if (!shared_ready_to_run_tasks_.empty())
293 shared_ready_to_run_tasks_.pop();
294 shared_ready_to_run_tasks_.push(tasks_[worker_pool]);
reveman 2013/12/14 05:22:40 this doesn't seem right. what are you trying to do
sohanjg 2013/12/16 10:23:36 Done. The idea was to recreate the shared_ready_t
288 295
289 // If |ready_to_run_tasks_| is empty, it means we either have 296 // If |ready_to_run_tasks_| is empty, it means we either have
290 // running tasks, or we have no pending tasks. 297 // running tasks, or we have no pending tasks.
291 DCHECK(!ready_to_run_tasks_.empty() || 298 DCHECK(!tasks_[worker_pool]->ready_to_run_tasks_.empty() ||
292 (pending_tasks_.empty() || !running_tasks_.empty())); 299 (tasks_[worker_pool]->pending_tasks_.empty() ||
300 !tasks_[worker_pool]->running_tasks_.empty()));
293 301
294 // If there is more work available, wake up worker thread. 302 // If there is more work available, wake up worker thread.
295 if (!ready_to_run_tasks_.empty()) 303 if (!tasks_[worker_pool]->ready_to_run_tasks_.empty())
296 has_ready_to_run_tasks_cv_.Signal(); 304 has_ready_to_run_tasks_cv_.Signal();
297 } 305 }
298 } 306 }
299 307
300 void WorkerPool::Inner::CollectCompletedTasks(TaskVector* completed_tasks) { 308 void WorkerInner::CollectCompletedTasks
309 (TaskVector* completed_tasks, WorkerPool* worker_pool) {
301 base::AutoLock lock(lock_); 310 base::AutoLock lock(lock_);
302 311
303 DCHECK_EQ(0u, completed_tasks->size()); 312 DCHECK_EQ(0u, completed_tasks->size());
304 completed_tasks->swap(completed_tasks_); 313 if (!shared_ready_to_run_tasks_.empty())
314 completed_tasks->swap(shared_ready_to_run_tasks_.top()->completed_tasks_);
315
305 } 316 }
306 317
307 void WorkerPool::Inner::Run() { 318 void WorkerInner::Run() {
308 base::AutoLock lock(lock_); 319 base::AutoLock lock(lock_);
309 320
310 // Get a unique thread index. 321 // Get a unique thread index.
311 int thread_index = next_thread_index_++; 322 int thread_index = next_thread_index_++;
323 TaskSet* ready_to_run_task_set = NULL;
312 324
313 while (true) { 325 while (true) {
314 if (ready_to_run_tasks_.empty()) {
315 // Exit when shutdown is set and no more tasks are pending.
316 if (shutdown_ && pending_tasks_.empty())
317 break;
318 326
319 // Wait for more tasks. 327 if (shared_ready_to_run_tasks_.empty()) {
320 has_ready_to_run_tasks_cv_.Wait(); 328 // Exit when shutdown is set and no more tasks are pending.
321 continue; 329 if (shutdown_ && ready_to_run_task_set &&
330 ready_to_run_task_set->pending_tasks_.empty()) {
331 break;
332 }
333 // Wait for more tasks.
334 has_ready_to_run_tasks_cv_.Wait();
335 continue;
322 } 336 }
323 337
338
339 // Take top priority TaskSet from |shared_ready_to_run_tasks_|.
340 ready_to_run_task_set = shared_ready_to_run_tasks_.top();
341
342
324 // Take top priority task from |ready_to_run_tasks_|. 343 // Take top priority task from |ready_to_run_tasks_|.
325 scoped_refptr<internal::WorkerPoolTask> task( 344 scoped_refptr<internal::WorkerPoolTask> task(
326 ready_to_run_tasks_.top()->task()); 345 ready_to_run_task_set->ready_to_run_tasks_.top()->task());
327 ready_to_run_tasks_.pop(); 346 ready_to_run_task_set->ready_to_run_tasks_.pop();
347
328 348
329 // Move task from |pending_tasks_| to |running_tasks_|. 349 // Move task from |pending_tasks_| to |running_tasks_|.
330 DCHECK(pending_tasks_.contains(task.get())); 350 DCHECK(ready_to_run_task_set->pending_tasks_.contains(task.get()));
331 DCHECK(!running_tasks_.contains(task.get())); 351 DCHECK(!ready_to_run_task_set->running_tasks_.contains(task.get()));
332 running_tasks_.set(task.get(), pending_tasks_.take_and_erase(task.get())); 352
353 ready_to_run_task_set->running_tasks_.set(
354 task.get(), ready_to_run_task_set->pending_tasks_.take_and_erase
355 (task.get()));
333 356
334 // There may be more work available, so wake up another worker thread. 357 // There may be more work available, so wake up another worker thread.
335 has_ready_to_run_tasks_cv_.Signal(); 358 has_ready_to_run_tasks_cv_.Signal();
336 359
360
337 // Call WillRun() before releasing |lock_| and running task. 361 // Call WillRun() before releasing |lock_| and running task.
338 task->WillRun(); 362 task->WillRun();
339 363
340 { 364 {
341 base::AutoUnlock unlock(lock_); 365 base::AutoUnlock unlock(lock_);
342
343 task->RunOnWorkerThread(thread_index); 366 task->RunOnWorkerThread(thread_index);
344 } 367 }
345 368
346 // This will mark task as finished running. 369 // This will mark task as finished running.
347 task->DidRun(); 370 task->DidRun();
348 371
349 // Now iterate over all dependents to remove dependency and check 372 // Now iterate over all dependents to remove dependency and check
350 // if they are ready to run. 373 // if they are ready to run.
351 scoped_ptr<internal::GraphNode> node = running_tasks_.take_and_erase( 374 scoped_ptr<internal::GraphNode> node =
375 ready_to_run_task_set->running_tasks_.take_and_erase(
352 task.get()); 376 task.get());
353 if (node) { 377 if (node) {
354 for (internal::GraphNode::Vector::const_iterator it = 378 for (internal::GraphNode::Vector::const_iterator it =
355 node->dependents().begin(); 379 node->dependents().begin();
356 it != node->dependents().end(); ++it) { 380 it != node->dependents().end(); ++it) {
357 internal::GraphNode* dependent_node = *it; 381 internal::GraphNode* dependent_node = *it;
358 382
359 dependent_node->remove_dependency(); 383 dependent_node->remove_dependency();
360 // Task is ready if it has no dependencies. Add it to 384 // Task is ready if it has no dependencies. Add it to
361 // |ready_to_run_tasks_|. 385 // |ready_to_run_tasks_|.
362 if (!dependent_node->num_dependencies()) 386 if (!dependent_node->num_dependencies())
363 ready_to_run_tasks_.push(dependent_node); 387 ready_to_run_task_set->ready_to_run_tasks_.push(dependent_node);
388 }
364 } 389 }
390
391 // Finally add task to |completed_tasks_|.
392 ready_to_run_task_set->completed_tasks_.push_back(task);
393
394 // Pop when ready_to_run_tasks_ is empty
395 if (ready_to_run_task_set->ready_to_run_tasks_.empty()) {
396 shared_ready_to_run_tasks_.pop();
365 } 397 }
366 398
367 // Finally add task to |completed_tasks_|.
368 completed_tasks_.push_back(task);
369 } 399 }
370 400
371 // We noticed we should exit. Wake up the next worker so it knows it should 401 // We noticed we should exit. Wake up the next worker so it knows it should
372 // exit as well (because the Shutdown() code only signals once). 402 // exit as well (because the Shutdown() code only signals once).
373 has_ready_to_run_tasks_cv_.Signal(); 403 has_ready_to_run_tasks_cv_.Signal();
374 } 404 }
375 405
406 // Derived WorkerInner Ctor
407 DerivedInner::DerivedInner(): WorkerInner
408 (switches::GetNumRasterThreads(), "CompositorRaster") {
409 }
410
411 } // namespace
412
413 namespace internal {
414
415 WorkerPoolTask::WorkerPoolTask()
416 : did_schedule_(false),
417 did_run_(false),
418 did_complete_(false) {
419 }
420
421 WorkerPoolTask::~WorkerPoolTask() {
422 DCHECK_EQ(did_schedule_, did_complete_);
423 DCHECK(!did_run_ || did_schedule_);
424 DCHECK(!did_run_ || did_complete_);
425 }
426
427 void WorkerPoolTask::DidSchedule() {
428 DCHECK(!did_complete_);
429 did_schedule_ = true;
430 }
431
432 void WorkerPoolTask::WillRun() {
433 DCHECK(did_schedule_);
434 DCHECK(!did_complete_);
435 DCHECK(!did_run_);
436 }
437
438 void WorkerPoolTask::DidRun() {
439 did_run_ = true;
440 }
441
442 void WorkerPoolTask::WillComplete() {
443 DCHECK(!did_complete_);
444 }
445
446 void WorkerPoolTask::DidComplete() {
447 DCHECK(did_schedule_);
448 DCHECK(!did_complete_);
449 did_complete_ = true;
450 }
451
452 bool WorkerPoolTask::HasFinishedRunning() const {
453 return did_run_;
454 }
455
456 bool WorkerPoolTask::HasCompleted() const {
457 return did_complete_;
458 }
459
460 GraphNode::GraphNode(internal::WorkerPoolTask* task, unsigned priority)
461 : task_(task),
462 priority_(priority),
463 num_dependencies_(0) {
464 }
465
466 GraphNode::~GraphNode() {
467 }
468
469 } // namespace internal
470
471
376 WorkerPool::WorkerPool(size_t num_threads, 472 WorkerPool::WorkerPool(size_t num_threads,
377 const std::string& thread_name_prefix) 473 const std::string& thread_name_prefix)
378 : in_dispatch_completion_callbacks_(false), 474 : in_dispatch_completion_callbacks_(false) {
379 inner_(make_scoped_ptr(new Inner(num_threads, thread_name_prefix))) {
380 } 475 }
381 476
382 WorkerPool::~WorkerPool() { 477 WorkerPool::~WorkerPool() {
383 } 478 }
384 479
385 void WorkerPool::Shutdown() { 480 void WorkerPool::Shutdown() {
386 TRACE_EVENT0("cc", "WorkerPool::Shutdown"); 481 TRACE_EVENT0("cc", "WorkerPool::Shutdown");
387 482
388 DCHECK(!in_dispatch_completion_callbacks_); 483 DCHECK(!in_dispatch_completion_callbacks_);
389 484 g_workerpool_inner.Pointer()->Shutdown();
390 inner_->Shutdown();
391 } 485 }
392 486
393 void WorkerPool::CheckForCompletedTasks() { 487 void WorkerPool::CheckForCompletedTasks() {
394 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); 488 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");
395 489
396 DCHECK(!in_dispatch_completion_callbacks_); 490 DCHECK(!in_dispatch_completion_callbacks_);
397 491
398 TaskVector completed_tasks; 492 TaskVector completed_tasks;
399 inner_->CollectCompletedTasks(&completed_tasks); 493 g_workerpool_inner.Pointer()->CollectCompletedTasks(&completed_tasks, this);
400 ProcessCompletedTasks(completed_tasks); 494 ProcessCompletedTasks(completed_tasks);
401 } 495 }
402 496
403 void WorkerPool::ProcessCompletedTasks( 497 void WorkerPool::ProcessCompletedTasks(
404 const TaskVector& completed_tasks) { 498 const TaskVector& completed_tasks) {
405 TRACE_EVENT1("cc", "WorkerPool::ProcessCompletedTasks", 499 TRACE_EVENT1("cc", "WorkerPool::ProcessCompletedTasks",
406 "completed_task_count", completed_tasks.size()); 500 "completed_task_count", completed_tasks.size());
407 501
408 // Worker pool instance is not reentrant while processing completed tasks. 502 // Worker pool instance is not reentrant while processing completed tasks.
409 in_dispatch_completion_callbacks_ = true; 503 in_dispatch_completion_callbacks_ = true;
410 504
411 for (TaskVector::const_iterator it = completed_tasks.begin(); 505 for (TaskVector::const_iterator it = completed_tasks.begin();
412 it != completed_tasks.end(); 506 it != completed_tasks.end();
413 ++it) { 507 ++it) {
414 internal::WorkerPoolTask* task = it->get(); 508 internal::WorkerPoolTask* task = it->get();
415 509
416 task->WillComplete(); 510 task->WillComplete();
417 task->CompleteOnOriginThread(); 511 task->CompleteOnOriginThread();
418 task->DidComplete(); 512 task->DidComplete();
419 } 513 }
420 514
421 in_dispatch_completion_callbacks_ = false; 515 in_dispatch_completion_callbacks_ = false;
422 } 516 }
423 517
424 void WorkerPool::SetTaskGraph(TaskGraph* graph) { 518 void WorkerPool::SetTaskGraph(TaskGraph* graph) {
425 TRACE_EVENT1("cc", "WorkerPool::SetTaskGraph", 519 TRACE_EVENT1("cc", "WorkerPool::SetTaskGraph",
426 "num_tasks", graph->size()); 520 "num_tasks", graph->size());
427 521
428 DCHECK(!in_dispatch_completion_callbacks_); 522 DCHECK(!in_dispatch_completion_callbacks_);
429 523 g_workerpool_inner.Pointer()->SetTaskGraph(graph, this);
430 inner_->SetTaskGraph(graph);
431 } 524 }
432 525
433 } // namespace cc 526 } // namespace cc
OLDNEW
« no previous file with comments | « cc/resources/worker_pool.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698