Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Side by Side Diff: cc/resources/worker_pool.cc

Issue 73923003: Shared Raster Worker Threads (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: WIP - code review changes Created 7 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « cc/resources/worker_pool.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "cc/resources/worker_pool.h" 5 #include "cc/resources/worker_pool.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <queue> 8 #include <queue>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
11 #include "base/command_line.h"
11 #include "base/containers/hash_tables.h" 12 #include "base/containers/hash_tables.h"
12 #include "base/debug/trace_event.h" 13 #include "base/debug/trace_event.h"
14 #include "base/lazy_instance.h"
13 #include "base/strings/stringprintf.h" 15 #include "base/strings/stringprintf.h"
14 #include "base/synchronization/condition_variable.h" 16 #include "base/synchronization/condition_variable.h"
15 #include "base/threading/simple_thread.h" 17 #include "base/threading/simple_thread.h"
16 #include "base/threading/thread_restrictions.h" 18 #include "base/threading/thread_restrictions.h"
17 #include "cc/base/scoped_ptr_deque.h" 19 #include "cc/base/scoped_ptr_deque.h"
20 #include "cc/base/switches.h"
18 21
19 namespace cc { 22 namespace cc {
20 23
21 namespace internal { 24 namespace {
22
23 WorkerPoolTask::WorkerPoolTask()
24 : did_schedule_(false),
25 did_run_(false),
26 did_complete_(false) {
27 }
28
29 WorkerPoolTask::~WorkerPoolTask() {
30 DCHECK_EQ(did_schedule_, did_complete_);
31 DCHECK(!did_run_ || did_schedule_);
32 DCHECK(!did_run_ || did_complete_);
33 }
34
35 void WorkerPoolTask::DidSchedule() {
36 DCHECK(!did_complete_);
37 did_schedule_ = true;
38 }
39
40 void WorkerPoolTask::WillRun() {
41 DCHECK(did_schedule_);
42 DCHECK(!did_complete_);
43 DCHECK(!did_run_);
44 }
45
46 void WorkerPoolTask::DidRun() {
47 did_run_ = true;
48 }
49
50 void WorkerPoolTask::WillComplete() {
51 DCHECK(!did_complete_);
52 }
53
54 void WorkerPoolTask::DidComplete() {
55 DCHECK(did_schedule_);
56 DCHECK(!did_complete_);
57 did_complete_ = true;
58 }
59
60 bool WorkerPoolTask::HasFinishedRunning() const {
61 return did_run_;
62 }
63
64 bool WorkerPoolTask::HasCompleted() const {
65 return did_complete_;
66 }
67
68 GraphNode::GraphNode(internal::WorkerPoolTask* task, unsigned priority)
69 : task_(task),
70 priority_(priority),
71 num_dependencies_(0) {
72 }
73
74 GraphNode::~GraphNode() {
75 }
76
77 } // namespace internal
78 25
79 // Internal to the worker pool. Any data or logic that needs to be 26 // Internal to the worker pool. Any data or logic that needs to be
80 // shared between threads lives in this class. All members are guarded 27 // shared between threads lives in this class. All members are guarded
81 // by |lock_|. 28 // by |lock_|.
82 class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate { 29 class WorkerInner : public base::DelegateSimpleThread::Delegate {
reveman 2013/12/16 17:06:30 We need a better name than "WorkerInner" for this.
sohanjg 2013/12/17 14:58:12 Done.
83 public: 30 public:
84 Inner(size_t num_threads, const std::string& thread_name_prefix); 31 WorkerInner(size_t num_threads, const std::string& thread_name_prefix);
85 virtual ~Inner(); 32 virtual ~WorkerInner();
86 33
87 void Shutdown(); 34 void Shutdown();
88 35
36 typedef base::ScopedPtrHashMap<internal::WorkerPoolTask*, internal::GraphNode>
37 GraphNodeMap;
38 typedef GraphNodeMap TaskGraph;
reveman 2013/12/16 17:06:30 How about "typedef WorkerPool::TaskGraph TaskGraph
sohanjg 2013/12/17 14:58:12 Done. I had to pull the TaskGraph and Taskvector f
39 typedef base::ScopedPtrHashMap<WorkerPool*, GraphNodeMap>
40 TaskMap;
reveman 2013/12/16 17:06:30 Not a task map. Should it be TaskGraphMap? Also th
sohanjg 2013/12/17 14:58:12 Done. This code was not used
41 typedef std::vector<scoped_refptr<internal::WorkerPoolTask> > TaskVector;
42
89 // Schedule running of tasks in |graph|. Tasks previously scheduled but 43 // Schedule running of tasks in |graph|. Tasks previously scheduled but
90 // no longer needed will be canceled unless already running. Canceled 44 // no longer needed will be canceled unless already running. Canceled
91 // tasks are moved to |completed_tasks_| without being run. The result 45 // tasks are moved to |completed_tasks_| without being run. The result
92 // is that once scheduled, a task is guaranteed to end up in the 46 // is that once scheduled, a task is guaranteed to end up in the
93 // |completed_tasks_| queue even if they later get canceled by another 47 // |completed_tasks_| queue even if they later get canceled by another
94 // call to SetTaskGraph(). 48 // call to SetTaskGraph().
95 void SetTaskGraph(TaskGraph* graph); 49 void SetTaskGraph(TaskGraph* graph, WorkerPool* worker_pool);
reveman 2013/12/16 17:06:30 "const WorkerPool*" instead and I prefer the worke
sohanjg 2013/12/17 14:58:12 Done.
96 50
97 // Collect all completed tasks in |completed_tasks|. 51 // Collect all completed tasks in |completed_tasks|.
98 void CollectCompletedTasks(TaskVector* completed_tasks); 52 void CollectCompletedTasks(TaskVector* completed_tasks,
53 WorkerPool* worker_pool);
reveman 2013/12/16 17:06:30 same here.
sohanjg 2013/12/17 14:58:12 Done.
54
55 void Register(const WorkerPool* worker_pool);
56 void Unregister(const WorkerPool* worker_pool);
reveman 2013/12/16 17:06:30 Please keep these sorted the same way as the imple
sohanjg 2013/12/17 14:58:12 Done.
99 57
100 private: 58 private:
101 class PriorityComparator { 59 class PriorityComparator {
102 public: 60 public:
103 bool operator()(const internal::GraphNode* a, 61 bool operator()(const internal::GraphNode* a,
104 const internal::GraphNode* b) { 62 const internal::GraphNode* b) {
105 // In this system, numerically lower priority is run first. 63 // In this system, numerically lower priority is run first.
106 if (a->priority() != b->priority()) 64 if (a->priority() != b->priority())
107 return a->priority() > b->priority(); 65 return a->priority() > b->priority();
108 66
109 // Run task with most dependents first when priority is the same. 67 // Run task with most dependents first when priority is the same.
110 return a->dependents().size() < b->dependents().size(); 68 return a->dependents().size() < b->dependents().size();
111 } 69 }
112 }; 70 };
113 71
72 // Ordered set of tasks that are ready to run.
73 typedef std::priority_queue<internal::GraphNode*,
74 std::vector<internal::GraphNode*>,
75 PriorityComparator> TaskQueue;
76
77 class TaskSet {
reveman 2013/12/16 17:06:30 TaskNamespace?
sohanjg 2013/12/17 14:58:12 Done.
78 public:
reveman 2013/12/16 17:06:30 Make this a struct instead and remove "_" suffix f
sohanjg 2013/12/17 14:58:12 Done.
79 GraphNodeMap pending_tasks_;
80 GraphNodeMap running_tasks_;
81 TaskVector completed_tasks_;
82 TaskQueue ready_to_run_tasks_;
83 };
84
85 class TaskComparator {
reveman 2013/12/16 17:06:30 This is a priority comparator too. How about renam
sohanjg 2013/12/17 14:58:12 Done.
86 public:
87 bool operator()(const TaskSet* a,
88 const TaskSet* b) {
89 if (a->ready_to_run_tasks_.top()->priority()
90 != b->ready_to_run_tasks_.top()->priority())
91 return a->ready_to_run_tasks_.top()->priority() >
92 b->ready_to_run_tasks_.top()->priority();
93
94 return a->ready_to_run_tasks_.top()->dependents().size() >
95 b->ready_to_run_tasks_.top()->dependents().size();
96 }
reveman 2013/12/16 17:06:30 Could you reuse the logic from the above comparato
sohanjg 2013/12/17 14:58:12 There are some issue with constantness when invoki
97 };
98
99 typedef std::map<const WorkerPool*, TaskSet*> TaskMapper;
reveman 2013/12/16 17:06:30 TaskNamespaceMap? Also, please use linked_ptr<Task
sohanjg 2013/12/17 14:58:12 Done.
100
101 TaskMapper tasks_;
reveman 2013/12/16 17:06:30 namespaces_? Also, please move all member variable
sohanjg 2013/12/17 14:58:12 Done.
102
103 typedef std::priority_queue<TaskSet*,
104 std::vector<TaskSet*>,
105 TaskComparator> TaskPriorityQueue;
reveman 2013/12/16 17:06:30 NamespaceQueue?
sohanjg 2013/12/17 14:58:12 Done.
106
107 TaskPriorityQueue shared_ready_to_run_tasks_;
reveman 2013/12/16 17:06:30 ready_to_run_namespaces_? Also move this below mem
sohanjg 2013/12/17 14:58:12 Done.
114 // Overridden from base::DelegateSimpleThread: 108 // Overridden from base::DelegateSimpleThread:
115 virtual void Run() OVERRIDE; 109 virtual void Run() OVERRIDE;
116 110
117 // This lock protects all members of this class except 111 // This lock protects all members of this class except
118 // |worker_pool_on_origin_thread_|. Do not read or modify anything 112 // |worker_pool_on_origin_thread_|. Do not read or modify anything
119 // without holding this lock. Do not block while holding this lock. 113 // without holding this lock. Do not block while holding this lock.
120 mutable base::Lock lock_; 114 mutable base::Lock lock_;
121 115
122 // Condition variable that is waited on by worker threads until new 116 // Condition variable that is waited on by worker threads until new
123 // tasks are ready to run or shutdown starts. 117 // tasks are ready to run or shutdown starts.
124 base::ConditionVariable has_ready_to_run_tasks_cv_; 118 base::ConditionVariable has_ready_to_run_tasks_cv_;
125 119
126 // Provides each running thread loop with a unique index. First thread 120 // Provides each running thread loop with a unique index. First thread
127 // loop index is 0. 121 // loop index is 0.
128 unsigned next_thread_index_; 122 unsigned next_thread_index_;
129 123
130 // Set during shutdown. Tells workers to exit when no more tasks 124 // Set during shutdown. Tells workers to exit when no more tasks
131 // are pending. 125 // are pending.
132 bool shutdown_; 126 bool shutdown_;
133 127
134 // This set contains all pending tasks.
135 GraphNodeMap pending_tasks_;
136
137 // Ordered set of tasks that are ready to run.
138 typedef std::priority_queue<internal::GraphNode*,
139 std::vector<internal::GraphNode*>,
140 PriorityComparator> TaskQueue;
141 TaskQueue ready_to_run_tasks_;
142
143 // This set contains all currently running tasks.
144 GraphNodeMap running_tasks_;
145
146 // Completed tasks not yet collected by origin thread.
147 TaskVector completed_tasks_;
148
149 ScopedPtrDeque<base::DelegateSimpleThread> workers_; 128 ScopedPtrDeque<base::DelegateSimpleThread> workers_;
150 129
151 DISALLOW_COPY_AND_ASSIGN(Inner); 130 DISALLOW_COPY_AND_ASSIGN(WorkerInner);
152 }; 131 };
153 132
154 WorkerPool::Inner::Inner( 133 class CC_EXPORT DerivedInner : public WorkerInner {
134 public:
135 DerivedInner();
136 };
137
138 base::LazyInstance<DerivedInner> g_workerpool_inner;
139
140
141 WorkerInner::WorkerInner(
155 size_t num_threads, const std::string& thread_name_prefix) 142 size_t num_threads, const std::string& thread_name_prefix)
156 : lock_(), 143 : lock_(),
157 has_ready_to_run_tasks_cv_(&lock_), 144 has_ready_to_run_tasks_cv_(&lock_),
158 next_thread_index_(0), 145 next_thread_index_(0),
159 shutdown_(false) { 146 shutdown_(false) {
160 base::AutoLock lock(lock_); 147 base::AutoLock lock(lock_);
161 148
162 while (workers_.size() < num_threads) { 149 while (workers_.size() < num_threads) {
163 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr( 150 scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
164 new base::DelegateSimpleThread( 151 new base::DelegateSimpleThread(
165 this, 152 this,
166 thread_name_prefix + 153 thread_name_prefix +
167 base::StringPrintf( 154 base::StringPrintf(
168 "Worker%u", 155 "Worker%u",
169 static_cast<unsigned>(workers_.size() + 1)).c_str())); 156 static_cast<unsigned>(workers_.size() + 1)).c_str()));
170 worker->Start(); 157 worker->Start();
171 #if defined(OS_ANDROID) || defined(OS_LINUX) 158 #if defined(OS_ANDROID) || defined(OS_LINUX)
172 worker->SetThreadPriority(base::kThreadPriority_Background); 159 worker->SetThreadPriority(base::kThreadPriority_Background);
173 #endif 160 #endif
174 workers_.push_back(worker.Pass()); 161 workers_.push_back(worker.Pass());
175 } 162 }
176 } 163 }
177 164
178 WorkerPool::Inner::~Inner() { 165 WorkerInner::~WorkerInner() {
179 base::AutoLock lock(lock_); 166 base::AutoLock lock(lock_);
180 167
181 DCHECK(shutdown_); 168 DCHECK(shutdown_);
182 169 DCHECK_EQ(0u, shared_ready_to_run_tasks_.size());
183 DCHECK_EQ(0u, pending_tasks_.size());
184 DCHECK_EQ(0u, ready_to_run_tasks_.size());
185 DCHECK_EQ(0u, running_tasks_.size());
186 DCHECK_EQ(0u, completed_tasks_.size());
reveman 2013/12/16 17:06:30 I think all these DCHECKs should move to the new U
187 } 170 }
188 171
189 void WorkerPool::Inner::Shutdown() { 172 void WorkerInner::Register(const WorkerPool* worker_pool) {
173 base::AutoLock lock(lock_);
174
175 // Add TaskSet
reveman 2013/12/16 17:06:30 I don't think this comment adds any valuable infor
sohanjg 2013/12/17 14:58:12 Done.
176 TaskSet* task_set= new TaskSet();
reveman 2013/12/16 17:06:30 use linked_ptr here.
sohanjg 2013/12/17 14:58:12 Done.
177 tasks_.insert(std::pair<const WorkerPool*,
reveman 2013/12/16 17:06:30 Add DCHECK(tasks_.find(worker_pool) == tasks_.end(
sohanjg 2013/12/17 14:58:12 Done.
178 TaskSet*>(worker_pool, task_set));
179
180 }
181 void WorkerInner::Unregister(const WorkerPool* worker_pool) {
182 base::AutoLock lock(lock_);
183
184 // Remove TaskSet
reveman 2013/12/16 17:06:30 remove comment
sohanjg 2013/12/17 14:58:12 Done.
185 delete tasks_[worker_pool];
reveman 2013/12/16 17:06:30 this goes away when using linked_ptr
sohanjg 2013/12/17 14:58:12 Done.
186 tasks_.erase(worker_pool);
reveman 2013/12/16 17:06:30 Add DCHECK(tasks_.find(worker_pool) != tasks_.end(
sohanjg 2013/12/17 14:58:12 Done.
187
reveman 2013/12/16 17:06:30 no need for this blankline
sohanjg 2013/12/17 14:58:12 Done.
188 }
189
190 void WorkerInner::Shutdown() {
190 { 191 {
191 base::AutoLock lock(lock_); 192 base::AutoLock lock(lock_);
192 193
193 DCHECK(!shutdown_); 194 DCHECK(!shutdown_);
194 shutdown_ = true; 195 shutdown_ = true;
195
196 // Wake up a worker so it knows it should exit. This will cause all workers 196 // Wake up a worker so it knows it should exit. This will cause all workers
197 // to exit as each will wake up another worker before exiting. 197 // to exit as each will wake up another worker before exiting.
198 has_ready_to_run_tasks_cv_.Signal(); 198 has_ready_to_run_tasks_cv_.Signal();
199 } 199 }
200 200
201 while (workers_.size()) { 201 while (workers_.size()) {
202 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front(); 202 scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
203 // http://crbug.com/240453 - Join() is considered IO and will block this 203 // http://crbug.com/240453 - Join() is considered IO and will block this
204 // thread. See also http://crbug.com/239423 for further ideas. 204 // thread. See also http://crbug.com/239423 for further ideas.
205 base::ThreadRestrictions::ScopedAllowIO allow_io; 205 base::ThreadRestrictions::ScopedAllowIO allow_io;
206 worker->Join(); 206 worker->Join();
207 } 207 }
208 } 208 }
209 209
210 void WorkerPool::Inner::SetTaskGraph(TaskGraph* graph) { 210 void WorkerInner::SetTaskGraph(TaskGraph* graph, WorkerPool* worker_pool) {
211 // It is OK to call SetTaskGraph() after shutdown if |graph| is empty. 211 // It is OK to call SetTaskGraph() after shutdown if |graph| is empty.
212 DCHECK(graph->empty() || !shutdown_); 212 DCHECK(graph->empty() || !shutdown_);
213 213
214 GraphNodeMap new_pending_tasks; 214 GraphNodeMap new_pending_tasks;
215 GraphNodeMap new_running_tasks; 215 GraphNodeMap new_running_tasks;
216 TaskQueue new_ready_to_run_tasks; 216 TaskQueue new_ready_to_run_tasks;
217 217
218 new_pending_tasks.swap(*graph); 218 new_pending_tasks.swap(*graph);
219 219
220 { 220 {
221 base::AutoLock lock(lock_); 221 base::AutoLock lock(lock_);
222 222
223 DCHECK(tasks_.find(worker_pool) != tasks_.end());
224 TaskSet* task_set = tasks_[worker_pool];
225
223 // First remove all completed tasks from |new_pending_tasks| and 226 // First remove all completed tasks from |new_pending_tasks| and
224 // adjust number of dependencies. 227 // adjust number of dependencies.
225 for (TaskVector::iterator it = completed_tasks_.begin(); 228 for (TaskVector::iterator it = task_set->completed_tasks_.begin();
226 it != completed_tasks_.end(); ++it) { 229 it != task_set->completed_tasks_.end(); ++it) {
227 internal::WorkerPoolTask* task = it->get(); 230 internal::WorkerPoolTask* task = it->get();
228
229 scoped_ptr<internal::GraphNode> node = new_pending_tasks.take_and_erase( 231 scoped_ptr<internal::GraphNode> node = new_pending_tasks.take_and_erase(
230 task); 232 task);
231 if (node) { 233 if (node) {
232 for (internal::GraphNode::Vector::const_iterator it = 234 for (internal::GraphNode::Vector::const_iterator it =
233 node->dependents().begin(); 235 node->dependents().begin();
234 it != node->dependents().end(); ++it) { 236 it != node->dependents().end(); ++it) {
235 internal::GraphNode* dependent_node = *it; 237 internal::GraphNode* dependent_node = *it;
236 dependent_node->remove_dependency(); 238 dependent_node->remove_dependency();
237 } 239 }
238 } 240 }
239 } 241 }
240 242
241 // Build new running task set. 243 // Build new running task set.
242 for (GraphNodeMap::iterator it = running_tasks_.begin(); 244 for (GraphNodeMap::iterator it =
243 it != running_tasks_.end(); ++it) { 245 task_set->running_tasks_.begin();
246 it != task_set->running_tasks_.end(); ++it) {
244 internal::WorkerPoolTask* task = it->first; 247 internal::WorkerPoolTask* task = it->first;
245 // Transfer scheduled task value from |new_pending_tasks| to 248 // Transfer scheduled task value from |new_pending_tasks| to
246 // |new_running_tasks| if currently running. Value must be set to 249 // |new_running_tasks| if currently running. Value must be set to
247 // NULL if |new_pending_tasks| doesn't contain task. This does 250 // NULL if |new_pending_tasks| doesn't contain task. This does
248 // the right in both cases. 251 // the right in both cases.
249 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task)); 252 new_running_tasks.set(task, new_pending_tasks.take_and_erase(task));
250 } 253 }
reveman 2013/12/16 17:06:30 no need to add a space here
sohanjg 2013/12/17 14:58:12 Done.
251 254
252 // Build new "ready to run" tasks queue. 255 // Build new "ready to run" tasks queue.
253 // TODO(reveman): Create this queue when building the task graph instead. 256 // TODO(reveman): Create this queue when building the task graph instead.
254 for (GraphNodeMap::iterator it = new_pending_tasks.begin(); 257 for (GraphNodeMap::iterator it = new_pending_tasks.begin();
255 it != new_pending_tasks.end(); ++it) { 258 it != new_pending_tasks.end(); ++it) {
256 internal::WorkerPoolTask* task = it->first; 259 internal::WorkerPoolTask* task = it->first;
257 DCHECK(task); 260 DCHECK(task);
258 internal::GraphNode* node = it->second; 261 internal::GraphNode* node = it->second;
259 262
260 // Completed tasks should not exist in |new_pending_tasks|. 263 // Completed tasks should not exist in |new_pending_tasks|.
261 DCHECK(!task->HasFinishedRunning()); 264 DCHECK(!task->HasFinishedRunning());
262 265
263 // Call DidSchedule() to indicate that this task has been scheduled. 266 // Call DidSchedule() to indicate that this task has been scheduled.
264 // Note: This is only for debugging purposes. 267 // Note: This is only for debugging purposes.
265 task->DidSchedule(); 268 task->DidSchedule();
266 269
267 if (!node->num_dependencies()) 270 if (!node->num_dependencies())
268 new_ready_to_run_tasks.push(node); 271 new_ready_to_run_tasks.push(node);
269 272
270 // Erase the task from old pending tasks. 273 // Erase the task from old pending tasks.
271 pending_tasks_.erase(task); 274 task_set->pending_tasks_.erase(task);
275
272 } 276 }
273 277
274 completed_tasks_.reserve(completed_tasks_.size() + pending_tasks_.size()); 278 task_set->completed_tasks_.reserve(
279 task_set->completed_tasks_.size() +
280 task_set->pending_tasks_.size());
275 281
276 // The items left in |pending_tasks_| need to be canceled. 282 // The items left in |pending_tasks_| need to be canceled.
277 for (GraphNodeMap::const_iterator it = pending_tasks_.begin(); 283 for (GraphNodeMap::const_iterator it =
278 it != pending_tasks_.end(); 284 task_set->pending_tasks_.begin();
279 ++it) { 285 it != task_set->pending_tasks_.end();
280 completed_tasks_.push_back(it->first); 286 ++it) {
287 task_set->completed_tasks_.push_back(it->first);
281 } 288 }
282 289
283 // Swap task sets. 290 // Swap task sets.
284 // Note: old tasks are intentionally destroyed after releasing |lock_|. 291 // Note: old tasks are intentionally destroyed after releasing |lock_|.
285 pending_tasks_.swap(new_pending_tasks); 292 task_set->pending_tasks_.swap(new_pending_tasks);
286 running_tasks_.swap(new_running_tasks); 293 task_set->running_tasks_.swap(new_running_tasks);
287 std::swap(ready_to_run_tasks_, new_ready_to_run_tasks); 294 std::swap(task_set->ready_to_run_tasks_, new_ready_to_run_tasks);
295
296 // Re-create the shared_ready_to_run_tasks_ with new taskset
297 while (!shared_ready_to_run_tasks_.empty())
298 shared_ready_to_run_tasks_.pop();
299 shared_ready_to_run_tasks_.push(task_set);
reveman 2013/12/16 17:06:30 This will clear the queue and only add back task_s
sohanjg 2013/12/17 14:58:12 There will be only 1 taskset/workerpool, so each t
reveman 2013/12/19 02:09:38 we'll have calls to settaskgraph from multiple thr
sohanjg 2013/12/19 15:06:59 Done. When i check the run-time state of the shar
288 300
289 // If |ready_to_run_tasks_| is empty, it means we either have 301 // If |ready_to_run_tasks_| is empty, it means we either have
290 // running tasks, or we have no pending tasks. 302 // running tasks, or we have no pending tasks.
291 DCHECK(!ready_to_run_tasks_.empty() || 303 DCHECK(!task_set->ready_to_run_tasks_.empty() ||
292 (pending_tasks_.empty() || !running_tasks_.empty())); 304 (task_set->pending_tasks_.empty() ||
305 !task_set->running_tasks_.empty()));
293 306
294 // If there is more work available, wake up worker thread. 307 // If there is more work available, wake up worker thread.
295 if (!ready_to_run_tasks_.empty()) 308 if (!task_set->ready_to_run_tasks_.empty())
296 has_ready_to_run_tasks_cv_.Signal(); 309 has_ready_to_run_tasks_cv_.Signal();
297 } 310 }
298 } 311 }
299 312
300 void WorkerPool::Inner::CollectCompletedTasks(TaskVector* completed_tasks) { 313 void WorkerInner::CollectCompletedTasks
314 (TaskVector* completed_tasks, WorkerPool* worker_pool) {
301 base::AutoLock lock(lock_); 315 base::AutoLock lock(lock_);
302 316
303 DCHECK_EQ(0u, completed_tasks->size()); 317 DCHECK_EQ(0u, completed_tasks->size());
304 completed_tasks->swap(completed_tasks_); 318 if (!shared_ready_to_run_tasks_.empty())
319 completed_tasks->swap(shared_ready_to_run_tasks_.top()->completed_tasks_);
320
305 } 321 }
306 322
307 void WorkerPool::Inner::Run() { 323 void WorkerInner::Run() {
308 base::AutoLock lock(lock_); 324 base::AutoLock lock(lock_);
309 325
310 // Get a unique thread index. 326 // Get a unique thread index.
311 int thread_index = next_thread_index_++; 327 int thread_index = next_thread_index_++;
328 TaskSet* ready_to_run_task_set = NULL;
312 329
313 while (true) { 330 while (true) {
314 if (ready_to_run_tasks_.empty()) {
315 // Exit when shutdown is set and no more tasks are pending.
316 if (shutdown_ && pending_tasks_.empty())
317 break;
318 331
319 // Wait for more tasks. 332 if (shared_ready_to_run_tasks_.empty()) {
320 has_ready_to_run_tasks_cv_.Wait(); 333 // Exit when shutdown is set and no more tasks are pending.
321 continue; 334 if (shutdown_ && ready_to_run_task_set &&
335 ready_to_run_task_set->pending_tasks_.empty()) {
336 break;
337 }
338 // Wait for more tasks.
339 has_ready_to_run_tasks_cv_.Wait();
340 continue;
322 } 341 }
323 342
343 // Take top priority TaskSet from |shared_ready_to_run_tasks_|.
344 ready_to_run_task_set = shared_ready_to_run_tasks_.top();
345
324 // Take top priority task from |ready_to_run_tasks_|. 346 // Take top priority task from |ready_to_run_tasks_|.
325 scoped_refptr<internal::WorkerPoolTask> task( 347 scoped_refptr<internal::WorkerPoolTask> task(
326 ready_to_run_tasks_.top()->task()); 348 ready_to_run_task_set->ready_to_run_tasks_.top()->task());
327 ready_to_run_tasks_.pop(); 349 ready_to_run_task_set->ready_to_run_tasks_.pop();
350
328 351
329 // Move task from |pending_tasks_| to |running_tasks_|. 352 // Move task from |pending_tasks_| to |running_tasks_|.
330 DCHECK(pending_tasks_.contains(task.get())); 353 DCHECK(ready_to_run_task_set->pending_tasks_.contains(task.get()));
331 DCHECK(!running_tasks_.contains(task.get())); 354 DCHECK(!ready_to_run_task_set->running_tasks_.contains(task.get()));
332 running_tasks_.set(task.get(), pending_tasks_.take_and_erase(task.get())); 355
356 ready_to_run_task_set->running_tasks_.set(
357 task.get(), ready_to_run_task_set->pending_tasks_.take_and_erase
358 (task.get()));
333 359
334 // There may be more work available, so wake up another worker thread. 360 // There may be more work available, so wake up another worker thread.
335 has_ready_to_run_tasks_cv_.Signal(); 361 has_ready_to_run_tasks_cv_.Signal();
336 362
363
337 // Call WillRun() before releasing |lock_| and running task. 364 // Call WillRun() before releasing |lock_| and running task.
338 task->WillRun(); 365 task->WillRun();
339 366
340 { 367 {
341 base::AutoUnlock unlock(lock_); 368 base::AutoUnlock unlock(lock_);
342
343 task->RunOnWorkerThread(thread_index); 369 task->RunOnWorkerThread(thread_index);
344 } 370 }
345 371
346 // This will mark task as finished running. 372 // This will mark task as finished running.
347 task->DidRun(); 373 task->DidRun();
348 374
349 // Now iterate over all dependents to remove dependency and check 375 // Now iterate over all dependents to remove dependency and check
350 // if they are ready to run. 376 // if they are ready to run.
351 scoped_ptr<internal::GraphNode> node = running_tasks_.take_and_erase( 377 scoped_ptr<internal::GraphNode> node =
378 ready_to_run_task_set->running_tasks_.take_and_erase(
352 task.get()); 379 task.get());
353 if (node) { 380 if (node) {
354 for (internal::GraphNode::Vector::const_iterator it = 381 for (internal::GraphNode::Vector::const_iterator it =
355 node->dependents().begin(); 382 node->dependents().begin();
356 it != node->dependents().end(); ++it) { 383 it != node->dependents().end(); ++it) {
357 internal::GraphNode* dependent_node = *it; 384 internal::GraphNode* dependent_node = *it;
358 385
359 dependent_node->remove_dependency(); 386 dependent_node->remove_dependency();
360 // Task is ready if it has no dependencies. Add it to 387 // Task is ready if it has no dependencies. Add it to
361 // |ready_to_run_tasks_|. 388 // |ready_to_run_tasks_|.
362 if (!dependent_node->num_dependencies()) 389 if (!dependent_node->num_dependencies())
363 ready_to_run_tasks_.push(dependent_node); 390 ready_to_run_task_set->ready_to_run_tasks_.push(dependent_node);
391 }
364 } 392 }
365 }
366 393
367 // Finally add task to |completed_tasks_|. 394 // Finally add task to |completed_tasks_|.
368 completed_tasks_.push_back(task); 395 ready_to_run_task_set->completed_tasks_.push_back(task);
396
397 // Pop when ready_to_run_tasks_ is empty
398 if (ready_to_run_task_set->ready_to_run_tasks_.empty())
399 shared_ready_to_run_tasks_.pop();
400
369 } 401 }
370 402
371 // We noticed we should exit. Wake up the next worker so it knows it should 403 // We noticed we should exit. Wake up the next worker so it knows it should
372 // exit as well (because the Shutdown() code only signals once). 404 // exit as well (because the Shutdown() code only signals once).
373 has_ready_to_run_tasks_cv_.Signal(); 405 has_ready_to_run_tasks_cv_.Signal();
374 } 406 }
375 407
408 // Derived WorkerInner Ctor
409 DerivedInner::DerivedInner(): WorkerInner
410 (switches::GetNumRasterThreads(), "CompositorRaster") {
411 }
412
413 } // namespace
414
415 namespace internal {
416
417 WorkerPoolTask::WorkerPoolTask()
418 : did_schedule_(false),
419 did_run_(false),
420 did_complete_(false) {
421 }
422
423 WorkerPoolTask::~WorkerPoolTask() {
424 DCHECK_EQ(did_schedule_, did_complete_);
425 DCHECK(!did_run_ || did_schedule_);
426 DCHECK(!did_run_ || did_complete_);
427 }
428
429 void WorkerPoolTask::DidSchedule() {
430 DCHECK(!did_complete_);
431 did_schedule_ = true;
432 }
433
434 void WorkerPoolTask::WillRun() {
435 DCHECK(did_schedule_);
436 DCHECK(!did_complete_);
437 DCHECK(!did_run_);
438 }
439
440 void WorkerPoolTask::DidRun() {
441 did_run_ = true;
442 }
443
444 void WorkerPoolTask::WillComplete() {
445 DCHECK(!did_complete_);
446 }
447
448 void WorkerPoolTask::DidComplete() {
449 DCHECK(did_schedule_);
450 DCHECK(!did_complete_);
451 did_complete_ = true;
452 }
453
454 bool WorkerPoolTask::HasFinishedRunning() const {
455 return did_run_;
456 }
457
458 bool WorkerPoolTask::HasCompleted() const {
459 return did_complete_;
460 }
461
462 GraphNode::GraphNode(internal::WorkerPoolTask* task, unsigned priority)
463 : task_(task),
464 priority_(priority),
465 num_dependencies_(0) {
466 }
467
468 GraphNode::~GraphNode() {
469 }
470
471 } // namespace internal
472
473
376 WorkerPool::WorkerPool(size_t num_threads, 474 WorkerPool::WorkerPool(size_t num_threads,
377 const std::string& thread_name_prefix) 475 const std::string& thread_name_prefix)
378 : in_dispatch_completion_callbacks_(false), 476 : in_dispatch_completion_callbacks_(false) {
379 inner_(make_scoped_ptr(new Inner(num_threads, thread_name_prefix))) { 477 g_workerpool_inner.Pointer()->Register(this);
380 } 478 }
381 479
382 WorkerPool::~WorkerPool() { 480 WorkerPool::~WorkerPool() {
481 g_workerpool_inner.Pointer()->Unregister(this);
383 } 482 }
384 483
385 void WorkerPool::Shutdown() { 484 void WorkerPool::Shutdown() {
386 TRACE_EVENT0("cc", "WorkerPool::Shutdown"); 485 TRACE_EVENT0("cc", "WorkerPool::Shutdown");
387 486
388 DCHECK(!in_dispatch_completion_callbacks_); 487 DCHECK(!in_dispatch_completion_callbacks_);
389 488 g_workerpool_inner.Pointer()->Shutdown();
390 inner_->Shutdown();
391 } 489 }
392 490
393 void WorkerPool::CheckForCompletedTasks() { 491 void WorkerPool::CheckForCompletedTasks() {
394 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks"); 492 TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");
395 493
396 DCHECK(!in_dispatch_completion_callbacks_); 494 DCHECK(!in_dispatch_completion_callbacks_);
397 495
398 TaskVector completed_tasks; 496 TaskVector completed_tasks;
399 inner_->CollectCompletedTasks(&completed_tasks); 497 g_workerpool_inner.Pointer()->CollectCompletedTasks(&completed_tasks, this);
400 ProcessCompletedTasks(completed_tasks); 498 ProcessCompletedTasks(completed_tasks);
401 } 499 }
402 500
403 void WorkerPool::ProcessCompletedTasks( 501 void WorkerPool::ProcessCompletedTasks(
404 const TaskVector& completed_tasks) { 502 const TaskVector& completed_tasks) {
405 TRACE_EVENT1("cc", "WorkerPool::ProcessCompletedTasks", 503 TRACE_EVENT1("cc", "WorkerPool::ProcessCompletedTasks",
406 "completed_task_count", completed_tasks.size()); 504 "completed_task_count", completed_tasks.size());
407 505
408 // Worker pool instance is not reentrant while processing completed tasks. 506 // Worker pool instance is not reentrant while processing completed tasks.
409 in_dispatch_completion_callbacks_ = true; 507 in_dispatch_completion_callbacks_ = true;
410 508
411 for (TaskVector::const_iterator it = completed_tasks.begin(); 509 for (TaskVector::const_iterator it = completed_tasks.begin();
412 it != completed_tasks.end(); 510 it != completed_tasks.end();
413 ++it) { 511 ++it) {
414 internal::WorkerPoolTask* task = it->get(); 512 internal::WorkerPoolTask* task = it->get();
415 513
416 task->WillComplete(); 514 task->WillComplete();
417 task->CompleteOnOriginThread(); 515 task->CompleteOnOriginThread();
418 task->DidComplete(); 516 task->DidComplete();
419 } 517 }
420 518
421 in_dispatch_completion_callbacks_ = false; 519 in_dispatch_completion_callbacks_ = false;
422 } 520 }
423 521
424 void WorkerPool::SetTaskGraph(TaskGraph* graph) { 522 void WorkerPool::SetTaskGraph(TaskGraph* graph) {
425 TRACE_EVENT1("cc", "WorkerPool::SetTaskGraph", 523 TRACE_EVENT1("cc", "WorkerPool::SetTaskGraph",
426 "num_tasks", graph->size()); 524 "num_tasks", graph->size());
427 525
428 DCHECK(!in_dispatch_completion_callbacks_); 526 DCHECK(!in_dispatch_completion_callbacks_);
429 527 g_workerpool_inner.Pointer()->SetTaskGraph(graph, this);
430 inner_->SetTaskGraph(graph);
431 } 528 }
432 529
433 } // namespace cc 530 } // namespace cc
OLDNEW
« no previous file with comments | « cc/resources/worker_pool.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698