OLD | NEW |
---|---|
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "cc/worker_pool.h" | 5 #include "cc/worker_pool.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/stl_util.h" | 10 #include "base/stl_util.h" |
11 #include "base/stringprintf.h" | 11 #include "base/stringprintf.h" |
12 | 12 |
13 #if defined(OS_ANDROID) | 13 #if defined(OS_ANDROID) |
14 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) | 14 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) |
15 #include <sys/resource.h> | 15 #include <sys/resource.h> |
16 #endif | 16 #endif |
17 | 17 |
18 namespace cc { | 18 namespace cc { |
19 | 19 |
20 namespace { | 20 namespace { |
21 | 21 |
22 class WorkerPoolTaskImpl : public internal::WorkerPoolTask { | 22 class WorkerPoolTaskImpl : public internal::WorkerPoolTask { |
23 public: | 23 public: |
24 WorkerPoolTaskImpl(const WorkerPool::Callback& task, | 24 WorkerPoolTaskImpl(const WorkerPool::Callback& task, |
25 const base::Closure& reply) | 25 const base::Callback<void(bool)>& reply) |
26 : internal::WorkerPoolTask(reply), | 26 : internal::WorkerPoolTask(reply), |
27 task_(task) {} | 27 task_(task) {} |
28 | 28 |
29 virtual void Run(RenderingStats* rendering_stats) OVERRIDE { | 29 virtual void Run(RenderingStats* rendering_stats) OVERRIDE { |
30 task_.Run(rendering_stats); | 30 task_.Run(rendering_stats); |
31 base::subtle::Release_Store(&completed_, 1); | |
31 } | 32 } |
32 | 33 |
33 private: | 34 private: |
34 WorkerPool::Callback task_; | 35 WorkerPool::Callback task_; |
35 }; | 36 }; |
36 | 37 |
37 const char* kWorkerThreadNamePrefix = "Compositor"; | 38 const char* kWorkerThreadNamePrefix = "Compositor"; |
38 | 39 |
39 // Allow two pending tasks per worker. This keeps resource usage | 40 const int kCheckForCompletedTasksDelayMs = 6; |
40 // low while making sure workers aren't unnecessarily idle. | |
41 const int kNumPendingTasksPerWorker = 2; | |
42 | 41 |
43 } // namespace | 42 } // namespace |
44 | 43 |
45 namespace internal { | 44 namespace internal { |
46 | 45 |
47 WorkerPoolTask::WorkerPoolTask(const base::Closure& reply) | 46 WorkerPoolTask::WorkerPoolTask(const base::Callback<void(bool)>& reply) |
48 : reply_(reply) { | 47 : reply_(reply) { |
48 base::subtle::Acquire_Store(&completed_, 0); | |
49 } | 49 } |
50 | 50 |
51 WorkerPoolTask::~WorkerPoolTask() { | 51 WorkerPoolTask::~WorkerPoolTask() { |
52 } | 52 } |
53 | 53 |
54 void WorkerPoolTask::Completed() { | 54 bool WorkerPoolTask::IsPending() { |
55 reply_.Run(); | 55 return base::subtle::Acquire_Load(&completed_) == 0; |
56 } | |
57 | |
58 void WorkerPoolTask::Completed(bool more_tasks_completed) { | |
59 DCHECK_EQ(base::subtle::Acquire_Load(&completed_), 1); | |
60 reply_.Run(more_tasks_completed); | |
56 } | 61 } |
57 | 62 |
58 } // namespace internal | 63 } // namespace internal |
59 | 64 |
60 WorkerPool::Worker::Worker(WorkerPool* worker_pool, const std::string name) | 65 WorkerPool::Worker::Worker(WorkerPool* worker_pool, const std::string name) |
61 : base::Thread(name.c_str()), | 66 : base::Thread(name.c_str()), |
62 worker_pool_(worker_pool), | 67 worker_pool_(worker_pool), |
63 weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), | |
64 rendering_stats_(make_scoped_ptr(new RenderingStats)), | 68 rendering_stats_(make_scoped_ptr(new RenderingStats)), |
65 record_rendering_stats_(false) { | 69 record_rendering_stats_(false) { |
66 Start(); | 70 Start(); |
67 DCHECK(IsRunning()); | 71 DCHECK(IsRunning()); |
68 } | 72 } |
69 | 73 |
70 WorkerPool::Worker::~Worker() { | 74 WorkerPool::Worker::~Worker() { |
71 DCHECK(!IsRunning()); | 75 DCHECK(!IsRunning()); |
72 DCHECK_EQ(pending_tasks_.size(), 0); | 76 DCHECK_EQ(pending_tasks_.size(), 0); |
73 } | 77 } |
74 | 78 |
75 void WorkerPool::Worker::StopAfterCompletingAllPendingTasks() { | 79 void WorkerPool::Worker::StopAfterCompletingAllPendingTasks() { |
76 // Signals the thread to exit and returns once all pending tasks have run. | 80 // Signals the thread to exit and returns once all pending tasks have run. |
77 Stop(); | 81 Stop(); |
78 | 82 |
79 // Complete all pending tasks. The Stop() call above guarantees that | 83 // Complete all pending tasks. The Stop() call above guarantees that |
80 // all tasks have finished running. | 84 // all tasks have finished running. |
81 while (!pending_tasks_.empty()) | 85 while (!pending_tasks_.empty()) |
82 OnTaskCompleted(); | 86 OnTaskCompleted(); |
83 | |
84 // Cancel all pending replies. | |
85 weak_ptr_factory_.InvalidateWeakPtrs(); | |
86 } | 87 } |
87 | 88 |
88 void WorkerPool::Worker::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { | 89 void WorkerPool::Worker::PostTask(scoped_ptr<internal::WorkerPoolTask> task) { |
89 DCHECK_LT(num_pending_tasks(), kNumPendingTasksPerWorker); | |
90 | |
91 RenderingStats* stats = | 90 RenderingStats* stats = |
92 record_rendering_stats_ ? rendering_stats_.get() : NULL; | 91 record_rendering_stats_ ? rendering_stats_.get() : NULL; |
93 | 92 |
94 message_loop_proxy()->PostTaskAndReply( | 93 worker_pool_->WillPostWorkTask(); |
94 | |
95 message_loop_proxy()->PostTask( | |
95 FROM_HERE, | 96 FROM_HERE, |
96 base::Bind(&Worker::RunTask, | 97 base::Bind(&Worker::RunTask, |
97 base::Unretained(task.get()), | 98 base::Unretained(task.get()), |
98 base::Unretained(stats)), | 99 base::Unretained(worker_pool_), |
99 base::Bind(&Worker::OnTaskCompleted, weak_ptr_factory_.GetWeakPtr())); | 100 base::Unretained(stats))); |
100 | 101 |
101 pending_tasks_.push_back(task.Pass()); | 102 pending_tasks_.push_back(task.Pass()); |
102 | |
103 worker_pool_->DidNumPendingTasksChange(); | |
104 } | 103 } |
105 | 104 |
106 void WorkerPool::Worker::Init() { | 105 void WorkerPool::Worker::Init() { |
107 #if defined(OS_ANDROID) | 106 #if defined(OS_ANDROID) |
108 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) | 107 // TODO(epenner): Move thread priorities to base. (crbug.com/170549) |
109 int nice_value = 10; // Idle priority. | 108 int nice_value = 10; // Idle priority. |
110 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value); | 109 setpriority(PRIO_PROCESS, base::PlatformThread::CurrentId(), nice_value); |
111 #endif | 110 #endif |
112 } | 111 } |
113 | 112 |
114 // static | 113 // static |
115 void WorkerPool::Worker::RunTask( | 114 void WorkerPool::Worker::RunTask( |
116 internal::WorkerPoolTask* task, RenderingStats* rendering_stats) { | 115 internal::WorkerPoolTask* task, |
116 WorkerPool* worker_pool, | |
117 RenderingStats* rendering_stats) { | |
117 task->Run(rendering_stats); | 118 task->Run(rendering_stats); |
119 worker_pool->OnWorkCompletedOnWorkerThread(); | |
118 } | 120 } |
119 | 121 |
120 void WorkerPool::Worker::OnTaskCompleted() { | 122 void WorkerPool::Worker::OnTaskCompleted() { |
121 CHECK(!pending_tasks_.empty()); | 123 CHECK(!pending_tasks_.empty()); |
122 | 124 |
125 // Notify worker pool of task completion. | |
126 worker_pool_->OnTaskCompleted(); | |
127 | |
123 scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); | 128 scoped_ptr<internal::WorkerPoolTask> task = pending_tasks_.take_front(); |
124 task->Completed(); | 129 task->Completed(worker_pool_->MoreTasksCompleted()); |
130 } | |
125 | 131 |
126 worker_pool_->DidNumPendingTasksChange(); | 132 void WorkerPool::Worker::CheckForCompletedTasks() { |
133 while (!pending_tasks_.empty()) { | |
nduca
2013/02/13 09:06:29
any reason we return tasks fifo? doesnt seem requi
reveman
2013/02/13 15:32:57
not really but they will complete in fifo order on
| |
134 if (pending_tasks_.front()->IsPending()) | |
135 return; | |
136 | |
137 OnTaskCompleted(); | |
138 } | |
127 } | 139 } |
128 | 140 |
129 WorkerPool::WorkerPool(size_t num_threads) | 141 WorkerPool::WorkerPool(size_t num_threads) |
130 : workers_need_sorting_(false), | 142 : origin_loop_(base::MessageLoopProxy::current()), |
131 shutdown_(false) { | 143 weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
144 workers_need_sorting_(false), | |
145 task_count_(0), | |
146 shutdown_(false), | |
147 check_for_completed_tasks_pending_(false), | |
148 idle_callback_( | |
149 base::Bind(&WorkerPool::OnIdle, weak_ptr_factory_.GetWeakPtr())) { | |
132 const std::string thread_name_prefix = kWorkerThreadNamePrefix; | 150 const std::string thread_name_prefix = kWorkerThreadNamePrefix; |
133 while (workers_.size() < num_threads) { | 151 while (workers_.size() < num_threads) { |
134 int thread_number = workers_.size() + 1; | 152 int thread_number = workers_.size() + 1; |
135 workers_.push_back(new Worker( | 153 workers_.push_back(new Worker( |
136 this, | 154 this, |
137 thread_name_prefix + StringPrintf("Worker%d", thread_number).c_str())); | 155 thread_name_prefix + StringPrintf("Worker%d", thread_number).c_str())); |
138 } | 156 } |
157 base::subtle::Acquire_Store(&work_count_, 0); | |
139 } | 158 } |
140 | 159 |
141 WorkerPool::~WorkerPool() { | 160 WorkerPool::~WorkerPool() { |
142 Shutdown(); | 161 Shutdown(); |
143 STLDeleteElements(&workers_); | 162 STLDeleteElements(&workers_); |
163 // Cancel all pending callbacks. | |
164 weak_ptr_factory_.InvalidateWeakPtrs(); | |
165 DCHECK_EQ(base::subtle::Acquire_Load(&work_count_), 0); | |
166 DCHECK_EQ(task_count_, 0); | |
144 } | 167 } |
145 | 168 |
146 void WorkerPool::Shutdown() { | 169 void WorkerPool::Shutdown() { |
147 DCHECK(!shutdown_); | 170 DCHECK(!shutdown_); |
148 shutdown_ = true; | 171 shutdown_ = true; |
149 | 172 |
150 for (WorkerVector::iterator it = workers_.begin(); | 173 for (WorkerVector::iterator it = workers_.begin(); |
151 it != workers_.end(); it++) { | 174 it != workers_.end(); it++) { |
152 Worker* worker = *it; | 175 Worker* worker = *it; |
153 worker->StopAfterCompletingAllPendingTasks(); | 176 worker->StopAfterCompletingAllPendingTasks(); |
154 } | 177 } |
155 } | 178 } |
156 | 179 |
157 void WorkerPool::PostTaskAndReply( | 180 void WorkerPool::PostTaskAndReply(const Callback& task, const Reply& reply) { |
158 const Callback& task, const base::Closure& reply) { | |
159 Worker* worker = GetWorkerForNextTask(); | 181 Worker* worker = GetWorkerForNextTask(); |
160 | 182 |
161 worker->PostTask( | 183 worker->PostTask( |
162 make_scoped_ptr(new WorkerPoolTaskImpl( | 184 make_scoped_ptr(new WorkerPoolTaskImpl( |
163 task, | 185 task, |
164 reply)).PassAs<internal::WorkerPoolTask>()); | 186 reply)).PassAs<internal::WorkerPoolTask>()); |
165 } | 187 } |
166 | 188 |
167 bool WorkerPool::IsBusy() { | |
168 Worker* worker = GetWorkerForNextTask(); | |
169 | |
170 return worker->num_pending_tasks() >= kNumPendingTasksPerWorker; | |
171 } | |
172 | |
173 void WorkerPool::SetRecordRenderingStats(bool record_rendering_stats) { | 189 void WorkerPool::SetRecordRenderingStats(bool record_rendering_stats) { |
174 for (WorkerVector::iterator it = workers_.begin(); | 190 for (WorkerVector::iterator it = workers_.begin(); |
175 it != workers_.end(); ++it) { | 191 it != workers_.end(); ++it) { |
176 Worker* worker = *it; | 192 Worker* worker = *it; |
177 worker->set_record_rendering_stats(record_rendering_stats); | 193 worker->set_record_rendering_stats(record_rendering_stats); |
178 } | 194 } |
179 } | 195 } |
180 | 196 |
181 void WorkerPool::GetRenderingStats(RenderingStats* stats) { | 197 void WorkerPool::GetRenderingStats(RenderingStats* stats) { |
182 stats->totalRasterizeTime = base::TimeDelta(); | 198 stats->totalRasterizeTime = base::TimeDelta(); |
(...skipping 14 matching lines...) Expand all Loading... | |
197 worker->rendering_stats()->totalDeferredImageDecodeTime; | 213 worker->rendering_stats()->totalDeferredImageDecodeTime; |
198 } | 214 } |
199 } | 215 } |
200 | 216 |
201 WorkerPool::Worker* WorkerPool::GetWorkerForNextTask() { | 217 WorkerPool::Worker* WorkerPool::GetWorkerForNextTask() { |
202 CHECK(!shutdown_); | 218 CHECK(!shutdown_); |
203 SortWorkersIfNeeded(); | 219 SortWorkersIfNeeded(); |
204 return workers_.front(); | 220 return workers_.front(); |
205 } | 221 } |
206 | 222 |
207 void WorkerPool::DidNumPendingTasksChange() { | 223 void WorkerPool::ScheduleCheckForCompletedTasks() { |
224 if (check_for_completed_tasks_pending_) | |
225 return; | |
226 | |
227 check_for_completed_tasks_callback_.Reset( | |
228 base::Bind(&WorkerPool::CheckForCompletedTasks, | |
229 weak_ptr_factory_.GetWeakPtr())); | |
230 origin_loop_->PostDelayedTask( | |
231 FROM_HERE, | |
232 check_for_completed_tasks_callback_.callback(), | |
233 base::TimeDelta::FromMilliseconds(kCheckForCompletedTasksDelayMs)); | |
234 check_for_completed_tasks_pending_ = true; | |
235 } | |
236 | |
237 void WorkerPool::WillPostWorkTask() { | |
238 base::subtle::Barrier_AtomicIncrement(&work_count_, 1); | |
239 ScheduleCheckForCompletedTasks(); | |
208 workers_need_sorting_ = true; | 240 workers_need_sorting_ = true; |
241 ++task_count_; | |
242 } | |
243 | |
244 void WorkerPool::OnWorkCompletedOnWorkerThread() { | |
245 // Post idle handler task when pool work count reaches 0. | |
246 if (base::subtle::Barrier_AtomicIncrement(&work_count_, -1) == 0) { | |
nduca
2013/02/13 09:06:29
Do we not have atomicdec?
reveman
2013/02/13 15:32:57
no, but Barrier_AtomicIncrement(-1) is commonly us
| |
247 origin_loop_->PostTask(FROM_HERE, idle_callback_); | |
248 } | |
249 } | |
250 | |
251 void WorkerPool::OnIdle() { | |
252 if (base::subtle::Acquire_Load(&work_count_) == 0) { | |
253 check_for_completed_tasks_callback_.Cancel(); | |
254 CheckForCompletedTasks(); | |
255 } | |
256 } | |
257 | |
258 void WorkerPool::CheckForCompletedTasks() { | |
259 check_for_completed_tasks_pending_ = false; | |
260 | |
261 for (WorkerVector::iterator it = workers_.begin(); | |
262 it != workers_.end(); it++) { | |
263 Worker* worker = *it; | |
264 worker->CheckForCompletedTasks(); | |
265 } | |
266 | |
267 if (task_count_) | |
268 ScheduleCheckForCompletedTasks(); | |
269 } | |
270 | |
271 void WorkerPool::OnTaskCompleted() { | |
272 workers_need_sorting_ = true; | |
273 --task_count_; | |
274 } | |
275 | |
276 bool WorkerPool::MoreTasksCompleted() { | |
277 return base::subtle::Acquire_Load(&work_count_) < task_count_; | |
209 } | 278 } |
210 | 279 |
211 void WorkerPool::SortWorkersIfNeeded() { | 280 void WorkerPool::SortWorkersIfNeeded() { |
212 if (!workers_need_sorting_) | 281 if (!workers_need_sorting_) |
213 return; | 282 return; |
214 | 283 |
215 std::sort(workers_.begin(), workers_.end(), NumPendingTasksComparator()); | 284 std::sort(workers_.begin(), workers_.end(), NumPendingTasksComparator()); |
216 workers_need_sorting_ = false; | 285 workers_need_sorting_ = false; |
217 } | 286 } |
218 | 287 |
219 } // namespace cc | 288 } // namespace cc |
OLD | NEW |