Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(26)

Side by Side Diff: base/task_scheduler/scheduler_worker_pool_impl.cc

Issue 2650383007: Move Task Scheduler Single Thread Task Runners to Dedicated Threads (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h" 5 #include "base/task_scheduler/scheduler_worker_pool_impl.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 #include <utility> 10 #include <utility>
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after
143 } // namespace 143 } // namespace
144 144
145 // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. 145 // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode.
146 class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner : 146 class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner :
147 public SingleThreadTaskRunner { 147 public SingleThreadTaskRunner {
148 public: 148 public:
149 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post 149 // Constructs a SchedulerSingleThreadTaskRunner which can be used to post
150 // tasks so long as |worker_pool| and |worker| are alive. 150 // tasks so long as |worker_pool| and |worker| are alive.
151 // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| 151 // TODO(robliao): Find a concrete way to manage the memory of |worker_pool|
152 // and |worker|. 152 // and |worker|.
153 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, 153 SchedulerSingleThreadTaskRunner(
154 SchedulerWorkerPool* worker_pool, 154 const TaskTraits& traits,
155 SchedulerWorker* worker); 155 SchedulerWorkerPool* worker_pool,
156 SchedulerWorker* worker,
157 UnregisterSingleThreadWorkerPoolCallback
158 unregister_single_thread_worker_pool_callback);
156 159
157 // SingleThreadTaskRunner: 160 // SingleThreadTaskRunner:
158 bool PostDelayedTask(const tracked_objects::Location& from_here, 161 bool PostDelayedTask(const tracked_objects::Location& from_here,
159 const Closure& closure, 162 const Closure& closure,
160 TimeDelta delay) override { 163 TimeDelta delay) override {
161 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); 164 std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay));
162 task->single_thread_task_runner_ref = this; 165 task->single_thread_task_runner_ref = this;
163 166
164 // Post the task to be executed by |worker_| as part of |sequence_|. 167 // Post the task to be executed by |worker_| as part of |sequence_|.
165 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, 168 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_,
(...skipping 16 matching lines...) Expand all
182 185
183 private: 186 private:
184 ~SchedulerSingleThreadTaskRunner() override; 187 ~SchedulerSingleThreadTaskRunner() override;
185 188
186 // Sequence for all Tasks posted through this TaskRunner. 189 // Sequence for all Tasks posted through this TaskRunner.
187 const scoped_refptr<Sequence> sequence_ = new Sequence; 190 const scoped_refptr<Sequence> sequence_ = new Sequence;
188 191
189 const TaskTraits traits_; 192 const TaskTraits traits_;
190 SchedulerWorkerPool* const worker_pool_; 193 SchedulerWorkerPool* const worker_pool_;
191 SchedulerWorker* const worker_; 194 SchedulerWorker* const worker_;
195 const UnregisterSingleThreadWorkerPoolCallback
fdoray 2017/01/27 16:47:35 Not needed. SchedulerSingleThreadTaskRunner can ac
robliao 2017/01/27 21:25:41 Indeed! Done.
196 unregister_single_thread_worker_pool_callback_;
192 197
193 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); 198 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
194 }; 199 };
195 200
196 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl 201 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
197 : public SchedulerWorker::Delegate { 202 : public SchedulerWorker::Delegate {
198 public: 203 public:
199 // |outer| owns the worker for which this delegate is constructed. 204 // |outer| owns the worker for which this delegate is constructed.
200 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is 205 // |re_enqueue_sequence_callback| is invoked when ReEnqueueSequence() is
201 // called with a non-single-threaded Sequence. |shared_priority_queue| is a 206 // called with a non-single-threaded Sequence. |shared_priority_queue| is a
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after
284 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 289 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
285 TaskTracker* task_tracker, 290 TaskTracker* task_tracker,
286 DelayedTaskManager* delayed_task_manager) { 291 DelayedTaskManager* delayed_task_manager) {
287 auto worker_pool = WrapUnique( 292 auto worker_pool = WrapUnique(
288 new SchedulerWorkerPoolImpl(params, task_tracker, delayed_task_manager)); 293 new SchedulerWorkerPoolImpl(params, task_tracker, delayed_task_manager));
289 if (worker_pool->Initialize(params, re_enqueue_sequence_callback)) 294 if (worker_pool->Initialize(params, re_enqueue_sequence_callback))
290 return worker_pool; 295 return worker_pool;
291 return nullptr; 296 return nullptr;
292 } 297 }
293 298
299 // static
300 std::unique_ptr<SchedulerWorkerPoolImpl>
301 SchedulerWorkerPoolImpl::CreateSingleThreadWorkerPool(
fdoray 2017/01/27 16:47:35 Forward Create() and CreateSingleThreadWorkerPool(
robliao 2017/01/27 21:25:41 I wasn't convinced that it would make things clear
302 const SchedulerWorkerPoolParams& params,
303 UnregisterSingleThreadWorkerPoolCallback
304 unregister_single_thread_worker_pool_callback,
305 TaskTracker* task_tracker,
306 DelayedTaskManager* delayed_task_manager) {
307 DCHECK_EQ(params.max_threads(), 1U);
308 auto worker_pool = WrapUnique(new SchedulerWorkerPoolImpl(
309 params, task_tracker, delayed_task_manager,
310 unregister_single_thread_worker_pool_callback));
311 if (worker_pool->Initialize(params, ReEnqueueSequenceCallback())) {
fdoray 2017/01/27 16:47:35 no braces
robliao 2017/01/27 21:25:41 Done.
312 return worker_pool;
313 }
314 return nullptr;
315 }
316
294 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( 317 scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits(
295 const TaskTraits& traits) { 318 const TaskTraits& traits) {
296 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this)); 319 return make_scoped_refptr(new SchedulerParallelTaskRunner(traits, this));
297 } 320 }
298 321
299 scoped_refptr<SequencedTaskRunner> 322 scoped_refptr<SequencedTaskRunner>
300 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits( 323 SchedulerWorkerPoolImpl::CreateSequencedTaskRunnerWithTraits(
301 const TaskTraits& traits) { 324 const TaskTraits& traits) {
302 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this)); 325 return make_scoped_refptr(new SchedulerSequencedTaskRunner(traits, this));
303 } 326 }
304 327
305 scoped_refptr<SingleThreadTaskRunner> 328 scoped_refptr<SingleThreadTaskRunner>
306 SchedulerWorkerPoolImpl::CreateSingleThreadTaskRunnerWithTraits( 329 SchedulerWorkerPoolImpl::CreateSingleThreadTaskRunnerWithTraits(
307 const TaskTraits& traits) { 330 const TaskTraits& traits) {
331 DCHECK(!unregister_single_thread_worker_pool_callback_.is_null());
fdoray 2017/01/27 16:47:35 Tip: You can write DCHECK(unregister_single_thread
robliao 2017/01/27 21:25:41 Nice! Done.
308 // TODO(fdoray): Find a way to take load into account when assigning a 332 // TODO(fdoray): Find a way to take load into account when assigning a
309 // SchedulerWorker to a SingleThreadTaskRunner. 333 // SchedulerWorker to a SingleThreadTaskRunner.
310 size_t worker_index; 334 size_t worker_index;
311 { 335 {
312 AutoSchedulerLock auto_lock(next_worker_index_lock_); 336 AutoSchedulerLock auto_lock(next_worker_index_lock_);
313 worker_index = next_worker_index_; 337 worker_index = next_worker_index_;
314 next_worker_index_ = (next_worker_index_ + 1) % workers_.size(); 338 next_worker_index_ = (next_worker_index_ + 1) % workers_.size();
315 } 339 }
316 return make_scoped_refptr(new SchedulerSingleThreadTaskRunner( 340 return make_scoped_refptr(new SchedulerSingleThreadTaskRunner(
317 traits, this, workers_[worker_index].get())); 341 traits, this, workers_[worker_index].get(),
342 unregister_single_thread_worker_pool_callback_));
318 } 343 }
319 344
320 void SchedulerWorkerPoolImpl::ReEnqueueSequence( 345 void SchedulerWorkerPoolImpl::ReEnqueueSequence(
321 scoped_refptr<Sequence> sequence, 346 scoped_refptr<Sequence> sequence,
322 const SequenceSortKey& sequence_sort_key) { 347 const SequenceSortKey& sequence_sort_key) {
323 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), 348 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
324 sequence_sort_key); 349 sequence_sort_key);
325 350
326 // The thread calling this method just ran a Task from |sequence| and will 351 // The thread calling this method just ran a Task from |sequence| and will
327 // soon try to get another Sequence from which to run a Task. If the thread 352 // soon try to get another Sequence from which to run a Task. If the thread
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
426 size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() { 451 size_t SchedulerWorkerPoolImpl::NumberOfAliveWorkersForTesting() {
427 size_t num_alive_workers = 0; 452 size_t num_alive_workers = 0;
428 for (const auto& worker : workers_) { 453 for (const auto& worker : workers_) {
429 if (worker->ThreadAliveForTesting()) 454 if (worker->ThreadAliveForTesting())
430 ++num_alive_workers; 455 ++num_alive_workers;
431 } 456 }
432 return num_alive_workers; 457 return num_alive_workers;
433 } 458 }
434 459
435 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: 460 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner::
436 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, 461 SchedulerSingleThreadTaskRunner(
437 SchedulerWorkerPool* worker_pool, 462 const TaskTraits& traits,
438 SchedulerWorker* worker) 463 SchedulerWorkerPool* worker_pool,
464 SchedulerWorker* worker,
465 UnregisterSingleThreadWorkerPoolCallback
466 unregister_single_thread_worker_pool_callback)
439 : traits_(traits), 467 : traits_(traits),
440 worker_pool_(worker_pool), 468 worker_pool_(worker_pool),
441 worker_(worker) { 469 worker_(worker),
470 unregister_single_thread_worker_pool_callback_(
471 unregister_single_thread_worker_pool_callback) {
442 DCHECK(worker_pool_); 472 DCHECK(worker_pool_);
443 DCHECK(worker_); 473 DCHECK(worker_);
474 DCHECK(!unregister_single_thread_worker_pool_callback.is_null());
444 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> 475 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())->
445 RegisterSingleThreadTaskRunner(); 476 RegisterSingleThreadTaskRunner();
446 } 477 }
447 478
448 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: 479 SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner::
449 ~SchedulerSingleThreadTaskRunner() { 480 ~SchedulerSingleThreadTaskRunner() {
450 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> 481 static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())->
451 UnregisterSingleThreadTaskRunner(); 482 UnregisterSingleThreadTaskRunner();
483 unregister_single_thread_worker_pool_callback_.Run(
484 static_cast<SchedulerWorkerPoolImpl*>(worker_pool_));
452 } 485 }
453 486
454 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: 487 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
455 SchedulerWorkerDelegateImpl( 488 SchedulerWorkerDelegateImpl(
456 SchedulerWorkerPoolImpl* outer, 489 SchedulerWorkerPoolImpl* outer,
457 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, 490 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
458 const PriorityQueue* shared_priority_queue, 491 const PriorityQueue* shared_priority_queue,
459 int index) 492 int index)
460 : outer_(outer), 493 : outer_(outer),
461 re_enqueue_sequence_callback_(re_enqueue_sequence_callback), 494 re_enqueue_sequence_callback_(re_enqueue_sequence_callback),
(...skipping 137 matching lines...) Expand 10 before | Expand all | Expand 10 after
599 632
600 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( 633 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach(
601 SchedulerWorker* worker) { 634 SchedulerWorker* worker) {
602 // It's not an issue if |num_single_threaded_runners_| is incremented after 635 // It's not an issue if |num_single_threaded_runners_| is incremented after
603 // this because the newly created SingleThreadTaskRunner (from which no task 636 // this because the newly created SingleThreadTaskRunner (from which no task
604 // has run yet) will simply run all its tasks on the next physical thread 637 // has run yet) will simply run all its tasks on the next physical thread
605 // created by the worker. 638 // created by the worker.
606 const bool can_detach = 639 const bool can_detach =
607 !idle_start_time_.is_null() && 640 !idle_start_time_.is_null() &&
608 (TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ && 641 (TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ &&
609 worker != outer_->PeekAtIdleWorkersStack() && 642 (worker != outer_->PeekAtIdleWorkersStack() ||
643 outer_->workers_.size() == 1) && // Single Thread Worker Pools
fdoray 2017/01/27 16:47:35 Should be ... // In a pool which runs sequenced a
robliao 2017/01/27 21:25:41 Good point. Done.
610 !subtle::NoBarrier_Load(&num_single_threaded_runners_) && 644 !subtle::NoBarrier_Load(&num_single_threaded_runners_) &&
611 outer_->CanWorkerDetachForTesting(); 645 outer_->CanWorkerDetachForTesting();
612 return can_detach; 646 return can_detach;
613 } 647 }
614 648
615 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() { 649 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnDetach() {
616 DCHECK(!did_detach_since_last_get_work_); 650 DCHECK(!did_detach_since_last_get_work_);
617 outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_); 651 outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_);
618 num_tasks_since_last_detach_ = 0; 652 num_tasks_since_last_detach_ = 0;
619 did_detach_since_last_get_work_ = true; 653 did_detach_since_last_get_work_ = true;
620 last_detach_time_ = TimeTicks::Now(); 654 last_detach_time_ = TimeTicks::Now();
621 } 655 }
622 656
623 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( 657 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
624 const SchedulerWorkerPoolParams& params, 658 const SchedulerWorkerPoolParams& params,
625 TaskTracker* task_tracker, 659 TaskTracker* task_tracker,
626 DelayedTaskManager* delayed_task_manager) 660 DelayedTaskManager* delayed_task_manager)
661 : SchedulerWorkerPoolImpl(params,
662 task_tracker,
663 delayed_task_manager,
664 UnregisterSingleThreadWorkerPoolCallback()) {}
665
666 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
667 const SchedulerWorkerPoolParams& params,
668 TaskTracker* task_tracker,
669 DelayedTaskManager* delayed_task_manager,
670 UnregisterSingleThreadWorkerPoolCallback
671 unregister_single_thread_worker_pool_callback)
627 : name_(params.name()), 672 : name_(params.name()),
628 suggested_reclaim_time_(params.suggested_reclaim_time()), 673 suggested_reclaim_time_(params.suggested_reclaim_time()),
629 idle_workers_stack_lock_(shared_priority_queue_.container_lock()), 674 idle_workers_stack_lock_(shared_priority_queue_.container_lock()),
630 idle_workers_stack_cv_for_testing_( 675 idle_workers_stack_cv_for_testing_(
631 idle_workers_stack_lock_.CreateConditionVariable()), 676 idle_workers_stack_lock_.CreateConditionVariable()),
632 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, 677 join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL,
633 WaitableEvent::InitialState::NOT_SIGNALED), 678 WaitableEvent::InitialState::NOT_SIGNALED),
634 #if DCHECK_IS_ON() 679 #if DCHECK_IS_ON()
635 workers_created_(WaitableEvent::ResetPolicy::MANUAL, 680 workers_created_(WaitableEvent::ResetPolicy::MANUAL,
636 WaitableEvent::InitialState::NOT_SIGNALED), 681 WaitableEvent::InitialState::NOT_SIGNALED),
(...skipping 18 matching lines...) Expand all
655 // expected to run between zero and a few tens of tasks between waits. 700 // expected to run between zero and a few tens of tasks between waits.
656 // When it runs more than 100 tasks, there is no need to know the exact 701 // When it runs more than 100 tasks, there is no need to know the exact
657 // number of tasks that ran. 702 // number of tasks that ran.
658 num_tasks_between_waits_histogram_(Histogram::FactoryGet( 703 num_tasks_between_waits_histogram_(Histogram::FactoryGet(
659 kNumTasksBetweenWaitsHistogramPrefix + name_ + kPoolNameSuffix, 704 kNumTasksBetweenWaitsHistogramPrefix + name_ + kPoolNameSuffix,
660 1, 705 1,
661 100, 706 100,
662 50, 707 50,
663 HistogramBase::kUmaTargetedHistogramFlag)), 708 HistogramBase::kUmaTargetedHistogramFlag)),
664 task_tracker_(task_tracker), 709 task_tracker_(task_tracker),
665 delayed_task_manager_(delayed_task_manager) { 710 delayed_task_manager_(delayed_task_manager),
711 unregister_single_thread_worker_pool_callback_(
712 unregister_single_thread_worker_pool_callback) {
666 DCHECK(task_tracker_); 713 DCHECK(task_tracker_);
667 DCHECK(delayed_task_manager_); 714 DCHECK(delayed_task_manager_);
668 } 715 }
669 716
670 bool SchedulerWorkerPoolImpl::Initialize( 717 bool SchedulerWorkerPoolImpl::Initialize(
671 const SchedulerWorkerPoolParams& params, 718 const SchedulerWorkerPoolParams& params,
672 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { 719 const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) {
673 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); 720 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
674 721
675 DCHECK(workers_.empty()); 722 DCHECK(workers_.empty());
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
748 AutoSchedulerLock auto_lock(idle_workers_stack_lock_); 795 AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
749 idle_workers_stack_.Remove(worker); 796 idle_workers_stack_.Remove(worker);
750 } 797 }
751 798
752 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { 799 bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() {
753 return !worker_detachment_disallowed_.IsSet(); 800 return !worker_detachment_disallowed_.IsSet();
754 } 801 }
755 802
756 } // namespace internal 803 } // namespace internal
757 } // namespace base 804 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698