OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 | 8 |
9 #include <algorithm> | 9 #include <algorithm> |
10 #include <utility> | 10 #include <utility> |
(...skipping 174 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
185 | 185 |
186 // Number of tasks executed since the last time the | 186 // Number of tasks executed since the last time the |
187 // TaskScheduler.NumTasksBeforeDetach histogram was recorded. | 187 // TaskScheduler.NumTasksBeforeDetach histogram was recorded. |
188 size_t num_tasks_since_last_detach_ = 0; | 188 size_t num_tasks_since_last_detach_ = 0; |
189 | 189 |
190 const int index_; | 190 const int index_; |
191 | 191 |
192 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); | 192 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
193 }; | 193 }; |
194 | 194 |
195 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( | |
196 const std::string& name, | |
197 ThreadPriority priority_hint, | |
198 ReEnqueueSequenceCallback re_enqueue_sequence_callback, | |
199 TaskTracker* task_tracker, | |
200 DelayedTaskManager* delayed_task_manager) | |
201 : name_(name), | |
202 priority_hint_(priority_hint), | |
203 re_enqueue_sequence_callback_(std::move(re_enqueue_sequence_callback)), | |
204 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), | |
205 idle_workers_stack_cv_for_testing_( | |
206 idle_workers_stack_lock_.CreateConditionVariable()), | |
207 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, | |
208 WaitableEvent::InitialState::NOT_SIGNALED), | |
209 #if DCHECK_IS_ON() | |
210 workers_created_(WaitableEvent::ResetPolicy::MANUAL, | |
211 WaitableEvent::InitialState::NOT_SIGNALED), | |
212 #endif | |
213 // Mimics the UMA_HISTOGRAM_LONG_TIMES macro. | |
214 detach_duration_histogram_(Histogram::FactoryTimeGet( | |
215 kDetachDurationHistogramPrefix + name_ + kPoolNameSuffix, | |
216 TimeDelta::FromMilliseconds(1), | |
217 TimeDelta::FromHours(1), | |
218 50, | |
219 HistogramBase::kUmaTargetedHistogramFlag)), | |
220 // Mimics the UMA_HISTOGRAM_COUNTS_1000 macro. When a worker runs more | |
221 // than 1000 tasks before detaching, there is no need to know the exact | |
222 // number of tasks that ran. | |
223 num_tasks_before_detach_histogram_(Histogram::FactoryGet( | |
224 kNumTasksBeforeDetachHistogramPrefix + name_ + kPoolNameSuffix, | |
225 1, | |
226 1000, | |
227 50, | |
228 HistogramBase::kUmaTargetedHistogramFlag)), | |
229 // Mimics the UMA_HISTOGRAM_COUNTS_100 macro. A SchedulerWorker is | |
230 // expected to run between zero and a few tens of tasks between waits. | |
231 // When it runs more than 100 tasks, there is no need to know the exact | |
232 // number of tasks that ran. | |
233 num_tasks_between_waits_histogram_(Histogram::FactoryGet( | |
234 kNumTasksBetweenWaitsHistogramPrefix + name_ + kPoolNameSuffix, | |
235 1, | |
236 100, | |
237 50, | |
238 HistogramBase::kUmaTargetedHistogramFlag)), | |
239 task_tracker_(task_tracker), | |
240 delayed_task_manager_(delayed_task_manager) { | |
241 DCHECK(task_tracker_); | |
242 DCHECK(delayed_task_manager_); | |
243 } | |
244 | |
245 void SchedulerWorkerPoolImpl::Start(const SchedulerWorkerPoolParams& params) { | |
246 suggested_reclaim_time_ = params.suggested_reclaim_time(); | |
247 | |
248 std::vector<SchedulerWorker*> workers_to_wake_up; | |
249 | |
250 { | |
251 // Get the number of enqueued Sequences to know how many workers to wake up | |
252 // at the end of this method. Keep the transaction alive to prevent new | |
253 // Sequences from being enqueued in the meantime. | |
254 auto priority_queue_transaction = shared_priority_queue_.BeginTransaction(); | |
255 const size_t num_workers_to_wake_up = priority_queue_transaction->Size(); | |
256 | |
257 // The number of workers created alive is |num_workers_to_wake_up|, plus one | |
258 // if the standby thread policy is ONE. | |
259 const size_t num_alive_workers = | |
robliao
2017/04/05 23:29:23
Can do this computation at the end to avoid holdin
fdoray
2017/04/06 15:07:12
PQ lock is a predecessor of |idle_workers_stack_lo
| |
260 num_workers_to_wake_up + | |
261 (params.standby_thread_policy() == | |
262 SchedulerWorkerPoolParams::StandbyThreadPolicy::ONE | |
263 ? 1 | |
264 : 0); | |
265 | |
266 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | |
267 | |
268 #if DCHECK_IS_ON() | |
269 DCHECK(!workers_created_.IsSignaled()); | |
270 #endif | |
271 | |
272 DCHECK(workers_.empty()); | |
273 workers_.resize(params.max_threads()); | |
274 | |
275 // Create workers in reverse order of index so that the worker with the | |
276 // highest index is at the bottom of the idle stack. | |
277 for (int index = params.max_threads() - 1; index >= 0; --index) { | |
278 const SchedulerWorker::InitialState initial_state = | |
279 static_cast<size_t>(index) < num_alive_workers | |
280 ? SchedulerWorker::InitialState::ALIVE | |
281 : SchedulerWorker::InitialState::DETACHED; | |
282 scoped_refptr<SchedulerWorker> worker = SchedulerWorker::Create( | |
283 params.priority_hint(), | |
284 MakeUnique<SchedulerWorkerDelegateImpl>( | |
285 this, re_enqueue_sequence_callback_, index), | |
286 task_tracker_, initial_state, params.backward_compatibility()); | |
287 if (!worker) | |
288 break; | |
289 | |
290 if (static_cast<size_t>(index) < num_workers_to_wake_up) | |
291 workers_to_wake_up.push_back(worker.get()); | |
292 else | |
293 idle_workers_stack_.Push(worker.get()); | |
294 | |
295 workers_[index] = std::move(worker); | |
296 } | |
297 | |
298 #if DCHECK_IS_ON() | |
299 workers_created_.Signal(); | |
300 #endif | |
301 | |
302 CHECK(!workers_.empty()); | |
303 } | |
304 | |
305 for (SchedulerWorker* worker : workers_to_wake_up) | |
306 worker->WakeUp(); | |
307 } | |
308 | |
195 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { | 309 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { |
196 // SchedulerWorkerPool should never be deleted in production unless its | 310 // SchedulerWorkerPool should never be deleted in production unless its |
197 // initialization failed. | 311 // initialization failed. |
198 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); | 312 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); |
199 } | 313 } |
200 | 314 |
201 // static | |
202 std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( | |
203 const SchedulerWorkerPoolParams& params, | |
204 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | |
205 TaskTracker* task_tracker, | |
206 DelayedTaskManager* delayed_task_manager) { | |
207 auto worker_pool = WrapUnique( | |
208 new SchedulerWorkerPoolImpl(params, task_tracker, delayed_task_manager)); | |
209 if (worker_pool->Initialize(params, re_enqueue_sequence_callback)) | |
210 return worker_pool; | |
211 return nullptr; | |
212 } | |
213 | |
214 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( | 315 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( |
215 const TaskTraits& traits) { | 316 const TaskTraits& traits) { |
216 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); | 317 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); |
217 } | 318 } |
218 | 319 |
219 scoped_refptr<SequencedTaskRunner> | 320 scoped_refptr<SequencedTaskRunner> |
220 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( | 321 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( |
221 const TaskTraits& traits) { | 322 const TaskTraits& traits) { |
222 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); | 323 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); |
223 } | 324 } |
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
293 } | 394 } |
294 } | 395 } |
295 | 396 |
296 void SchedulerWorkerPoolImpl::GetHistograms( | 397 void SchedulerWorkerPoolImpl::GetHistograms( |
297 std::vector<const HistogramBase*>* histograms) const { | 398 std::vector<const HistogramBase*>* histograms) const { |
298 histograms->push_back(detach_duration_histogram_); | 399 histograms->push_back(detach_duration_histogram_); |
299 histograms->push_back(num_tasks_between_waits_histogram_); | 400 histograms->push_back(num_tasks_between_waits_histogram_); |
300 } | 401 } |
301 | 402 |
302 int SchedulerWorkerPoolImpl::GetMaxConcurrentTasksDeprecated() const { | 403 int SchedulerWorkerPoolImpl::GetMaxConcurrentTasksDeprecated() const { |
404 #if DCHECK_IS_ON() | |
405 DCHECK(workers_created_.IsSignaled()); | |
406 #endif | |
303 return workers_.size(); | 407 return workers_.size(); |
304 } | 408 } |
305 | 409 |
306 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { | 410 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { |
411 #if DCHECK_IS_ON() | |
412 DCHECK(workers_created_.IsSignaled()); | |
413 #endif | |
307 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 414 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
308 while (idle_workers_stack_.Size() < workers_.size()) | 415 while (idle_workers_stack_.Size() < workers_.size()) |
309 idle_workers_stack_cv_for_testing_->Wait(); | 416 idle_workers_stack_cv_for_testing_->Wait(); |
310 } | 417 } |
311 | 418 |
312 void SchedulerWorkerPoolImpl::JoinForTesting() { | 419 void SchedulerWorkerPoolImpl::JoinForTesting() { |
420 #if DCHECK_IS_ON() | |
421 DCHECK(workers_created_.IsSignaled()); | |
422 #endif | |
313 DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) | 423 DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) |
314 << "Workers can detach during join."; | 424 << "Workers can detach during join."; |
315 for (const auto& worker : workers_) | 425 for (const auto& worker : workers_) |
316 worker->JoinForTesting(); | 426 worker->JoinForTesting(); |
317 | 427 |
318 DCHECK(!join_for_testing_returned_.IsSignaled()); | 428 DCHECK(!join_for_testing_returned_.IsSignaled()); |
319 join_for_testing_returned_.Signal(); | 429 join_for_testing_returned_.Signal(); |
320 } | 430 } |
321 | 431 |
322 void SchedulerWorkerPoolImpl::DisallowWorkerDetachmentForTesting() { | 432 void SchedulerWorkerPoolImpl::DisallowWorkerDetachmentForTesting() { |
323 worker_detachment_disallowed_.Set(); | 433 worker_detachment_disallowed_.Set(); |
324 } | 434 } |
325 | 435 |
326 size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() { | 436 size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() { |
437 #if DCHECK_IS_ON() | |
438 DCHECK(workers_created_.IsSignaled()); | |
439 #endif | |
327 size_t num_alive_workers = 0; | 440 size_t num_alive_workers = 0; |
328 for (const auto& worker : workers_) { | 441 for (const auto& worker : workers_) { |
329 if (worker->ThreadAliveForTesting()) | 442 if (worker->ThreadAliveForTesting()) |
330 ++num_alive_workers; | 443 ++num_alive_workers; |
331 } | 444 } |
332 return num_alive_workers; | 445 return num_alive_workers; |
333 } | 446 } |
334 | 447 |
335 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 448 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
336 SchedulerWorkerDelegateImpl( | 449 SchedulerWorkerDelegateImpl( |
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
458 } | 571 } |
459 | 572 |
460 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() { | 573 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() { |
461 DCHECK(!did_detach_since_last_get_work_); | 574 DCHECK(!did_detach_since_last_get_work_); |
462 outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_); | 575 outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_); |
463 num_tasks_since_last_detach_ = 0; | 576 num_tasks_since_last_detach_ = 0; |
464 did_detach_since_last_get_work_ = true; | 577 did_detach_since_last_get_work_ = true; |
465 last_detach_time_ = TimeTicks::Now(); | 578 last_detach_time_ = TimeTicks::Now(); |
466 } | 579 } |
467 | 580 |
468 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( | |
469 const SchedulerWorkerPoolParams& params, | |
470 TaskTracker* task_tracker, | |
471 DelayedTaskManager* delayed_task_manager) | |
472 : name_(params.name()), | |
473 suggested_reclaim_time_(params.suggested_reclaim_time()), | |
474 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), | |
475 idle_workers_stack_cv_for_testing_( | |
476 idle_workers_stack_lock_.CreateConditionVariable()), | |
477 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, | |
478 WaitableEvent::InitialState::NOT_SIGNALED), | |
479 #if DCHECK_IS_ON() | |
480 workers_created_(WaitableEvent::ResetPolicy::MANUAL, | |
481 WaitableEvent::InitialState::NOT_SIGNALED), | |
482 #endif | |
483 // Mimics the UMA_HISTOGRAM_LONG_TIMES macro. | |
484 detach_duration_histogram_(Histogram::FactoryTimeGet( | |
485 kDetachDurationHistogramPrefix + name_ + kPoolNameSuffix, | |
486 TimeDelta::FromMilliseconds(1), | |
487 TimeDelta::FromHours(1), | |
488 50, | |
489 HistogramBase::kUmaTargetedHistogramFlag)), | |
490 // Mimics the UMA_HISTOGRAM_COUNTS_1000 macro. When a worker runs more | |
491 // than 1000 tasks before detaching, there is no need to know the exact | |
492 // number of tasks that ran. | |
493 num_tasks_before_detach_histogram_(Histogram::FactoryGet( | |
494 kNumTasksBeforeDetachHistogramPrefix + name_ + kPoolNameSuffix, | |
495 1, | |
496 1000, | |
497 50, | |
498 HistogramBase::kUmaTargetedHistogramFlag)), | |
499 // Mimics the UMA_HISTOGRAM_COUNTS_100 macro. A SchedulerWorker is | |
500 // expected to run between zero and a few tens of tasks between waits. | |
501 // When it runs more than 100 tasks, there is no need to know the exact | |
502 // number of tasks that ran. | |
503 num_tasks_between_waits_histogram_(Histogram::FactoryGet( | |
504 kNumTasksBetweenWaitsHistogramPrefix + name_ + kPoolNameSuffix, | |
505 1, | |
506 100, | |
507 50, | |
508 HistogramBase::kUmaTargetedHistogramFlag)), | |
509 task_tracker_(task_tracker), | |
510 delayed_task_manager_(delayed_task_manager) { | |
511 DCHECK(task_tracker_); | |
512 DCHECK(delayed_task_manager_); | |
513 } | |
514 | |
515 bool SchedulerWorkerPoolImpl::Initialize( | |
516 const SchedulerWorkerPoolParams& params, | |
517 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { | |
518 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | |
519 | |
520 DCHECK(workers_.empty()); | |
521 workers_.resize(params.max_threads()); | |
522 | |
523 // Create workers and push them to the idle stack in reverse order of index. | |
524 // This ensures that they are woken up in order of index and that the ALIVE | |
525 // worker is on top of the stack. | |
526 for (int index = params.max_threads() - 1; index >= 0; --index) { | |
527 const bool is_standby_lazy = | |
528 params.standby_thread_policy() == | |
529 SchedulerWorkerPoolParams::StandbyThreadPolicy::LAZY; | |
530 const SchedulerWorker::InitialState initial_state = | |
531 (index == 0 && !is_standby_lazy) | |
532 ? SchedulerWorker::InitialState::ALIVE | |
533 : SchedulerWorker::InitialState::DETACHED; | |
534 scoped_refptr<SchedulerWorker> worker = SchedulerWorker::Create( | |
535 params.priority_hint(), | |
536 MakeUnique<SchedulerWorkerDelegateImpl>( | |
537 this, re_enqueue_sequence_callback, index), | |
538 task_tracker_, initial_state, params.backward_compatibility()); | |
539 if (!worker) | |
540 break; | |
541 idle_workers_stack_.Push(worker.get()); | |
542 workers_[index] = std::move(worker); | |
543 } | |
544 | |
545 #if DCHECK_IS_ON() | |
546 workers_created_.Signal(); | |
547 #endif | |
548 | |
549 return !workers_.empty(); | |
550 } | |
551 | |
552 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { | 581 void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
553 SchedulerWorker* worker; | 582 SchedulerWorker* worker; |
554 { | 583 { |
555 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 584 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
556 worker = idle_workers_stack_.Pop(); | 585 worker = idle_workers_stack_.Pop(); |
557 } | 586 } |
558 if (worker) | 587 if (worker) |
559 worker->WakeUp(); | 588 worker->WakeUp(); |
560 // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding | 589 // TODO(robliao): Honor StandbyThreadPolicy::ONE here and consider adding |
561 // hysteresis to the CanDetach check. See https://crbug.com/666041. | 590 // hysteresis to the CanDetach check. See https://crbug.com/666041. |
(...skipping 25 matching lines...) Expand all Loading... | |
587 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 616 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
588 idle_workers_stack_.Remove(worker); | 617 idle_workers_stack_.Remove(worker); |
589 } | 618 } |
590 | 619 |
591 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { | 620 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { |
592 return !worker_detachment_disallowed_.IsSet(); | 621 return !worker_detachment_disallowed_.IsSet(); |
593 } | 622 } |
594 | 623 |
595 } // namespace internal | 624 } // namespace internal |
596 } // namespace base | 625 } // namespace base |
OLD | NEW |