Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(129)

Side by Side Diff: base/task_scheduler/scheduler_worker_pool_impl.cc

Issue 2650383007: Move Task Scheduler Single Thread Task Runners to Dedicated Threads (Closed)
Patch Set: CR Feedback Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 #include <utility> 10 #include <utility>
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after
143 } // namespace 143 } // namespace
144 144
145 // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. 145 // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode.
146 class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner : 146 class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner :
147 public SingleThreadTaskRunner { 147 public SingleThreadTaskRunner {
148 public: 148 public:
149 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post 149 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post
150 // tasks so long as |worker_pool| and |worker| are alive. 150 // tasks so long as |worker_pool| and |worker| are alive.
151 // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| 151 // TODO(robliao): Find a concrete way to manage the memory of |worker_pool|
152 // and |worker|. 152 // and |worker|.
153 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, 153 SchedulerSingleThreadTaskRunner(
154 SchedulerWorkerPool* worker_pool, 154 const TaskTraits& traits,
155 SchedulerWorker* worker); 155 SchedulerWorkerPool* worker_pool,
156 SchedulerWorker* worker,
157 UnregisterSingleThreadWorkerPoolCallback
158 unregister_single_thread_worker_pool_callback);
156 159
157 // SingleThreadTaskRunner: 160 // SingleThreadTaskRunner:
158 bool PostDelayedTask(const tracked_objects::Location& from_here, 161 bool PostDelayedTask(const tracked_objects::Location& from_here,
159 const Closure& closure, 162 const Closure& closure,
160 TimeDelta delay) override { 163 TimeDelta delay) override {
161 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); 164 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay));
162 task->single_thread_task_runner_ref = this; 165 task->single_thread_task_runner_ref = this;
163 166
164 // Post the task to be executed by |worker_| as part of |sequence_|. 167 // Post the task to be executed by |worker_| as part of |sequence_|.
165 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, 168 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_,
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after
277 // initialization failed. 280 // initialization failed.
278 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty()); 281 DCHECK(join_for_testing_returned_.IsSignaled() || workers_.empty());
279 } 282 }
280 283
281 // static 284 // static
282 std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( 285 std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create(
283 const SchedulerWorkerPoolParams& params, 286 const SchedulerWorkerPoolParams& params,
284 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 287 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
285 TaskTracker* task_tracker, 288 TaskTracker* task_tracker,
286 DelayedTaskManager* delayed_task_manager) { 289 DelayedTaskManager* delayed_task_manager) {
287 auto worker_pool = WrapUnique( 290 return CreateInternal(params, task_tracker, delayed_task_manager,
288 new SchedulerWorkerPoolImpl(params, task_tracker, delayed_task_manager)); 291 re_enqueue_sequence_callback,
289 if (worker_pool->Initialize(params, re_enqueue_sequence_callback)) 292 UnregisterSingleThreadWorkerPoolCallback());
290 return worker_pool; 293 }
291 return nullptr; 294
295 // static
296 std::unique_ptr<SchedulerWorkerPoolImpl>
297 SchedulerWorkerPoolImpl::CreateSingleThreadWorkerPool(
298 const SchedulerWorkerPoolParams& params,
299 UnregisterSingleThreadWorkerPoolCallback
300 unregister_single_thread_worker_pool_callback,
301 TaskTracker* task_tracker,
302 DelayedTaskManager* delayed_task_manager) {
303 DCHECK_EQ(params.max_threads(), 1U);
304 return CreateInternal(params, task_tracker, delayed_task_manager,
305 ReEnqueueSequenceCallback(),
306 unregister_single_thread_worker_pool_callback);
292 } 307 }
293 308
294 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( 309 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits(
295 const TaskTraits& traits) { 310 const TaskTraits& traits) {
296 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); 311 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this));
297 } 312 }
298 313
299 scoped_refptr<SequencedTaskRunner> 314 scoped_refptr<SequencedTaskRunner>
300 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( 315 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits(
301 const TaskTraits& traits) { 316 const TaskTraits& traits) {
302 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); 317 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this));
303 } 318 }
304 319
305 scoped_refptr<SingleThreadTaskRunner> 320 scoped_refptr<SingleThreadTaskRunner>
306 SchedulerWorkerPoolImpl::CreateSingleThreadTaskRunnerWithTraits( 321 SchedulerWorkerPoolImpl::CreateSingleThreadTaskRunnerWithTraits(
307 const TaskTraits& traits) { 322 const TaskTraits& traits) {
323 DCHECK(unregister_single_thread_worker_pool_callback_);
308 // TODO(fdoray): Find a way to take load into account when assigning a 324 // TODO(fdoray): Find a way to take load into account when assigning a
309 // SchedulerWorker to a SingleThreadTaskRunner. 325 // SchedulerWorker to a SingleThreadTaskRunner.
310 size_t worker_index; 326 size_t worker_index;
311 { 327 {
312 AutoSchedulerLock auto_lock(next_worker_index_lock_); 328 AutoSchedulerLock auto_lock(next_worker_index_lock_);
313 worker_index = next_worker_index_; 329 worker_index = next_worker_index_;
314 next_worker_index_ = (next_worker_index_ + 1) % workers_.size(); 330 next_worker_index_ = (next_worker_index_ + 1) % workers_.size();
315 } 331 }
316 return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( 332 return make_scoped_refptr(new SchedulerSingleThreadTaskRunner(
317 traits, this, workers_[worker_index].get())); 333 traits, this, workers_[worker_index].get(),
334 unregister_single_thread_worker_pool_callback_));
318 } 335 }
319 336
320 void SchedulerWorkerPoolImpl::ReEnqueueSequence( 337 void SchedulerWorkerPoolImpl::ReEnqueueSequence(
321 scoped_refptr<Sequence> sequence, 338 scoped_refptr<Sequence> sequence,
322 const SequenceSortKey& sequence_sort_key) { 339 const SequenceSortKey& sequence_sort_key) {
323 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), 340 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
324 sequence_sort_key); 341 sequence_sort_key);
325 342
326 // The thread calling this method just ran a Task from |sequence| and will 343 // The thread calling this method just ran a Task from |sequence| and will
327 // soon try to get another Sequence from which to run a Task. If the thread 344 // soon try to get another Sequence from which to run a Task. If the thread
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
426 size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() { 443 size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() {
427 size_t num_alive_workers = 0; 444 size_t num_alive_workers = 0;
428 for (const auto& worker : workers_) { 445 for (const auto& worker : workers_) {
429 if (worker->ThreadAliveForTesting()) 446 if (worker->ThreadAliveForTesting())
430 ++num_alive_workers; 447 ++num_alive_workers;
431 } 448 }
432 return num_alive_workers; 449 return num_alive_workers;
433 } 450 }
434 451
435 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: 452 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner::
436 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, 453 SchedulerSingleThreadTaskRunner(
437 SchedulerWorkerPool* worker_pool, 454 const TaskTraits& traits,
438 SchedulerWorker* worker) 455 SchedulerWorkerPool* worker_pool,
439 : traits_(traits), 456 SchedulerWorker* worker,
440 worker_pool_(worker_pool), 457 UnregisterSingleThreadWorkerPoolCallback
441 worker_(worker) { 458 unregister_single_thread_worker_pool_callback)
459 : traits_(traits), worker_pool_(worker_pool), worker_(worker) {
442 DCHECK(worker_pool_); 460 DCHECK(worker_pool_);
443 DCHECK(worker_); 461 DCHECK(worker_);
444 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> 462 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())->
445 RegisterSingleThreadTaskRunner(); 463 RegisterSingleThreadTaskRunner();
446 } 464 }
447 465
448 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: 466 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner::
449 ~SchedulerSingleThreadTaskRunner() { 467 ~SchedulerSingleThreadTaskRunner() {
450 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> 468 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())->
451 UnregisterSingleThreadTaskRunner(); 469 UnregisterSingleThreadTaskRunner();
470 auto worker_pool_impl = static_cast<SchedulerWorkerPoolImpl*>(worker_pool_);
471 worker_pool_impl->unregister_single_thread_worker_pool_callback_.Run(
472 worker_pool_impl);
452 } 473 }
453 474
454 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: 475 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
455 SchedulerWorkerDelegateImpl( 476 SchedulerWorkerDelegateImpl(
456 SchedulerWorkerPoolImpl* outer, 477 SchedulerWorkerPoolImpl* outer,
457 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 478 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
458 const PriorityQueue* shared_priority_queue, 479 const PriorityQueue* shared_priority_queue,
459 int index) 480 int index)
460 : outer_(outer), 481 : outer_(outer),
461 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), 482 re_enqueue_sequence_callback_(re_enqueue_sequence_callback),
(...skipping 137 matching lines...) Expand 10 before | Expand all | Expand 10 after
599 620
600 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( 621 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach(
601 SchedulerWorker* worker) { 622 SchedulerWorker* worker) {
602 // It's not an issue if |num_single_threaded_runners_| is incremented after 623 // It's not an issue if |num_single_threaded_runners_| is incremented after
603 // this because the newly created SingleThreadTaskRunner (from which no task 624 // this because the newly created SingleThreadTaskRunner (from which no task
604 // has run yet) will simply run all its tasks on the next physical thread 625 // has run yet) will simply run all its tasks on the next physical thread
605 // created by the worker. 626 // created by the worker.
606 const bool can_detach = 627 const bool can_detach =
607 !idle_start_time_.is_null() && 628 !idle_start_time_.is_null() &&
608 (TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ && 629 (TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ &&
609 worker != outer_->PeekAtIdleWorkersStack() && 630 // In a pool which runs sequenced and parallel tasks, always keep
610 !subtle::NoBarrier_Load(&num_single_threaded_runners_) && 631 // on idle thread alive. In a pool which runs single-threaded tasks,
632 // keep the thread alive only when there are active TaskRunners.
633 (worker != outer_->PeekAtIdleWorkersStack() ||
634 (outer_->unregister_single_thread_worker_pool_callback_ &&
635 !subtle::NoBarrier_Load(&num_single_threaded_runners_))) &&
636 outer_->CanWorkerDetachForTesting();
637 !subtle::NoBarrier_Load(&num_single_threaded_runners_) &&
611 outer_->CanWorkerDetachForTesting(); 638 outer_->CanWorkerDetachForTesting();
612 return can_detach; 639 return can_detach;
613 } 640 }
614 641
615 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() { 642 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() {
616 DCHECK(!did_detach_since_last_get_work_); 643 DCHECK(!did_detach_since_last_get_work_);
617 outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_); 644 outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_);
618 num_tasks_since_last_detach_ = 0; 645 num_tasks_since_last_detach_ = 0;
619 did_detach_since_last_get_work_ = true; 646 did_detach_since_last_get_work_ = true;
620 last_detach_time_ = TimeTicks::Now(); 647 last_detach_time_ = TimeTicks::Now();
621 } 648 }
622 649
650 // static
651 std::unique_ptr<SchedulerWorkerPoolImpl>
652 SchedulerWorkerPoolImpl::CreateInternal(
653 const SchedulerWorkerPoolParams& params,
654 TaskTracker* task_tracker,
655 DelayedTaskManager* delayed_task_manager,
656 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
657 UnregisterSingleThreadWorkerPoolCallback
658 unregister_single_thread_worker_callback) {
659 auto worker_pool = WrapUnique(
660 new SchedulerWorkerPoolImpl(params, task_tracker, delayed_task_manager,
661 unregister_single_thread_worker_callback));
662 if (worker_pool->Initialize(params, re_enqueue_sequence_callback))
663 return worker_pool;
664 return nullptr;
665 }
666
623 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( 667 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
624 const SchedulerWorkerPoolParams& params, 668 const SchedulerWorkerPoolParams& params,
625 TaskTracker* task_tracker, 669 TaskTracker* task_tracker,
626 DelayedTaskManager* delayed_task_manager) 670 DelayedTaskManager* delayed_task_manager,
671 UnregisterSingleThreadWorkerPoolCallback
672 unregister_single_thread_worker_pool_callback)
627 : name_(params.name()), 673 : name_(params.name()),
628 suggested_reclaim_time_(params.suggested_reclaim_time()), 674 suggested_reclaim_time_(params.suggested_reclaim_time()),
629 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), 675 idle_workers_stack_lock_(shared_priority_queue_.container_lock()),
630 idle_workers_stack_cv_for_testing_( 676 idle_workers_stack_cv_for_testing_(
631 idle_workers_stack_lock_.CreateConditionVariable()), 677 idle_workers_stack_lock_.CreateConditionVariable()),
632 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, 678 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL,
633 WaitableEvent::InitialState::NOT_SIGNALED), 679 WaitableEvent::InitialState::NOT_SIGNALED),
634 #if DCHECK_IS_ON() 680 #if DCHECK_IS_ON()
635 workers_created_(WaitableEvent::ResetPolicy::MANUAL, 681 workers_created_(WaitableEvent::ResetPolicy::MANUAL,
636 WaitableEvent::InitialState::NOT_SIGNALED), 682 WaitableEvent::InitialState::NOT_SIGNALED),
(...skipping 18 matching lines...) Expand all
655 // expected to run between zero and a few tens of tasks between waits. 701 // expected to run between zero and a few tens of tasks between waits.
656 // When it runs more than 100 tasks, there is no need to know the exact 702 // When it runs more than 100 tasks, there is no need to know the exact
657 // number of tasks that ran. 703 // number of tasks that ran.
658 num_tasks_between_waits_histogram_(Histogram::FactoryGet( 704 num_tasks_between_waits_histogram_(Histogram::FactoryGet(
659 kNumTasksBetweenWaitsHistogramPrefix + name_ + kPoolNameSuffix, 705 kNumTasksBetweenWaitsHistogramPrefix + name_ + kPoolNameSuffix,
660 1, 706 1,
661 100, 707 100,
662 50, 708 50,
663 HistogramBase::kUmaTargetedHistogramFlag)), 709 HistogramBase::kUmaTargetedHistogramFlag)),
664 task_tracker_(task_tracker), 710 task_tracker_(task_tracker),
665 delayed_task_manager_(delayed_task_manager) { 711 delayed_task_manager_(delayed_task_manager),
712 unregister_single_thread_worker_pool_callback_(
713 unregister_single_thread_worker_pool_callback) {
666 DCHECK(task_tracker_); 714 DCHECK(task_tracker_);
667 DCHECK(delayed_task_manager_); 715 DCHECK(delayed_task_manager_);
668 } 716 }
669 717
670 bool SchedulerWorkerPoolImpl::Initialize( 718 bool SchedulerWorkerPoolImpl::Initialize(
671 const SchedulerWorkerPoolParams& params, 719 const SchedulerWorkerPoolParams& params,
672 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { 720 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) {
673 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); 721 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
674 722
675 DCHECK(workers_.empty()); 723 DCHECK(workers_.empty());
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
748 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); 796 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
749 idle_workers_stack_.Remove(worker); 797 idle_workers_stack_.Remove(worker);
750 } 798 }
751 799
752 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { 800 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() {
753 return !worker_detachment_disallowed_.IsSet(); 801 return !worker_detachment_disallowed_.IsSet();
754 } 802 }
755 803
756 } // namespace internal 804 } // namespace internal
757 } // namespace base 805 } // namespace base
OLDNEW
« no previous file with comments | « base/task_scheduler/scheduler_worker_pool_impl.h ('k') | base/task_scheduler/scheduler_worker_pool_impl_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698