Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(176)

Side by Side Diff: cc/resources/worker_pool.cc

Issue 73923003: Shared Raster Worker Threads (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: WIP - Mapped Task Pool for Worker Threads Created 7 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « cc/resources/worker_pool.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "cc/resources/worker_pool.h" 5 #include "cc/resources/worker_pool.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <queue> 8 #include <queue>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
11 #include "base/command_line.h"
11 #include "base/containers/hash_tables.h" 12 #include "base/containers/hash_tables.h"
12 #include "base/debug/trace_event.h" 13 #include "base/debug/trace_event.h"
14 #include "base/lazy_instance.h"
13 #include "base/strings/stringprintf.h" 15 #include "base/strings/stringprintf.h"
14 #include "base/synchronization/condition_variable.h" 16 #include "base/synchronization/condition_variable.h"
15 #include "base/threading/simple_thread.h" 17 #include "base/threading/simple_thread.h"
16 #include "base/threading/thread_restrictions.h" 18 #include "base/threading/thread_restrictions.h"
17 #include "cc/base/scoped_ptr_deque.h" 19 #include "cc/base/scoped_ptr_deque.h"
20 #include "cc/base/switches.h"
18 21
19 namespace cc { 22 namespace cc {
20 23
21 namespace internal { 24 namespace {
22
23 WorkerPoolTask::WorkerPoolTask()
24 : did_schedule_(false),
25 did_run_(false),
26 did_complete_(false) {
27 }
28
29 WorkerPoolTask::~WorkerPoolTask() {
30 DCHECK_EQ(did_schedule_, did_complete_);
31 DCHECK(!did_run_ || did_schedule_);
32 DCHECK(!did_run_ || did_complete_);
33 }
34
35 void WorkerPoolTask::DidSchedule() {
36 DCHECK(!did_complete_);
37 did_schedule_ = true;
38 }
39
40 void WorkerPoolTask::WillRun() {
41 DCHECK(did_schedule_);
42 DCHECK(!did_complete_);
43 DCHECK(!did_run_);
44 }
45
46 void WorkerPoolTask::DidRun() {
47 did_run_ = true;
48 }
49
50 void WorkerPoolTask::WillComplete() {
51 DCHECK(!did_complete_);
52 }
53
54 void WorkerPoolTask::DidComplete() {
55 DCHECK(did_schedule_);
56 DCHECK(!did_complete_);
57 did_complete_ = true;
58 }
59
60 bool WorkerPoolTask::HasFinishedRunning() const {
61 return did_run_;
62 }
63
64 bool WorkerPoolTask::HasCompleted() const {
65 return did_complete_;
66 }
67
68 GraphNode::GraphNode(internal::WorkerPoolTask* task, unsigned priority)
69 : task_(task),
70 priority_(priority),
71 num_dependencies_(0) {
72 }
73
74 GraphNode::~GraphNode() {
75 }
76
77 } // namespace internal
78 25
79 // Internal to the worker pool. Any data or logic that needs to be 26 // Internal to the worker pool. Any data or logic that needs to be
80 // shared between threads lives in this class. All members are guarded 27 // shared between threads lives in this class. All members are guarded
81 // by |lock_|. 28 // by |lock_|.
82 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { 29 class WorkerInner : public base::DelegateSimpleThread::Delegate {
83 public: 30 public:
84 Inner(size_t num_threads, const std::string& thread_name_prefix); 31 WorkerInner(size_t num_threads, const std::string& thread_name_prefix);
85 virtual ~Inner(); 32 virtual ~WorkerInner();
86 33
87 void Shutdown(); 34 void Shutdown();
88 35
36 typedef base::ScopedPtrHashMap<internal::WorkerPoolTask*, internal::GraphNode>
37 GraphNodeMap;
38 typedef GraphNodeMap TaskGraph;
39 typedef base::ScopedPtrHashMap<WorkerPool*, GraphNodeMap>
40 TaskMap;
41 typedef std::vector<scoped_refptr<internal::WorkerPoolTask> > TaskVector;
42
89 // Schedule running of tasks in |graph|. Tasks previously scheduled but 43 // Schedule running of tasks in |graph|. Tasks previously scheduled but
90 // no longer needed will be canceled unless already running. Canceled 44 // no longer needed will be canceled unless already running. Canceled
91 // tasks are moved to |completed_tasks_| without being run. The result 45 // tasks are moved to |completed_tasks_| without being run. The result
92 // is that once scheduled, a task is guaranteed to end up in the 46 // is that once scheduled, a task is guaranteed to end up in the
93 // |completed_tasks_| queue even if they later get canceled by another 47 // |completed_tasks_| queue even if they later get canceled by another
94 // call to SetTaskGraph(). 48 // call to SetTaskGraph().
95 void SetTaskGraph(TaskGraph* graph); 49
50 void SetTaskGraph(TaskGraph* graph, WorkerPool* worker_pool);
96 51
97 // Collect all completed tasks in |completed_tasks|. 52 // Collect all completed tasks in |completed_tasks|.
98 void CollectCompletedTasks(TaskVector* completed_tasks); 53 void CollectCompletedTasks(TaskVector* completed_tasks,
54 WorkerPool* worker_pool);
55
56 typedef std::map<const WorkerPool*, TaskVector> TaskVectorMap;
57 typedef std::map<const WorkerPool*, GraphNodeMap*> GraphNodeMapper;
58 typedef std::map<internal::GraphNode*, internal::WorkerPoolTask*> CheckMap;
59
60 TaskVectorMap completed_tasks_pool_;
61 GraphNodeMapper pending_tasks_pool_;
62 GraphNodeMapper running_tasks_pool_;
63
64 // Temp code to get proper Task from Graph Node
65 CheckMap c_map_;
99 66
100 private: 67 private:
101 class PriorityComparator { 68 class PriorityComparator {
102 public: 69 public:
103 bool operator()(const internal::GraphNode* a, 70 bool operator()(const internal::GraphNode* a,
104 const internal::GraphNode* b) { 71 const internal::GraphNode* b) {
105 // In this system, numerically lower priority is run first. 72 // In this system, numerically lower priority is run first.
106 if (a->priority() != b->priority()) 73 if (a->priority() != b->priority())
107 return a->priority() > b->priority(); 74 return a->priority() > b->priority();
108 75
(...skipping 15 matching lines...) Expand all
124 base::ConditionVariable has_ready_to_run_tasks_cv_; 91 base::ConditionVariable has_ready_to_run_tasks_cv_;
125 92
126 // Provides each running thread loop with a unique index. First thread 93 // Provides each running thread loop with a unique index. First thread
127 // loop index is 0. 94 // loop index is 0.
128 unsigned next_thread_index_; 95 unsigned next_thread_index_;
129 96
130 // Set during shutdown. Tells workers to exit when no more tasks 97 // Set during shutdown. Tells workers to exit when no more tasks
131 // are pending. 98 // are pending.
132 bool shutdown_; 99 bool shutdown_;
133 100
134 // This set contains all pending tasks.
135 GraphNodeMap pending_tasks_;
136
137 // Ordered set of tasks that are ready to run. 101 // Ordered set of tasks that are ready to run.
138 typedef std::priority_queue<internal::GraphNode*, 102 typedef std::priority_queue<internal::GraphNode*,
139 std::vector<internal::GraphNode*>, 103 std::vector<internal::GraphNode*>,
140 PriorityComparator> TaskQueue; 104 PriorityComparator> TaskQueue;
141 TaskQueue ready_to_run_tasks_; 105 TaskQueue ready_to_run_tasks_;
142 106
143 // This set contains all currently running tasks.
144 GraphNodeMap running_tasks_;
145
146 // Completed tasks not yet collected by origin thread.
147 TaskVector completed_tasks_;
148
149 ScopedPtrDeque<base::DelegateSimpleThread> workers_; 107 ScopedPtrDeque<base::DelegateSimpleThread> workers_;
150 108
151 DISALLOW_COPY_AND_ASSIGN(Inner); 109 DISALLOW_COPY_AND_ASSIGN(WorkerInner);
152 }; 110 };
153 111
154 WorkerPool::Inner::Inner( 112 class CC_EXPORT DerivedInner : public WorkerInner {
113 public:
114 DerivedInner();
115 };
116
117 base::LazyInstance<DerivedInner> g_workerpool_inner;
118
119
120 WorkerInner::WorkerInner(
155 size_t num_threads, const std::string& thread_name_prefix) 121 size_t num_threads, const std::string& thread_name_prefix)
156 : lock_(), 122 : lock_(),
157 has_ready_to_run_tasks_cv_(&lock_), 123 has_ready_to_run_tasks_cv_(&lock_),
158 next_thread_index_(0), 124 next_thread_index_(0),
159 shutdown_(false) { 125 shutdown_(false) {
160 base::AutoLock lock(lock_); 126 base::AutoLock lock(lock_);
161 127
162 while (workers_.size() < num_threads) { 128 while (workers_.size() < num_threads) {
163 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( 129 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
164 new base::DelegateSimpleThread( 130 new base::DelegateSimpleThread(
165 this, 131 this,
166 thread_name_prefix + 132 thread_name_prefix +
167 base::StringPrintf( 133 base::StringPrintf(
168 "Worker%u", 134 "Worker%u",
169 static_cast<unsigned>(workers_.size() + 1)).c_str())); 135 static_cast<unsigned>(workers_.size() + 1)).c_str()));
170 worker->Start(); 136 worker->Start();
171 #if defined(OS_ANDROID) || defined(OS_LINUX) 137 #if defined(OS_ANDROID) || defined(OS_LINUX)
172 worker->SetThreadPriority(base::kThreadPriority_Background); 138 worker->SetThreadPriority(base::kThreadPriority_Background);
173 #endif 139 #endif
174 workers_.push_back(worker.Pass()); 140 workers_.push_back(worker.Pass());
175 } 141 }
176 } 142 }
177 143
178 WorkerPool::Inner::~Inner() { 144 WorkerInner::~WorkerInner() {
179 base::AutoLock lock(lock_); 145 base::AutoLock lock(lock_);
180 146
181 DCHECK(shutdown_); 147 DCHECK(shutdown_);
182 148 // DCHECK_EQ(0u, wp_->pending_tasks_.size());
183 DCHECK_EQ(0u, pending_tasks_.size());
184 DCHECK_EQ(0u, ready_to_run_tasks_.size()); 149 DCHECK_EQ(0u, ready_to_run_tasks_.size());
185 DCHECK_EQ(0u, running_tasks_.size()); 150 // DCHECK_EQ(0u, wp_->running_tasks_.size());
186 DCHECK_EQ(0u, completed_tasks_.size()); 151 // DCHECK_EQ(0u, wp_->completed_tasks_.size());
187 } 152 }
188 153
189 void WorkerPool::Inner::Shutdown() { 154 void WorkerInner::Shutdown() {
190 { 155 {
191 base::AutoLock lock(lock_); 156 base::AutoLock lock(lock_);
192 157
193 DCHECK(!shutdown_); 158 DCHECK(!shutdown_);
194 shutdown_ = true; 159 shutdown_ = true;
195 160
196 // Wake up a worker so it knows it should exit. This will cause all workers 161 // Wake up a worker so it knows it should exit. This will cause all workers
197 // to exit as each will wake up another worker before exiting. 162 // to exit as each will wake up another worker before exiting.
198 has_ready_to_run_tasks_cv_.Signal(); 163 has_ready_to_run_tasks_cv_.Signal();
199 } 164 }
200 165
201 while (workers_.size()) { 166 while (workers_.size()) {
202 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); 167 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
203 // http://crbug.com/240453 - Join() is considered IO and will block this 168 // http://crbug.com/240453 - Join() is considered IO and will block this
204 // thread. See also http://crbug.com/239423 for further ideas. 169 // thread. See also http://crbug.com/239423 for further ideas.
205 base::ThreadRestrictions::ScopedAllowIO allow_io; 170 base::ThreadRestrictions::ScopedAllowIO allow_io;
206 worker->Join(); 171 worker->Join();
207 } 172 }
208 } 173 }
209 174
210 void WorkerPool::Inner::SetTaskGraph(TaskGraph* graph) { 175 void WorkerInner::SetTaskGraph(TaskGraph* graph, WorkerPool* worker_pool) {
211 // It is OK to call SetTaskGraph() after shutdown if |graph| is empty. 176 // It is OK to call SetTaskGraph() after shutdown if |graph| is empty.
212 DCHECK(graph->empty() || !shutdown_); 177 DCHECK(graph->empty() || !shutdown_);
213 178
214 GraphNodeMap new_pending_tasks; 179 GraphNodeMap new_pending_tasks;
215 GraphNodeMap new_running_tasks; 180 GraphNodeMap new_running_tasks;
216 TaskQueue new_ready_to_run_tasks; 181 TaskQueue new_ready_to_run_tasks;
182 TaskVector temp_completed_tasks_;
217 183
218 new_pending_tasks.swap(*graph); 184 new_pending_tasks.swap(*graph);
219 185
220 { 186 {
221 base::AutoLock lock(lock_); 187 base::AutoLock lock(lock_);
222 188
223 // First remove all completed tasks from |new_pending_tasks| and 189 // Create Running task Map
224 // adjust number of dependencies. 190 if (running_tasks_pool_.count(worker_pool) == 0)
225 for (TaskVector::iterator it = completed_tasks_.begin(); 191 running_tasks_pool_.insert(std::pair<const WorkerPool*, GraphNodeMap*>
226 it != completed_tasks_.end(); ++it) { 192 (worker_pool, &new_running_tasks));
227 internal::WorkerPoolTask* task = it->get();
228 193
229 scoped_ptr<internal::GraphNode> node = new_pending_tasks.take_and_erase( 194 // Create Pending task Map
230 task); 195 if (pending_tasks_pool_.count(worker_pool) == 0)
231 if (node) { 196 pending_tasks_pool_.insert(std::pair<const WorkerPool*, GraphNodeMap*>
232 for (internal::GraphNode::Vector::const_iterator it = 197 (worker_pool, graph));
233 node->dependents().begin(); 198
234 it != node->dependents().end(); ++it) { 199 // wp_ = worker_pool;
235 internal::GraphNode* dependent_node = *it; 200 if (completed_tasks_pool_[worker_pool].size() > 0) {
236 dependent_node->remove_dependency(); 201 // First remove all completed tasks from |new_pending_tasks| and
202 // adjust number of dependencies.
203 for (TaskVector::iterator it = completed_tasks_pool_[worker_pool].begin();
204 it != completed_tasks_pool_[worker_pool].end(); ++it) {
205 internal::WorkerPoolTask* task = it->get();
206 scoped_ptr<internal::GraphNode> node = new_pending_tasks.take_and_erase(
207 task);
208 if (node) {
209 for (internal::GraphNode::Vector::const_iterator it =
210 node->dependents().begin();
211 it != node->dependents().end(); ++it) {
212 internal::GraphNode* dependent_node = *it;
213 dependent_node->remove_dependency();
214 }
237 } 215 }
238 } 216 }
239 } 217 }
240 218
241 // Build new running task set. 219 if (running_tasks_pool_[worker_pool] ->size() > 0) {
242 for (GraphNodeMap::iterator it = running_tasks_.begin(); 220 // Build new running task set.
243 it != running_tasks_.end(); ++it) { 221 for (GraphNodeMap::iterator it =
244 internal::WorkerPoolTask* task = it->first; 222 running_tasks_pool_[worker_pool]->begin(); it !=
245 // Transfer scheduled task value from |new_pending_tasks| to 223 running_tasks_pool_[worker_pool]->end(); ++it) {
246 // |new_running_tasks| if currently running. Value must be set to 224 internal::WorkerPoolTask* task = it->first;
247 // NULL if |new_pending_tasks| doesn't contain task. This does 225 // Transfer scheduled task value from |new_pending_tasks| to
248 // the right in both cases. 226 // |new_running_tasks| if currently running. Value must be set to
249 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task)); 227 // NULL if |new_pending_tasks| doesn't contain task. This does
228 // the right in both cases.
229 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task));
230 }
250 } 231 }
251 232
252 // Build new "ready to run" tasks queue. 233 // Build new "ready to run" tasks queue.
reveman 2013/12/04 16:25:50 new_ready_to_run_tasks need to also include the ta
253 // TODO(reveman): Create this queue when building the task graph instead. 234 // TODO(reveman): Create this queue when building the task graph instead.
254 for (GraphNodeMap::iterator it = new_pending_tasks.begin(); 235 for (GraphNodeMap::iterator it = new_pending_tasks.begin();
255 it != new_pending_tasks.end(); ++it) { 236 it != new_pending_tasks.end(); ++it) {
256 internal::WorkerPoolTask* task = it->first; 237 internal::WorkerPoolTask* task = it->first;
257 DCHECK(task); 238 DCHECK(task);
258 internal::GraphNode* node = it->second; 239 internal::GraphNode* node = it->second;
259 240
260 // Completed tasks should not exist in |new_pending_tasks|. 241 // Completed tasks should not exist in |new_pending_tasks|.
261 DCHECK(!task->HasFinishedRunning()); 242 DCHECK(!task->HasFinishedRunning());
262 243
263 // Call DidSchedule() to indicate that this task has been scheduled. 244 // Call DidSchedule() to indicate that this task has been scheduled.
264 // Note: This is only for debugging purposes. 245 // Note: This is only for debugging purposes.
265 task->DidSchedule(); 246 task->DidSchedule();
266 247
267 if (!node->num_dependencies()) 248 if (!node->num_dependencies()) {
249 // node->setTask(task);
268 new_ready_to_run_tasks.push(node); 250 new_ready_to_run_tasks.push(node);
269 251 // c_map_.insert(std::pair<internal::GraphNode*, internal::WorkerPoolTas k*>(node, task));
252 }
270 // Erase the task from old pending tasks. 253 // Erase the task from old pending tasks.
271 pending_tasks_.erase(task); 254 pending_tasks_pool_[worker_pool]->erase(task);
272 } 255 }
273 256
274 completed_tasks_.reserve(completed_tasks_.size() + pending_tasks_.size()); 257 if (pending_tasks_pool_[worker_pool] ->size() > 0)
258 completed_tasks_pool_[worker_pool].reserve(completed_tasks_pool_[worker_po ol].size() + pending_tasks_pool_[worker_pool]->size());
259 else
260 completed_tasks_pool_[worker_pool].reserve(completed_tasks_pool_[worker_po ol].size());
275 261
276 // The items left in |pending_tasks_| need to be canceled. 262 if (pending_tasks_pool_[worker_pool] ->size() > 0) {
277 for (GraphNodeMap::const_iterator it = pending_tasks_.begin(); 263 // The items left in |pending_tasks_| need to be canceled.
278 it != pending_tasks_.end(); 264 for (GraphNodeMap::const_iterator it = pending_tasks_pool_[worker_pool]->b egin();
279 ++it) { 265 it != pending_tasks_pool_[worker_pool]->end();
280 completed_tasks_.push_back(it->first); 266 ++it) {
267 // completed_tasks_.push_back(it->first);
268 completed_tasks_pool_[worker_pool].push_back(it->first);
269 }
281 } 270 }
282 271
283 // Swap task sets. 272 // Swap task sets.
284 // Note: old tasks are intentionally destroyed after releasing |lock_|. 273 // Note: old tasks are intentionally destroyed after releasing |lock_|.
285 pending_tasks_.swap(new_pending_tasks); 274 pending_tasks_pool_[worker_pool]->swap(new_pending_tasks);
286 running_tasks_.swap(new_running_tasks); 275 running_tasks_pool_[worker_pool]->swap(new_running_tasks);
287 std::swap(ready_to_run_tasks_, new_ready_to_run_tasks); 276 std::swap(ready_to_run_tasks_, new_ready_to_run_tasks);
288 277
289 // If |ready_to_run_tasks_| is empty, it means we either have 278 // If |ready_to_run_tasks_| is empty, it means we either have
290 // running tasks, or we have no pending tasks. 279 // running tasks, or we have no pending tasks.
291 DCHECK(!ready_to_run_tasks_.empty() || 280 DCHECK(!ready_to_run_tasks_.empty() ||
292 (pending_tasks_.empty() || !running_tasks_.empty())); 281 (pending_tasks_pool_[worker_pool]->empty() ||
282 !running_tasks_pool_[worker_pool]->empty()));
293 283
294 // If there is more work available, wake up worker thread. 284 // If there is more work available, wake up worker thread.
295 if (!ready_to_run_tasks_.empty()) 285 if (!ready_to_run_tasks_.empty())
296 has_ready_to_run_tasks_cv_.Signal(); 286 has_ready_to_run_tasks_cv_.Signal();
297 } 287 }
298 } 288 }
299 289
300 void WorkerPool::Inner::CollectCompletedTasks(TaskVector* completed_tasks) { 290 void WorkerInner::CollectCompletedTasks
291 (TaskVector* completed_tasks, WorkerPool* worker_pool) {
301 base::AutoLock lock(lock_); 292 base::AutoLock lock(lock_);
302 293
303 DCHECK_EQ(0u, completed_tasks->size()); 294 DCHECK_EQ(0u, completed_tasks->size());
304 completed_tasks->swap(completed_tasks_); 295 completed_tasks->swap(completed_tasks_pool_[worker_pool]);
305 } 296 }
306 297
307 void WorkerPool::Inner::Run() { 298 void WorkerInner::Run() {
308 base::AutoLock lock(lock_); 299 base::AutoLock lock(lock_);
309 300
310 // Get a unique thread index. 301 // Get a unique thread index.
311 int thread_index = next_thread_index_++; 302 int thread_index = next_thread_index_++;
312 303
313 while (true) { 304 while (true) {
314 if (ready_to_run_tasks_.empty()) { 305 if (ready_to_run_tasks_.empty()) {
315 // Exit when shutdown is set and no more tasks are pending. 306 // Exit when shutdown is set and no more tasks are pending.
316 if (shutdown_ && pending_tasks_.empty()) 307 // if (shutdown_ && pending_tasks_.empty())
308 if (shutdown_ && pending_tasks_pool_.empty())
317 break; 309 break;
318 310
319 // Wait for more tasks. 311 // Wait for more tasks.
320 has_ready_to_run_tasks_cv_.Wait(); 312 has_ready_to_run_tasks_cv_.Wait();
321 continue; 313 continue;
322 } 314 }
315 // scoped_refptr<internal::WorkerPoolTask> task(c_map_.find(ready_to_run_tas ks_.top())->second);
316 // c_map_.erase (c_map_.find(ready_to_run_tasks_.top()));
323 317
324 // Take top priority task from |ready_to_run_tasks_|. 318 // Take top priority task from |ready_to_run_tasks_|.
325 scoped_refptr<internal::WorkerPoolTask> task( 319 scoped_refptr<internal::WorkerPoolTask> task(
326 ready_to_run_tasks_.top()->task()); 320 ready_to_run_tasks_.top()->task());
327 ready_to_run_tasks_.pop(); 321 ready_to_run_tasks_.pop();
328 322
323 const WorkerPool* worker_pool;
324 // Iterate in Pending Task Map to find task
reveman 2013/12/04 16:25:50 We can't afford this. You need to get the worker p
325 GraphNodeMap curr_pending_tasks;
326 for (GraphNodeMapper::iterator it = pending_tasks_pool_.begin();
327 it != pending_tasks_pool_.end(); ++it) {
328 curr_pending_tasks.swap(*it->second);
329 if (curr_pending_tasks.contains(task.get())) {
330 worker_pool = it->first;
331 break;
332 }
333 }
334
329 // Move task from |pending_tasks_| to |running_tasks_|. 335 // Move task from |pending_tasks_| to |running_tasks_|.
330 DCHECK(pending_tasks_.contains(task.get())); 336 DCHECK(pending_tasks_pool_[worker_pool]->contains(task.get()));
331 DCHECK(!running_tasks_.contains(task.get())); 337 DCHECK(!running_tasks_pool_[worker_pool]->contains(task.get()));
332 running_tasks_.set(task.get(), pending_tasks_.take_and_erase(task.get())); 338
339 running_tasks_pool_[worker_pool]->set(task.get(), pending_tasks_pool_[worker _pool]->take_and_erase(task.get()));
333 340
334 // There may be more work available, so wake up another worker thread. 341 // There may be more work available, so wake up another worker thread.
335 has_ready_to_run_tasks_cv_.Signal(); 342 has_ready_to_run_tasks_cv_.Signal();
336 343
337 // Call WillRun() before releasing |lock_| and running task. 344 // Call WillRun() before releasing |lock_| and running task.
338 task->WillRun(); 345 task->WillRun();
339 346
340 { 347 {
341 base::AutoUnlock unlock(lock_); 348 base::AutoUnlock unlock(lock_);
342
343 task->RunOnWorkerThread(thread_index); 349 task->RunOnWorkerThread(thread_index);
344 } 350 }
345 351
346 // This will mark task as finished running. 352 // This will mark task as finished running.
347 task->DidRun(); 353 task->DidRun();
348 354
349 // Now iterate over all dependents to remove dependency and check 355 // Now iterate over all dependents to remove dependency and check
350 // if they are ready to run. 356 // if they are ready to run.
351 scoped_ptr<internal::GraphNode> node = running_tasks_.take_and_erase( 357 scoped_ptr<internal::GraphNode> node = running_tasks_pool_[worker_pool]->tak e_and_erase(
352 task.get()); 358 task.get());
353 if (node) { 359 if (node) {
354 for (internal::GraphNode::Vector::const_iterator it = 360 for (internal::GraphNode::Vector::const_iterator it =
355 node->dependents().begin(); 361 node->dependents().begin();
356 it != node->dependents().end(); ++it) { 362 it != node->dependents().end(); ++it) {
357 internal::GraphNode* dependent_node = *it; 363 internal::GraphNode* dependent_node = *it;
358
359 dependent_node->remove_dependency(); 364 dependent_node->remove_dependency();
360 // Task is ready if it has no dependencies. Add it to 365 // Task is ready if it has no dependencies. Add it to
361 // |ready_to_run_tasks_|. 366 // |ready_to_run_tasks_|.
362 if (!dependent_node->num_dependencies()) 367 if (!dependent_node->num_dependencies()) {
368 // c_map_.insert(std::pair<internal::GraphNode*, internal::WorkerPoolT ask*>(dependent_node, dependent_node->task()));
363 ready_to_run_tasks_.push(dependent_node); 369 ready_to_run_tasks_.push(dependent_node);
370 }
364 } 371 }
365 } 372 }
366 373
367 // Finally add task to |completed_tasks_|. 374 // Finally add task to |completed_tasks_|.
368 completed_tasks_.push_back(task); 375 completed_tasks_pool_[worker_pool].push_back(task);
369 } 376 }
370 377
371 // We noticed we should exit. Wake up the next worker so it knows it should 378 // We noticed we should exit. Wake up the next worker so it knows it should
372 // exit as well (because the Shutdown() code only signals once). 379 // exit as well (because the Shutdown() code only signals once).
373 has_ready_to_run_tasks_cv_.Signal(); 380 has_ready_to_run_tasks_cv_.Signal();
381 }
382
383 // Derived WorkerInner Ctor
384 DerivedInner::DerivedInner(): WorkerInner(cc::switches::GetNumRasterThreads(), " CompositorRaster") {
385 }
386 }// namespace anonymous
387
388 namespace internal {
389
390 WorkerPoolTask::WorkerPoolTask()
391 : did_schedule_(false),
392 did_run_(false),
393 did_complete_(false) {
374 } 394 }
375 395
396 WorkerPoolTask::~WorkerPoolTask() {
397 DCHECK_EQ(did_schedule_, did_complete_);
398 DCHECK(!did_run_ || did_schedule_);
399 DCHECK(!did_run_ || did_complete_);
400 }
401
402 void WorkerPoolTask::DidSchedule() {
403 DCHECK(!did_complete_);
404 did_schedule_ = true;
405 }
406
407 void WorkerPoolTask::WillRun() {
408 DCHECK(did_schedule_);
409 DCHECK(!did_complete_);
410 DCHECK(!did_run_);
411 }
412
413 void WorkerPoolTask::DidRun() {
414 did_run_ = true;
415 }
416
417 void WorkerPoolTask::WillComplete() {
418 DCHECK(!did_complete_);
419 }
420
421 void WorkerPoolTask::DidComplete() {
422 DCHECK(did_schedule_);
423 DCHECK(!did_complete_);
424 did_complete_ = true;
425 }
426
427 bool WorkerPoolTask::HasFinishedRunning() const {
428 return did_run_;
429 }
430
431 bool WorkerPoolTask::HasCompleted() const {
432 return did_complete_;
433 }
434
435 GraphNode::GraphNode(internal::WorkerPoolTask* task, unsigned priority)
436 : task_(task),
437 priority_(priority),
438 num_dependencies_(0) {
439 }
440
441 GraphNode::~GraphNode() {
442 }
443
444 } // namespace internal
445
446
376 WorkerPool::WorkerPool(size_t num_threads, 447 WorkerPool::WorkerPool(size_t num_threads,
377 const std::string& thread_name_prefix) 448 const std::string& thread_name_prefix)
378 : in_dispatch_completion_callbacks_(false), 449 : in_dispatch_completion_callbacks_(false) {
379 inner_(make_scoped_ptr(new Inner(num_threads, thread_name_prefix))) {
380 } 450 }
381 451
382 WorkerPool::~WorkerPool() { 452 WorkerPool::~WorkerPool() {
383 } 453 }
384 454
385 void WorkerPool::Shutdown() { 455 void WorkerPool::Shutdown() {
386 TRACE_EVENT0("cc", "WorkerPool::Shutdown"); 456 TRACE_EVENT0("cc", "WorkerPool::Shutdown");
387 457
388 DCHECK(!in_dispatch_completion_callbacks_); 458 DCHECK(!in_dispatch_completion_callbacks_);
389 459 g_workerpool_inner.Pointer()->Shutdown();
390 inner_->Shutdown();
391 } 460 }
392 461
393 void WorkerPool::CheckForCompletedTasks() { 462 void WorkerPool::CheckForCompletedTasks() {
394 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); 463 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");
395 464
396 DCHECK(!in_dispatch_completion_callbacks_); 465 DCHECK(!in_dispatch_completion_callbacks_);
397 466
398 TaskVector completed_tasks; 467 TaskVector completed_tasks;
399 inner_->CollectCompletedTasks(&completed_tasks); 468 g_workerpool_inner.Pointer()->CollectCompletedTasks(&completed_tasks, this);
400 ProcessCompletedTasks(completed_tasks); 469 ProcessCompletedTasks(completed_tasks);
401 } 470 }
402 471
403 void WorkerPool::ProcessCompletedTasks( 472 void WorkerPool::ProcessCompletedTasks(
404 const TaskVector& completed_tasks) { 473 const TaskVector& completed_tasks) {
405 TRACE_EVENT1("cc", "WorkerPool::ProcessCompletedTasks", 474 TRACE_EVENT1("cc", "WorkerPool::ProcessCompletedTasks",
406 "completed_task_count", completed_tasks.size()); 475 "completed_task_count", completed_tasks.size());
407 476
408 // Worker pool instance is not reentrant while processing completed tasks. 477 // Worker pool instance is not reentrant while processing completed tasks.
409 in_dispatch_completion_callbacks_ = true; 478 in_dispatch_completion_callbacks_ = true;
410 479
411 for (TaskVector::const_iterator it = completed_tasks.begin(); 480 for (TaskVector::const_iterator it = completed_tasks.begin();
412 it != completed_tasks.end(); 481 it != completed_tasks.end();
413 ++it) { 482 ++it) {
414 internal::WorkerPoolTask* task = it->get(); 483 internal::WorkerPoolTask* task = it->get();
415 484
416 task->WillComplete(); 485 task->WillComplete();
417 task->CompleteOnOriginThread(); 486 task->CompleteOnOriginThread();
418 task->DidComplete(); 487 task->DidComplete();
419 } 488 }
420 489
421 in_dispatch_completion_callbacks_ = false; 490 in_dispatch_completion_callbacks_ = false;
422 } 491 }
423 492
424 void WorkerPool::SetTaskGraph(TaskGraph* graph) { 493 void WorkerPool::SetTaskGraph(TaskGraph* graph) {
425 TRACE_EVENT1("cc", "WorkerPool::SetTaskGraph", 494 TRACE_EVENT1("cc", "WorkerPool::SetTaskGraph",
426 "num_tasks", graph->size()); 495 "num_tasks", graph->size());
427 496
428 DCHECK(!in_dispatch_completion_callbacks_); 497 DCHECK(!in_dispatch_completion_callbacks_);
429 498 g_workerpool_inner.Pointer()->SetTaskGraph(graph, this);
430 inner_->SetTaskGraph(graph);
431 } 499 }
432 500
433 } // namespace cc 501 } // namespace cc
OLDNEW
« no previous file with comments | « cc/resources/worker_pool.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698