| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 | 8 |
| 9 #include <algorithm> | 9 #include <algorithm> |
| 10 #include <utility> | 10 #include <utility> |
| (...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 143 } // namespace | 143 } // namespace |
| 144 | 144 |
| 145 // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. | 145 // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. |
| 146 class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner : | 146 class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner : |
| 147 public SingleThreadTaskRunner { | 147 public SingleThreadTaskRunner { |
| 148 public: | 148 public: |
| 149 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post | 149 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post |
| 150 // tasks so long as |worker_pool| and |worker| are alive. | 150 // tasks so long as |worker_pool| and |worker| are alive. |
| 151 // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| | 151 // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| |
| 152 // and |worker|. | 152 // and |worker|. |
| 153 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, | 153 SchedulerSingleThreadTaskRunner( |
| 154 SchedulerWorkerPool* worker_pool, | 154 const TaskTraits& traits, |
| 155 SchedulerWorker* worker); | 155 SchedulerWorkerPool* worker_pool, |
| 156 SchedulerWorker* worker, |
| 157 UnregisterSingleThreadWorkerPoolCallback |
| 158 unregister_single_thread_worker_pool_callback); |
| 156 | 159 |
| 157 // SingleThreadTaskRunner: | 160 // SingleThreadTaskRunner: |
| 158 bool PostDelayedTask(const tracked_objects::Location& from_here, | 161 bool PostDelayedTask(const tracked_objects::Location& from_here, |
| 159 const Closure& closure, | 162 const Closure& closure, |
| 160 TimeDelta delay) override { | 163 TimeDelta delay) override { |
| 161 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); | 164 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); |
| 162 task->single_thread_task_runner_ref = this; | 165 task->single_thread_task_runner_ref = this; |
| 163 | 166 |
| 164 // Post the task to be executed by |worker_| as part of |sequence_|. | 167 // Post the task to be executed by |worker_| as part of |sequence_|. |
| 165 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, | 168 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, |
| (...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 277 // initialization failed. | 280 // initialization failed. |
| 278 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); | 281 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); |
| 279 } | 282 } |
| 280 | 283 |
| 281 // static | 284 // static |
| 282 std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( | 285 std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( |
| 283 const SchedulerWorkerPoolParams& params, | 286 const SchedulerWorkerPoolParams& params, |
| 284 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 287 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| 285 TaskTracker* task_tracker, | 288 TaskTracker* task_tracker, |
| 286 DelayedTaskManager* delayed_task_manager) { | 289 DelayedTaskManager* delayed_task_manager) { |
| 287 auto worker_pool = WrapUnique( | 290 return CreateInternal(params, task_tracker, delayed_task_manager, |
| 288 new SchedulerWorkerPoolImpl(params, task_tracker, delayed_task_manager)); | 291 re_enqueue_sequence_callback, |
| 289 if (worker_pool->Initialize(params, re_enqueue_sequence_callback)) | 292 UnregisterSingleThreadWorkerPoolCallback()); |
| 290 return worker_pool; | 293 } |
| 291 return nullptr; | 294 |
| 295 // static |
| 296 std::unique_ptr<SchedulerWorkerPoolImpl> |
| 297 SchedulerWorkerPoolImpl::CreateSingleThreadWorkerPool( |
| 298 const SchedulerWorkerPoolParams& params, |
| 299 UnregisterSingleThreadWorkerPoolCallback |
| 300 unregister_single_thread_worker_pool_callback, |
| 301 TaskTracker* task_tracker, |
| 302 DelayedTaskManager* delayed_task_manager) { |
| 303 DCHECK_EQ(params.max_threads(), 1U); |
| 304 return CreateInternal(params, task_tracker, delayed_task_manager, |
| 305 ReEnqueueSequenceCallback(), |
| 306 unregister_single_thread_worker_pool_callback); |
| 292 } | 307 } |
| 293 | 308 |
| 294 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( | 309 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( |
| 295 const TaskTraits& traits) { | 310 const TaskTraits& traits) { |
| 296 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); | 311 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); |
| 297 } | 312 } |
| 298 | 313 |
| 299 scoped_refptr<SequencedTaskRunner> | 314 scoped_refptr<SequencedTaskRunner> |
| 300 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( | 315 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( |
| 301 const TaskTraits& traits) { | 316 const TaskTraits& traits) { |
| 302 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); | 317 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); |
| 303 } | 318 } |
| 304 | 319 |
| 305 scoped_refptr<SingleThreadTaskRunner> | 320 scoped_refptr<SingleThreadTaskRunner> |
| 306 SchedulerWorkerPoolImpl::CreateSingleThreadTaskRunnerWithTraits( | 321 SchedulerWorkerPoolImpl::CreateSingleThreadTaskRunnerWithTraits( |
| 307 const TaskTraits& traits) { | 322 const TaskTraits& traits) { |
| 323 DCHECK(unregister_single_thread_worker_pool_callback_); |
| 308 // TODO(fdoray): Find a way to take load into account when assigning a | 324 // TODO(fdoray): Find a way to take load into account when assigning a |
| 309 // SchedulerWorker to a SingleThreadTaskRunner. | 325 // SchedulerWorker to a SingleThreadTaskRunner. |
| 310 size_t worker_index; | 326 size_t worker_index; |
| 311 { | 327 { |
| 312 AutoSchedulerLock auto_lock(next_worker_index_lock_); | 328 AutoSchedulerLock auto_lock(next_worker_index_lock_); |
| 313 worker_index = next_worker_index_; | 329 worker_index = next_worker_index_; |
| 314 next_worker_index_ = (next_worker_index_ + 1) % workers_.size(); | 330 next_worker_index_ = (next_worker_index_ + 1) % workers_.size(); |
| 315 } | 331 } |
| 316 return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( | 332 return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( |
| 317 traits, this, workers_[worker_index].get())); | 333 traits, this, workers_[worker_index].get(), |
| 334 unregister_single_thread_worker_pool_callback_)); |
| 318 } | 335 } |
| 319 | 336 |
| 320 void SchedulerWorkerPoolImpl::ReEnqueueSequence( | 337 void SchedulerWorkerPoolImpl::ReEnqueueSequence( |
| 321 scoped_refptr<Sequence> sequence, | 338 scoped_refptr<Sequence> sequence, |
| 322 const SequenceSortKey& sequence_sort_key) { | 339 const SequenceSortKey& sequence_sort_key) { |
| 323 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), | 340 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), |
| 324 sequence_sort_key); | 341 sequence_sort_key); |
| 325 | 342 |
| 326 // The thread calling this method just ran a Task from |sequence| and will | 343 // The thread calling this method just ran a Task from |sequence| and will |
| 327 // soon try to get another Sequence from which to run a Task. If the thread | 344 // soon try to get another Sequence from which to run a Task. If the thread |
| (...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 426 size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() { | 443 size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() { |
| 427 size_t num_alive_workers = 0; | 444 size_t num_alive_workers = 0; |
| 428 for (const auto& worker : workers_) { | 445 for (const auto& worker : workers_) { |
| 429 if (worker->ThreadAliveForTesting()) | 446 if (worker->ThreadAliveForTesting()) |
| 430 ++num_alive_workers; | 447 ++num_alive_workers; |
| 431 } | 448 } |
| 432 return num_alive_workers; | 449 return num_alive_workers; |
| 433 } | 450 } |
| 434 | 451 |
| 435 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: | 452 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: |
| 436 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, | 453 SchedulerSingleThreadTaskRunner( |
| 437 SchedulerWorkerPool* worker_pool, | 454 const TaskTraits& traits, |
| 438 SchedulerWorker* worker) | 455 SchedulerWorkerPool* worker_pool, |
| 439 : traits_(traits), | 456 SchedulerWorker* worker, |
| 440 worker_pool_(worker_pool), | 457 UnregisterSingleThreadWorkerPoolCallback |
| 441 worker_(worker) { | 458 unregister_single_thread_worker_pool_callback) |
| 459 : traits_(traits), worker_pool_(worker_pool), worker_(worker) { |
| 442 DCHECK(worker_pool_); | 460 DCHECK(worker_pool_); |
| 443 DCHECK(worker_); | 461 DCHECK(worker_); |
| 444 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> | 462 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
| 445 RegisterSingleThreadTaskRunner(); | 463 RegisterSingleThreadTaskRunner(); |
| 446 } | 464 } |
| 447 | 465 |
| 448 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: | 466 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: |
| 449 ~SchedulerSingleThreadTaskRunner() { | 467 ~SchedulerSingleThreadTaskRunner() { |
| 450 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> | 468 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
| 451 UnregisterSingleThreadTaskRunner(); | 469 UnregisterSingleThreadTaskRunner(); |
| 470 auto worker_pool_impl = static_cast<SchedulerWorkerPoolImpl*>(worker_pool_); |
| 471 worker_pool_impl->unregister_single_thread_worker_pool_callback_.Run( |
| 472 worker_pool_impl); |
| 452 } | 473 } |
| 453 | 474 |
| 454 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: | 475 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| 455 SchedulerWorkerDelegateImpl( | 476 SchedulerWorkerDelegateImpl( |
| 456 SchedulerWorkerPoolImpl* outer, | 477 SchedulerWorkerPoolImpl* outer, |
| 457 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, | 478 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| 458 const PriorityQueue* shared_priority_queue, | 479 const PriorityQueue* shared_priority_queue, |
| 459 int index) | 480 int index) |
| 460 : outer_(outer), | 481 : outer_(outer), |
| 461 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), | 482 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
| (...skipping 137 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 599 | 620 |
| 600 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( | 621 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
| 601 SchedulerWorker* worker) { | 622 SchedulerWorker* worker) { |
| 602 // It's not an issue if |num_single_threaded_runners_| is incremented after | 623 // It's not an issue if |num_single_threaded_runners_| is incremented after |
| 603 // this because the newly created SingleThreadTaskRunner (from which no task | 624 // this because the newly created SingleThreadTaskRunner (from which no task |
| 604 // has run yet) will simply run all its tasks on the next physical thread | 625 // has run yet) will simply run all its tasks on the next physical thread |
| 605 // created by the worker. | 626 // created by the worker. |
| 606 const bool can_detach = | 627 const bool can_detach = |
| 607 !idle_start_time_.is_null() && | 628 !idle_start_time_.is_null() && |
| 608 (TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ && | 629 (TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ && |
| 609 worker != outer_->PeekAtIdleWorkersStack() && | 630 // In a pool which runs sequenced and parallel tasks, always keep |
| 610 !subtle::NoBarrier_Load(&num_single_threaded_runners_) && | 631 // on idle thread alive. In a pool which runs single-threaded tasks, |
| 632 // keep the thread alive only when there are active TaskRunners. |
| 633 (worker != outer_->PeekAtIdleWorkersStack() || |
| 634 (outer_->unregister_single_thread_worker_pool_callback_ && |
| 635 !subtle::NoBarrier_Load(&num_single_threaded_runners_))) && |
| 636 outer_->CanWorkerDetachForTesting(); |
| 637 !subtle::NoBarrier_Load(&num_single_threaded_runners_) && |
| 611 outer_->CanWorkerDetachForTesting(); | 638 outer_->CanWorkerDetachForTesting(); |
| 612 return can_detach; | 639 return can_detach; |
| 613 } | 640 } |
| 614 | 641 |
| 615 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() { | 642 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() { |
| 616 DCHECK(!did_detach_since_last_get_work_); | 643 DCHECK(!did_detach_since_last_get_work_); |
| 617 outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_); | 644 outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_); |
| 618 num_tasks_since_last_detach_ = 0; | 645 num_tasks_since_last_detach_ = 0; |
| 619 did_detach_since_last_get_work_ = true; | 646 did_detach_since_last_get_work_ = true; |
| 620 last_detach_time_ = TimeTicks::Now(); | 647 last_detach_time_ = TimeTicks::Now(); |
| 621 } | 648 } |
| 622 | 649 |
| 650 // static |
| 651 std::unique_ptr<SchedulerWorkerPoolImpl> |
| 652 SchedulerWorkerPoolImpl::CreateInternal( |
| 653 const SchedulerWorkerPoolParams& params, |
| 654 TaskTracker* task_tracker, |
| 655 DelayedTaskManager* delayed_task_manager, |
| 656 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| 657 UnregisterSingleThreadWorkerPoolCallback |
| 658 unregister_single_thread_worker_callback) { |
| 659 auto worker_pool = WrapUnique( |
| 660 new SchedulerWorkerPoolImpl(params, task_tracker, delayed_task_manager, |
| 661 unregister_single_thread_worker_callback)); |
| 662 if (worker_pool->Initialize(params, re_enqueue_sequence_callback)) |
| 663 return worker_pool; |
| 664 return nullptr; |
| 665 } |
| 666 |
| 623 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( | 667 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
| 624 const SchedulerWorkerPoolParams& params, | 668 const SchedulerWorkerPoolParams& params, |
| 625 TaskTracker* task_tracker, | 669 TaskTracker* task_tracker, |
| 626 DelayedTaskManager* delayed_task_manager) | 670 DelayedTaskManager* delayed_task_manager, |
| 671 UnregisterSingleThreadWorkerPoolCallback |
| 672 unregister_single_thread_worker_pool_callback) |
| 627 : name_(params.name()), | 673 : name_(params.name()), |
| 628 suggested_reclaim_time_(params.suggested_reclaim_time()), | 674 suggested_reclaim_time_(params.suggested_reclaim_time()), |
| 629 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), | 675 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), |
| 630 idle_workers_stack_cv_for_testing_( | 676 idle_workers_stack_cv_for_testing_( |
| 631 idle_workers_stack_lock_.CreateConditionVariable()), | 677 idle_workers_stack_lock_.CreateConditionVariable()), |
| 632 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, | 678 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, |
| 633 WaitableEvent::InitialState::NOT_SIGNALED), | 679 WaitableEvent::InitialState::NOT_SIGNALED), |
| 634 #if DCHECK_IS_ON() | 680 #if DCHECK_IS_ON() |
| 635 workers_created_(WaitableEvent::ResetPolicy::MANUAL, | 681 workers_created_(WaitableEvent::ResetPolicy::MANUAL, |
| 636 WaitableEvent::InitialState::NOT_SIGNALED), | 682 WaitableEvent::InitialState::NOT_SIGNALED), |
| (...skipping 18 matching lines...) Expand all Loading... |
| 655 // expected to run between zero and a few tens of tasks between waits. | 701 // expected to run between zero and a few tens of tasks between waits. |
| 656 // When it runs more than 100 tasks, there is no need to know the exact | 702 // When it runs more than 100 tasks, there is no need to know the exact |
| 657 // number of tasks that ran. | 703 // number of tasks that ran. |
| 658 num_tasks_between_waits_histogram_(Histogram::FactoryGet( | 704 num_tasks_between_waits_histogram_(Histogram::FactoryGet( |
| 659 kNumTasksBetweenWaitsHistogramPrefix + name_ + kPoolNameSuffix, | 705 kNumTasksBetweenWaitsHistogramPrefix + name_ + kPoolNameSuffix, |
| 660 1, | 706 1, |
| 661 100, | 707 100, |
| 662 50, | 708 50, |
| 663 HistogramBase::kUmaTargetedHistogramFlag)), | 709 HistogramBase::kUmaTargetedHistogramFlag)), |
| 664 task_tracker_(task_tracker), | 710 task_tracker_(task_tracker), |
| 665 delayed_task_manager_(delayed_task_manager) { | 711 delayed_task_manager_(delayed_task_manager), |
| 712 unregister_single_thread_worker_pool_callback_( |
| 713 unregister_single_thread_worker_pool_callback) { |
| 666 DCHECK(task_tracker_); | 714 DCHECK(task_tracker_); |
| 667 DCHECK(delayed_task_manager_); | 715 DCHECK(delayed_task_manager_); |
| 668 } | 716 } |
| 669 | 717 |
| 670 bool SchedulerWorkerPoolImpl::Initialize( | 718 bool SchedulerWorkerPoolImpl::Initialize( |
| 671 const SchedulerWorkerPoolParams& params, | 719 const SchedulerWorkerPoolParams& params, |
| 672 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { | 720 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { |
| 673 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 721 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| 674 | 722 |
| 675 DCHECK(workers_.empty()); | 723 DCHECK(workers_.empty()); |
| (...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 748 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); | 796 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| 749 idle_workers_stack_.Remove(worker); | 797 idle_workers_stack_.Remove(worker); |
| 750 } | 798 } |
| 751 | 799 |
| 752 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { | 800 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { |
| 753 return !worker_detachment_disallowed_.IsSet(); | 801 return !worker_detachment_disallowed_.IsSet(); |
| 754 } | 802 } |
| 755 | 803 |
| 756 } // namespace internal | 804 } // namespace internal |
| 757 } // namespace base | 805 } // namespace base |
| OLD | NEW |