Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(244)

Side by Side Diff: cc/resources/worker_pool.cc

Issue 73923003: Shared Raster Worker Threads (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: WIP - Mapped Task Struct and Priority Queue Created 7 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « cc/resources/worker_pool.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "cc/resources/worker_pool.h" 5 #include "cc/resources/worker_pool.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <queue> 8 #include <queue>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
11 #include "base/command_line.h"
11 #include "base/containers/hash_tables.h" 12 #include "base/containers/hash_tables.h"
12 #include "base/debug/trace_event.h" 13 #include "base/debug/trace_event.h"
14 #include "base/lazy_instance.h"
13 #include "base/strings/stringprintf.h" 15 #include "base/strings/stringprintf.h"
14 #include "base/synchronization/condition_variable.h" 16 #include "base/synchronization/condition_variable.h"
15 #include "base/threading/simple_thread.h" 17 #include "base/threading/simple_thread.h"
16 #include "base/threading/thread_restrictions.h" 18 #include "base/threading/thread_restrictions.h"
17 #include "cc/base/scoped_ptr_deque.h" 19 #include "cc/base/scoped_ptr_deque.h"
20 #include "cc/base/switches.h"
18 21
19 namespace cc { 22 namespace cc {
20 23
21 namespace internal { 24 namespace {
22
23 WorkerPoolTask::WorkerPoolTask()
24 : did_schedule_(false),
25 did_run_(false),
26 did_complete_(false) {
27 }
28
29 WorkerPoolTask::~WorkerPoolTask() {
30 DCHECK_EQ(did_schedule_, did_complete_);
31 DCHECK(!did_run_ || did_schedule_);
32 DCHECK(!did_run_ || did_complete_);
33 }
34
35 void WorkerPoolTask::DidSchedule() {
36 DCHECK(!did_complete_);
37 did_schedule_ = true;
38 }
39
40 void WorkerPoolTask::WillRun() {
41 DCHECK(did_schedule_);
42 DCHECK(!did_complete_);
43 DCHECK(!did_run_);
44 }
45
46 void WorkerPoolTask::DidRun() {
47 did_run_ = true;
48 }
49
50 void WorkerPoolTask::WillComplete() {
51 DCHECK(!did_complete_);
52 }
53
54 void WorkerPoolTask::DidComplete() {
55 DCHECK(did_schedule_);
56 DCHECK(!did_complete_);
57 did_complete_ = true;
58 }
59
60 bool WorkerPoolTask::HasFinishedRunning() const {
61 return did_run_;
62 }
63
64 bool WorkerPoolTask::HasCompleted() const {
65 return did_complete_;
66 }
67
68 GraphNode::GraphNode(internal::WorkerPoolTask* task, unsigned priority)
69 : task_(task),
70 priority_(priority),
71 num_dependencies_(0) {
72 }
73
74 GraphNode::~GraphNode() {
75 }
76
77 } // namespace internal
78 25
79 // Internal to the worker pool. Any data or logic that needs to be 26 // Internal to the worker pool. Any data or logic that needs to be
80 // shared between threads lives in this class. All members are guarded 27 // shared between threads lives in this class. All members are guarded
81 // by |lock_|. 28 // by |lock_|.
82 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { 29 class WorkerInner : public base::DelegateSimpleThread::Delegate {
83 public: 30 public:
84 Inner(size_t num_threads, const std::string& thread_name_prefix); 31 WorkerInner(size_t num_threads, const std::string& thread_name_prefix);
85 virtual ~Inner(); 32 virtual ~WorkerInner();
86 33
87 void Shutdown(); 34 void Shutdown();
88 35
36 typedef base::ScopedPtrHashMap<internal::WorkerPoolTask*, internal::GraphNode>
37 GraphNodeMap;
38 typedef GraphNodeMap TaskGraph;
39 typedef base::ScopedPtrHashMap<WorkerPool*, GraphNodeMap>
40 TaskMap;
41 typedef std::vector<scoped_refptr<internal::WorkerPoolTask> > TaskVector;
42
89 // Schedule running of tasks in |graph|. Tasks previously scheduled but 43 // Schedule running of tasks in |graph|. Tasks previously scheduled but
90 // no longer needed will be canceled unless already running. Canceled 44 // no longer needed will be canceled unless already running. Canceled
91 // tasks are moved to |completed_tasks_| without being run. The result 45 // tasks are moved to |completed_tasks_| without being run. The result
92 // is that once scheduled, a task is guaranteed to end up in the 46 // is that once scheduled, a task is guaranteed to end up in the
93 // |completed_tasks_| queue even if they later get canceled by another 47 // |completed_tasks_| queue even if they later get canceled by another
94 // call to SetTaskGraph(). 48 // call to SetTaskGraph().
95 void SetTaskGraph(TaskGraph* graph); 49
50 void SetTaskGraph(TaskGraph* graph, WorkerPool* worker_pool);
96 51
97 // Collect all completed tasks in |completed_tasks|. 52 // Collect all completed tasks in |completed_tasks|.
98 void CollectCompletedTasks(TaskVector* completed_tasks); 53 void CollectCompletedTasks(TaskVector* completed_tasks, WorkerPool* worker_poo l);
54
99 55
100 private: 56 private:
101 class PriorityComparator { 57 class PriorityComparator {
102 public: 58 public:
103 bool operator()(const internal::GraphNode* a, 59 bool operator()(const internal::GraphNode* a,
104 const internal::GraphNode* b) { 60 const internal::GraphNode* b) {
105 // In this system, numerically lower priority is run first. 61 // In this system, numerically lower priority is run first.
106 if (a->priority() != b->priority()) 62 if (a->priority() != b->priority())
107 return a->priority() > b->priority(); 63 return a->priority() > b->priority();
108 64
109 // Run task with most dependents first when priority is the same. 65 // Run task with most dependents first when priority is the same.
110 return a->dependents().size() < b->dependents().size(); 66 return a->dependents().size() < b->dependents().size();
111 } 67 }
112 }; 68 };
113 69
70 // Ordered set of tasks that are ready to run.
71 typedef std::priority_queue<internal::GraphNode*,
72 std::vector<internal::GraphNode*>,
73 PriorityComparator> TaskQueue;
74 // TaskQueue ready_to_run_tasks_;
75
76 class TaskSet {
77 public:
78 GraphNodeMap pending_tasks_;
79 GraphNodeMap running_tasks_;
80 TaskVector completed_tasks_;
81 TaskQueue ready_to_run_tasks_;
82 };
83
84 class TaskComparator {
85 public:
86 bool operator()(const TaskSet* a, const TaskSet* b) {
87 if (a->ready_to_run_tasks_.top()->priority() != b->ready_to_run_tasks_.top ()->priority())
88 return a->ready_to_run_tasks_.top()->priority() > b->ready_to_run_tasks _.top()->priority();
89
90 return a->ready_to_run_tasks_.top()->dependents().size() > b->ready_to_run _tasks_.top()->dependents().size();
91 }
92 };
93
94 typedef std::map<const WorkerPool*, TaskSet*> TaskMapper;
reveman 2013/12/09 20:19:18 Could you just store TaskSets by value instead of
sohanjg 2013/12/10 07:19:37 We cannot use values, as we discussed in earlier i
95
96 TaskMapper tasks_;
97
98 typedef std::priority_queue<TaskSet*,
99 std::vector<TaskSet*>,
100 TaskComparator> TaskPriorityQueue;
101
102 TaskPriorityQueue shared_ready_to_run_tasks_;
114 // Overridden from base::DelegateSimpleThread: 103 // Overridden from base::DelegateSimpleThread:
115 virtual void Run() OVERRIDE; 104 virtual void Run() OVERRIDE;
116 105
117 // This lock protects all members of this class except 106 // This lock protects all members of this class except
118 // |worker_pool_on_origin_thread_|. Do not read or modify anything 107 // |worker_pool_on_origin_thread_|. Do not read or modify anything
119 // without holding this lock. Do not block while holding this lock. 108 // without holding this lock. Do not block while holding this lock.
120 mutable base::Lock lock_; 109 mutable base::Lock lock_;
121 110
122 // Condition variable that is waited on by worker threads until new 111 // Condition variable that is waited on by worker threads until new
123 // tasks are ready to run or shutdown starts. 112 // tasks are ready to run or shutdown starts.
124 base::ConditionVariable has_ready_to_run_tasks_cv_; 113 base::ConditionVariable has_ready_to_run_tasks_cv_;
125 114
126 // Provides each running thread loop with a unique index. First thread 115 // Provides each running thread loop with a unique index. First thread
127 // loop index is 0. 116 // loop index is 0.
128 unsigned next_thread_index_; 117 unsigned next_thread_index_;
129 118
130 // Set during shutdown. Tells workers to exit when no more tasks 119 // Set during shutdown. Tells workers to exit when no more tasks
131 // are pending. 120 // are pending.
132 bool shutdown_; 121 bool shutdown_;
133 122
134 // This set contains all pending tasks.
135 GraphNodeMap pending_tasks_;
136
137 // Ordered set of tasks that are ready to run.
138 typedef std::priority_queue<internal::GraphNode*,
139 std::vector<internal::GraphNode*>,
140 PriorityComparator> TaskQueue;
141 TaskQueue ready_to_run_tasks_;
142
143 // This set contains all currently running tasks.
144 GraphNodeMap running_tasks_;
145
146 // Completed tasks not yet collected by origin thread.
147 TaskVector completed_tasks_;
148
149 ScopedPtrDeque<base::DelegateSimpleThread> workers_; 123 ScopedPtrDeque<base::DelegateSimpleThread> workers_;
150 124
151 DISALLOW_COPY_AND_ASSIGN(Inner); 125 DISALLOW_COPY_AND_ASSIGN(WorkerInner);
152 }; 126 };
153 127
154 WorkerPool::Inner::Inner( 128 class CC_EXPORT DerivedInner : public WorkerInner {
129 public:
130 DerivedInner();
131 };
132
133 base::LazyInstance<DerivedInner> g_workerpool_inner;
134
135
136 WorkerInner::WorkerInner(
155 size_t num_threads, const std::string& thread_name_prefix) 137 size_t num_threads, const std::string& thread_name_prefix)
156 : lock_(), 138 : lock_(),
157 has_ready_to_run_tasks_cv_(&lock_), 139 has_ready_to_run_tasks_cv_(&lock_),
158 next_thread_index_(0), 140 next_thread_index_(0),
159 shutdown_(false) { 141 shutdown_(false) {
160 base::AutoLock lock(lock_); 142 base::AutoLock lock(lock_);
161 143
162 while (workers_.size() < num_threads) { 144 while (workers_.size() < num_threads) {
163 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( 145 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
164 new base::DelegateSimpleThread( 146 new base::DelegateSimpleThread(
165 this, 147 this,
166 thread_name_prefix + 148 thread_name_prefix +
167 base::StringPrintf( 149 base::StringPrintf(
168 "Worker%u", 150 "Worker%u",
169 static_cast<unsigned>(workers_.size() + 1)).c_str())); 151 static_cast<unsigned>(workers_.size() + 1)).c_str()));
170 worker->Start(); 152 worker->Start();
171 #if defined(OS_ANDROID) || defined(OS_LINUX) 153 #if defined(OS_ANDROID) || defined(OS_LINUX)
172 worker->SetThreadPriority(base::kThreadPriority_Background); 154 worker->SetThreadPriority(base::kThreadPriority_Background);
173 #endif 155 #endif
174 workers_.push_back(worker.Pass()); 156 workers_.push_back(worker.Pass());
175 } 157 }
176 } 158 }
177 159
178 WorkerPool::Inner::~Inner() { 160 WorkerInner::~WorkerInner() {
179 base::AutoLock lock(lock_); 161 base::AutoLock lock(lock_);
180 162
181 DCHECK(shutdown_); 163 DCHECK(shutdown_);
182 164 // DCHECK_EQ(0u, wp_->pending_tasks_.size());
183 DCHECK_EQ(0u, pending_tasks_.size()); 165 DCHECK_EQ(0u, shared_ready_to_run_tasks_.size());
184 DCHECK_EQ(0u, ready_to_run_tasks_.size()); 166 // DCHECK_EQ(0u, wp_->running_tasks_.size());
185 DCHECK_EQ(0u, running_tasks_.size()); 167 // DCHECK_EQ(0u, wp_->completed_tasks_.size());
186 DCHECK_EQ(0u, completed_tasks_.size());
187 } 168 }
188 169
189 void WorkerPool::Inner::Shutdown() { 170 void WorkerInner::Shutdown() {
190 { 171 {
191 base::AutoLock lock(lock_); 172 base::AutoLock lock(lock_);
192 173
193 DCHECK(!shutdown_); 174 DCHECK(!shutdown_);
194 shutdown_ = true; 175 shutdown_ = true;
195 176
196 // Wake up a worker so it knows it should exit. This will cause all workers 177 // Wake up a worker so it knows it should exit. This will cause all workers
197 // to exit as each will wake up another worker before exiting. 178 // to exit as each will wake up another worker before exiting.
198 has_ready_to_run_tasks_cv_.Signal(); 179 has_ready_to_run_tasks_cv_.Signal();
199 } 180 }
200 181
201 while (workers_.size()) { 182 while (workers_.size()) {
202 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); 183 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
203 // http://crbug.com/240453 - Join() is considered IO and will block this 184 // http://crbug.com/240453 - Join() is considered IO and will block this
204 // thread. See also http://crbug.com/239423 for further ideas. 185 // thread. See also http://crbug.com/239423 for further ideas.
205 base::ThreadRestrictions::ScopedAllowIO allow_io; 186 base::ThreadRestrictions::ScopedAllowIO allow_io;
206 worker->Join(); 187 worker->Join();
207 } 188 }
208 } 189 }
209 190
210 void WorkerPool::Inner::SetTaskGraph(TaskGraph* graph) { 191 void WorkerInner::SetTaskGraph(TaskGraph* graph, WorkerPool* worker_pool) {
211 // It is OK to call SetTaskGraph() after shutdown if |graph| is empty. 192 // It is OK to call SetTaskGraph() after shutdown if |graph| is empty.
212 DCHECK(graph->empty() || !shutdown_); 193 DCHECK(graph->empty() || !shutdown_);
213 194
214 GraphNodeMap new_pending_tasks; 195 GraphNodeMap new_pending_tasks;
215 GraphNodeMap new_running_tasks; 196 GraphNodeMap new_running_tasks;
216 TaskQueue new_ready_to_run_tasks; 197 TaskQueue new_ready_to_run_tasks;
198 TaskVector temp_completed_tasks_;
199 GraphNodeMap temp_pending_tasks;
217 200
218 new_pending_tasks.swap(*graph); 201 new_pending_tasks.swap(*graph);
219 202
220 { 203 {
221 base::AutoLock lock(lock_); 204 base::AutoLock lock(lock_);
222 205
206
207
208 // Create Task Set
209 if (tasks_.count(worker_pool) == 0) {
210
211 TaskSet* task_set= new TaskSet();
212 task_set->completed_tasks_ = temp_completed_tasks_;
213 task_set->ready_to_run_tasks_ = new_ready_to_run_tasks;
214 task_set->running_tasks_.swap(new_running_tasks);
215 task_set->pending_tasks_.swap(temp_pending_tasks);
216 tasks_.insert(std::pair<const WorkerPool*, TaskSet*>(worker_pool, task_set ));
217 delete task_set;
reveman 2013/12/09 20:19:18 I don't think you want to delete the task set here
sohanjg 2013/12/10 07:19:37 As mentioned above, there is inheritance issue wit
218
219 }
220
223 // First remove all completed tasks from |new_pending_tasks| and 221 // First remove all completed tasks from |new_pending_tasks| and
224 // adjust number of dependencies. 222 // adjust number of dependencies.
225 for (TaskVector::iterator it = completed_tasks_.begin(); 223 if (!tasks_[worker_pool]->completed_tasks_.empty()) {
reveman 2013/12/09 20:19:18 why is this empty() check now needed?
sohanjg 2013/12/10 07:19:37 This and other size/empty checks are preventive ch
226 it != completed_tasks_.end(); ++it) { 224 for (TaskVector::iterator it = tasks_[worker_pool]->completed_tasks_.begin ();
reveman 2013/12/09 20:19:18 Please avoid a lookup in tasks_ for every use and
227 internal::WorkerPoolTask* task = it->get(); 225 it != tasks_[worker_pool]->completed_tasks_.end(); ++it) {
228 226 internal::WorkerPoolTask* task = it->get();
229 scoped_ptr<internal::GraphNode> node = new_pending_tasks.take_and_erase( 227 scoped_ptr<internal::GraphNode> node = new_pending_tasks.take_and_erase(
230 task); 228 task);
231 if (node) { 229 if (node) {
232 for (internal::GraphNode::Vector::const_iterator it = 230 for (internal::GraphNode::Vector::const_iterator it =
233 node->dependents().begin(); 231 node->dependents().begin();
234 it != node->dependents().end(); ++it) { 232 it != node->dependents().end(); ++it) {
235 internal::GraphNode* dependent_node = *it; 233 internal::GraphNode* dependent_node = *it;
236 dependent_node->remove_dependency(); 234 dependent_node->remove_dependency();
235 }
237 } 236 }
238 } 237 }
239 } 238 }
240 239
241 // Build new running task set. 240 // Build new running task set.
242 for (GraphNodeMap::iterator it = running_tasks_.begin(); 241 if (!tasks_[worker_pool]->running_tasks_.empty()) {
reveman 2013/12/09 20:19:18 why the empty() check?
243 it != running_tasks_.end(); ++it) { 242 for (GraphNodeMap::iterator it = tasks_[worker_pool]->running_tasks_.begin ();
244 internal::WorkerPoolTask* task = it->first; 243 it != tasks_[worker_pool]->running_tasks_.end(); ++it) {
245 // Transfer scheduled task value from |new_pending_tasks| to 244 internal::WorkerPoolTask* task = it->first;
246 // |new_running_tasks| if currently running. Value must be set to 245 // Transfer scheduled task value from |new_pending_tasks| to
247 // NULL if |new_pending_tasks| doesn't contain task. This does 246 // |new_running_tasks| if currently running. Value must be set to
248 // the right in both cases. 247 // NULL if |new_pending_tasks| doesn't contain task. This does
249 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task)); 248 // the right in both cases.
249 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task));
250 }
250 } 251 }
251 252
252 // Build new "ready to run" tasks queue. 253 // Build new "ready to run" tasks queue.
253 // TODO(reveman): Create this queue when building the task graph instead. 254 // TODO(reveman): Create this queue when building the task graph instead.
254 for (GraphNodeMap::iterator it = new_pending_tasks.begin(); 255 for (GraphNodeMap::iterator it = new_pending_tasks.begin();
255 it != new_pending_tasks.end(); ++it) { 256 it != new_pending_tasks.end(); ++it) {
256 internal::WorkerPoolTask* task = it->first; 257 internal::WorkerPoolTask* task = it->first;
257 DCHECK(task); 258 DCHECK(task);
258 internal::GraphNode* node = it->second; 259 internal::GraphNode* node = it->second;
259 260
260 // Completed tasks should not exist in |new_pending_tasks|. 261 // Completed tasks should not exist in |new_pending_tasks|.
261 DCHECK(!task->HasFinishedRunning()); 262 DCHECK(!task->HasFinishedRunning());
262 263
263 // Call DidSchedule() to indicate that this task has been scheduled. 264 // Call DidSchedule() to indicate that this task has been scheduled.
264 // Note: This is only for debugging purposes. 265 // Note: This is only for debugging purposes.
265 task->DidSchedule(); 266 task->DidSchedule();
266 267
267 if (!node->num_dependencies()) 268 if (!node->num_dependencies()) {
268 new_ready_to_run_tasks.push(node); 269 new_ready_to_run_tasks.push(node);
270 }
271 // Erase the task from old pending tasks.
272 if (tasks_[worker_pool]->pending_tasks_.size() > 0)
reveman 2013/12/09 20:19:18 why the size() check?
273 tasks_[worker_pool]->pending_tasks_.erase(task);
269 274
270 // Erase the task from old pending tasks.
271 pending_tasks_.erase(task);
272 } 275 }
273 276
274 completed_tasks_.reserve(completed_tasks_.size() + pending_tasks_.size()); 277 if (!tasks_[worker_pool]->pending_tasks_.empty())
reveman 2013/12/09 20:19:18 and why check empty() here?
278 tasks_[worker_pool]->completed_tasks_.reserve(tasks_[worker_pool]->complet ed_tasks_.size() + tasks_[worker_pool]->pending_tasks_.size());
275 279
276 // The items left in |pending_tasks_| need to be canceled. 280 // The items left in |pending_tasks_| need to be canceled.
277 for (GraphNodeMap::const_iterator it = pending_tasks_.begin(); 281 if (!tasks_[worker_pool]->pending_tasks_.empty()) {
reveman 2013/12/09 20:19:18 and here
278 it != pending_tasks_.end(); 282 for (GraphNodeMap::const_iterator it = tasks_[worker_pool]->pending_task s_.begin();
279 ++it) { 283 it != tasks_[worker_pool]->pending_tasks_.end();
280 completed_tasks_.push_back(it->first); 284 ++it) {
281 } 285 // completed_tasks_.push_back(it->first);
286 tasks_[worker_pool]->completed_tasks_.push_back(it->first);
287 // temp_completed_tasks_.push_back(it->first);
288 }
289 }
282 290
283 // Swap task sets. 291 tasks_[worker_pool]->pending_tasks_.swap(new_pending_tasks);
284 // Note: old tasks are intentionally destroyed after releasing |lock_|. 292 tasks_[worker_pool]->running_tasks_.swap(new_running_tasks);
285 pending_tasks_.swap(new_pending_tasks); 293 std::swap(tasks_[worker_pool]->ready_to_run_tasks_, new_ready_to_run_tasks);
286 running_tasks_.swap(new_running_tasks); 294 shared_ready_to_run_tasks_.push(tasks_[worker_pool]);
reveman 2013/12/09 20:19:18 what happens to the old TaskSet pointer in shared_
sohanjg 2013/12/10 07:19:37 shared_ready_to_run_tasks_ is a prioirity queue, s
287 std::swap(ready_to_run_tasks_, new_ready_to_run_tasks);
288 295
289 // If |ready_to_run_tasks_| is empty, it means we either have 296 // If |ready_to_run_tasks_| is empty, it means we either have
290 // running tasks, or we have no pending tasks. 297 // running tasks, or we have no pending tasks.
291 DCHECK(!ready_to_run_tasks_.empty() || 298 DCHECK(!tasks_[worker_pool]->ready_to_run_tasks_.empty() ||
292 (pending_tasks_.empty() || !running_tasks_.empty())); 299 (tasks_[worker_pool]->pending_tasks_.empty() || !tasks_[worker_pool]- >running_tasks_.empty()));
293 300
294 // If there is more work available, wake up worker thread. 301 // If there is more work available, wake up worker thread.
295 if (!ready_to_run_tasks_.empty()) 302 if (!tasks_[worker_pool]->ready_to_run_tasks_.empty())
296 has_ready_to_run_tasks_cv_.Signal(); 303 has_ready_to_run_tasks_cv_.Signal();
297 } 304 }
298 } 305 }
299 306
300 void WorkerPool::Inner::CollectCompletedTasks(TaskVector* completed_tasks) { 307 void WorkerInner::CollectCompletedTasks
308 (TaskVector* completed_tasks, WorkerPool* worker_pool) {
301 base::AutoLock lock(lock_); 309 base::AutoLock lock(lock_);
302 310
303 DCHECK_EQ(0u, completed_tasks->size()); 311 DCHECK_EQ(0u, completed_tasks->size());
304 completed_tasks->swap(completed_tasks_); 312 if (!shared_ready_to_run_tasks_.empty())
reveman 2013/12/09 20:19:18 why the empty check?
313 completed_tasks->swap(shared_ready_to_run_tasks_.top()->completed_tasks_);
314
305 } 315 }
306 316
307 void WorkerPool::Inner::Run() { 317 void WorkerInner::Run() {
308 base::AutoLock lock(lock_); 318 base::AutoLock lock(lock_);
309 319
310 // Get a unique thread index. 320 // Get a unique thread index.
311 int thread_index = next_thread_index_++; 321 int thread_index = next_thread_index_++;
322 // bool get_new_taskqueue = true;
323 TaskSet* ready_to_run_task_set_ = new TaskSet();
reveman 2013/12/09 20:19:18 why this heap allocation? Also don't use underscor
sohanjg 2013/12/10 07:19:37 Will take care of it.
312 324
313 while (true) { 325 while (true) {
314 if (ready_to_run_tasks_.empty()) { 326 if (shared_ready_to_run_tasks_.empty()) {
315 // Exit when shutdown is set and no more tasks are pending. 327 // Exit when shutdown is set and no more tasks are pending.
316 if (shutdown_ && pending_tasks_.empty()) 328 // if (shutdown_ && pending_tasks_.empty())
317 break; 329 // if (shutdown_ && ready_to_run_task_set_->pending_tasks_.empty())
330 // break;
331 // Wait for more tasks.
332 has_ready_to_run_tasks_cv_.Wait();
333 continue;
334 }
318 335
319 // Wait for more tasks. 336
320 has_ready_to_run_tasks_cv_.Wait(); 337 // Take top priority TaskSet from |shared_ready_to_run_tasks_|.
321 continue; 338 if (ready_to_run_task_set_ == NULL || ready_to_run_task_set_->ready_to_run_t asks_.empty()) {
reveman 2013/12/09 20:19:18 I don't understand why you have these checks here.
sohanjg 2013/12/10 07:19:37 This check is to ensure that, ready_to_run_task_se
339 ready_to_run_task_set_ = shared_ready_to_run_tasks_.top();
340 shared_ready_to_run_tasks_.pop();
322 } 341 }
323 342
324 // Take top priority task from |ready_to_run_tasks_|. 343 // Take top priority task from |ready_to_run_tasks_|.
325 scoped_refptr<internal::WorkerPoolTask> task( 344 scoped_refptr<internal::WorkerPoolTask> task(
326 ready_to_run_tasks_.top()->task()); 345 ready_to_run_task_set_->ready_to_run_tasks_.top()->task());
327 ready_to_run_tasks_.pop(); 346 ready_to_run_task_set_->ready_to_run_tasks_.pop();
reveman 2013/12/09 20:19:18 You'll have to insert the TaskSet in the shared qu
sohanjg 2013/12/10 07:19:37 Why would we need to re-insert again ? Wont SetTas
347
328 348
329 // Move task from |pending_tasks_| to |running_tasks_|. 349 // Move task from |pending_tasks_| to |running_tasks_|.
330 DCHECK(pending_tasks_.contains(task.get())); 350 DCHECK(ready_to_run_task_set_->pending_tasks_.contains(task.get()));
331 DCHECK(!running_tasks_.contains(task.get())); 351 DCHECK(!ready_to_run_task_set_->running_tasks_.contains(task.get()));
332 running_tasks_.set(task.get(), pending_tasks_.take_and_erase(task.get())); 352
353 ready_to_run_task_set_->running_tasks_.set(task.get(), ready_to_run_task_set _->pending_tasks_.take_and_erase(task.get()));
333 354
334 // There may be more work available, so wake up another worker thread. 355 // There may be more work available, so wake up another worker thread.
335 has_ready_to_run_tasks_cv_.Signal(); 356 has_ready_to_run_tasks_cv_.Signal();
336 357
358 // if (ready_to_run_task_set_->ready_to_run_tasks_.empty())
359 // get_new_taskqueue = true;
360
337 // Call WillRun() before releasing |lock_| and running task. 361 // Call WillRun() before releasing |lock_| and running task.
338 task->WillRun(); 362 task->WillRun();
339 363
340 { 364 {
341 base::AutoUnlock unlock(lock_); 365 base::AutoUnlock unlock(lock_);
342 366 // VLOG(0)<<__FUNCTION__<<" Sohan - RunOnWorkerThread";
343 task->RunOnWorkerThread(thread_index); 367 task->RunOnWorkerThread(thread_index);
344 } 368 }
345 369
346 // This will mark task as finished running. 370 // This will mark task as finished running.
347 task->DidRun(); 371 task->DidRun();
348 372
349 // Now iterate over all dependents to remove dependency and check 373 // Now iterate over all dependents to remove dependency and check
350 // if they are ready to run. 374 // if they are ready to run.
351 scoped_ptr<internal::GraphNode> node = running_tasks_.take_and_erase( 375 if (ready_to_run_task_set_->running_tasks_.size() > 0) {
reveman 2013/12/09 20:19:18 why the size check?
376 scoped_ptr<internal::GraphNode> node = ready_to_run_task_set_->running_tasks _.take_and_erase(
352 task.get()); 377 task.get());
353 if (node) { 378 if (node) {
354 for (internal::GraphNode::Vector::const_iterator it = 379 for (internal::GraphNode::Vector::const_iterator it =
355 node->dependents().begin(); 380 node->dependents().begin();
356 it != node->dependents().end(); ++it) { 381 it != node->dependents().end(); ++it) {
357 internal::GraphNode* dependent_node = *it; 382 internal::GraphNode* dependent_node = *it;
358 383
359 dependent_node->remove_dependency(); 384 dependent_node->remove_dependency();
360 // Task is ready if it has no dependencies. Add it to 385 // Task is ready if it has no dependencies. Add it to
361 // |ready_to_run_tasks_|. 386 // |ready_to_run_tasks_|.
362 if (!dependent_node->num_dependencies()) 387 if (!dependent_node->num_dependencies())
363 ready_to_run_tasks_.push(dependent_node); 388 ready_to_run_task_set_->ready_to_run_tasks_.push(dependent_node);
389 }
364 } 390 }
365 } 391 }
366 392
367 // Finally add task to |completed_tasks_|. 393 // Finally add task to |completed_tasks_|.
368 completed_tasks_.push_back(task); 394 ready_to_run_task_set_->completed_tasks_.push_back(task);
395
369 } 396 }
370 397
398 delete ready_to_run_task_set_;
371 // We noticed we should exit. Wake up the next worker so it knows it should 399 // We noticed we should exit. Wake up the next worker so it knows it should
372 // exit as well (because the Shutdown() code only signals once). 400 // exit as well (because the Shutdown() code only signals once).
373 has_ready_to_run_tasks_cv_.Signal(); 401 has_ready_to_run_tasks_cv_.Signal();
374 } 402 }
375 403
404 // Derived WorkerInner Ctor
405 DerivedInner::DerivedInner(): WorkerInner(cc::switches::GetNumRasterThreads(), " CompositorRaster") {
406 }
407 } // namespace anonymous
408
409 namespace internal {
410
411 WorkerPoolTask::WorkerPoolTask()
412 : did_schedule_(false),
413 did_run_(false),
414 did_complete_(false) {
415 }
416
417 WorkerPoolTask::~WorkerPoolTask() {
418 DCHECK_EQ(did_schedule_, did_complete_);
419 DCHECK(!did_run_ || did_schedule_);
420 DCHECK(!did_run_ || did_complete_);
421 }
422
423 void WorkerPoolTask::DidSchedule() {
424 DCHECK(!did_complete_);
425 did_schedule_ = true;
426 }
427
428 void WorkerPoolTask::WillRun() {
429 DCHECK(did_schedule_);
430 DCHECK(!did_complete_);
431 DCHECK(!did_run_);
432 }
433
434 void WorkerPoolTask::DidRun() {
435 did_run_ = true;
436 }
437
438 void WorkerPoolTask::WillComplete() {
439 DCHECK(!did_complete_);
440 }
441
442 void WorkerPoolTask::DidComplete() {
443 DCHECK(did_schedule_);
444 DCHECK(!did_complete_);
445 did_complete_ = true;
446 }
447
448 bool WorkerPoolTask::HasFinishedRunning() const {
449 return did_run_;
450 }
451
452 bool WorkerPoolTask::HasCompleted() const {
453 return did_complete_;
454 }
455
456 GraphNode::GraphNode(internal::WorkerPoolTask* task, unsigned priority)
457 : task_(task),
458 priority_(priority),
459 num_dependencies_(0) {
460 }
461
462 GraphNode::~GraphNode() {
463 }
464
465 } // namespace internal
466
467
376 WorkerPool::WorkerPool(size_t num_threads, 468 WorkerPool::WorkerPool(size_t num_threads,
377 const std::string& thread_name_prefix) 469 const std::string& thread_name_prefix)
378 : in_dispatch_completion_callbacks_(false), 470 : in_dispatch_completion_callbacks_(false) {
379 inner_(make_scoped_ptr(new Inner(num_threads, thread_name_prefix))) {
380 } 471 }
381 472
382 WorkerPool::~WorkerPool() { 473 WorkerPool::~WorkerPool() {
383 } 474 }
384 475
385 void WorkerPool::Shutdown() { 476 void WorkerPool::Shutdown() {
386 TRACE_EVENT0("cc", "WorkerPool::Shutdown"); 477 TRACE_EVENT0("cc", "WorkerPool::Shutdown");
387 478
388 DCHECK(!in_dispatch_completion_callbacks_); 479 DCHECK(!in_dispatch_completion_callbacks_);
389 480 g_workerpool_inner.Pointer()->Shutdown();
390 inner_->Shutdown();
391 } 481 }
392 482
393 void WorkerPool::CheckForCompletedTasks() { 483 void WorkerPool::CheckForCompletedTasks() {
394 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); 484 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");
395 485
396 DCHECK(!in_dispatch_completion_callbacks_); 486 DCHECK(!in_dispatch_completion_callbacks_);
397 487
398 TaskVector completed_tasks; 488 TaskVector completed_tasks;
399 inner_->CollectCompletedTasks(&completed_tasks); 489 g_workerpool_inner.Pointer()->CollectCompletedTasks(&completed_tasks, this);
400 ProcessCompletedTasks(completed_tasks); 490 ProcessCompletedTasks(completed_tasks);
401 } 491 }
402 492
403 void WorkerPool::ProcessCompletedTasks( 493 void WorkerPool::ProcessCompletedTasks(
404 const TaskVector& completed_tasks) { 494 const TaskVector& completed_tasks) {
405 TRACE_EVENT1("cc", "WorkerPool::ProcessCompletedTasks", 495 TRACE_EVENT1("cc", "WorkerPool::ProcessCompletedTasks",
406 "completed_task_count", completed_tasks.size()); 496 "completed_task_count", completed_tasks.size());
407 497
408 // Worker pool instance is not reentrant while processing completed tasks. 498 // Worker pool instance is not reentrant while processing completed tasks.
409 in_dispatch_completion_callbacks_ = true; 499 in_dispatch_completion_callbacks_ = true;
410 500
411 for (TaskVector::const_iterator it = completed_tasks.begin(); 501 for (TaskVector::const_iterator it = completed_tasks.begin();
412 it != completed_tasks.end(); 502 it != completed_tasks.end();
413 ++it) { 503 ++it) {
414 internal::WorkerPoolTask* task = it->get(); 504 internal::WorkerPoolTask* task = it->get();
415 505
416 task->WillComplete(); 506 task->WillComplete();
417 task->CompleteOnOriginThread(); 507 task->CompleteOnOriginThread();
418 task->DidComplete(); 508 task->DidComplete();
419 } 509 }
420 510
421 in_dispatch_completion_callbacks_ = false; 511 in_dispatch_completion_callbacks_ = false;
422 } 512 }
423 513
424 void WorkerPool::SetTaskGraph(TaskGraph* graph) { 514 void WorkerPool::SetTaskGraph(TaskGraph* graph) {
425 TRACE_EVENT1("cc", "WorkerPool::SetTaskGraph", 515 TRACE_EVENT1("cc", "WorkerPool::SetTaskGraph",
426 "num_tasks", graph->size()); 516 "num_tasks", graph->size());
427 517
428 DCHECK(!in_dispatch_completion_callbacks_); 518 DCHECK(!in_dispatch_completion_callbacks_);
429 519 g_workerpool_inner.Pointer()->SetTaskGraph(graph, this);
430 inner_->SetTaskGraph(graph);
431 } 520 }
432 521
433 } // namespace cc 522 } // namespace cc
OLDNEW
« no previous file with comments | « cc/resources/worker_pool.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698