Chromium Code Reviews| Index: base/task_scheduler/scheduler_worker_pool_impl.cc |
| diff --git a/base/task_scheduler/scheduler_worker_pool_impl.cc b/base/task_scheduler/scheduler_worker_pool_impl.cc |
| index 06933eb32db62386195ba3619fd71c03914346fd..954bdfab3b905e2a6653fe5db7ecaacd4247fba3 100644 |
| --- a/base/task_scheduler/scheduler_worker_pool_impl.cc |
| +++ b/base/task_scheduler/scheduler_worker_pool_impl.cc |
| @@ -9,6 +9,7 @@ |
| #include <algorithm> |
| #include <utility> |
| +#include "base/atomicops.h" |
| #include "base/bind.h" |
| #include "base/bind_helpers.h" |
| #include "base/lazy_instance.h" |
| @@ -21,6 +22,7 @@ |
| #include "base/threading/platform_thread.h" |
| #include "base/threading/thread_local.h" |
| #include "base/threading/thread_restrictions.h" |
| +#include "base/time/time.h" |
| namespace base { |
| namespace internal { |
| @@ -43,7 +45,9 @@ class SchedulerParallelTaskRunner : public TaskRunner { |
| // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
| SchedulerParallelTaskRunner(const TaskTraits& traits, |
| SchedulerWorkerPool* worker_pool) |
| - : traits_(traits), worker_pool_(worker_pool) {} |
| + : traits_(traits), worker_pool_(worker_pool) { |
| + DCHECK(worker_pool_); |
| + } |
| // TaskRunner: |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| @@ -76,7 +80,9 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
| // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
| SchedulerSequencedTaskRunner(const TaskTraits& traits, |
| SchedulerWorkerPool* worker_pool) |
| - : traits_(traits), worker_pool_(worker_pool) {} |
| + : traits_(traits), worker_pool_(worker_pool) { |
| + DCHECK(worker_pool_); |
| + } |
| // SequencedTaskRunner: |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| @@ -121,11 +127,15 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
| // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| |
| // and |worker|. |
| SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
| - SchedulerWorkerPool* worker_pool, |
| + SchedulerWorkerPoolImpl* worker_pool, |
| SchedulerWorker* worker) |
| : traits_(traits), |
| worker_pool_(worker_pool), |
| - worker_(worker) {} |
| + worker_(worker) { |
| + DCHECK(worker_pool_); |
| + DCHECK(worker_); |
| + worker_pool_->RegisterSingleThreadTaskRunner(worker_); |
| + } |
| // SingleThreadTaskRunner: |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| @@ -151,13 +161,15 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
| } |
| private: |
| - ~SchedulerSingleThreadTaskRunner() override = default; |
| + ~SchedulerSingleThreadTaskRunner() override { |
| + worker_pool_->UnregisterSingleThreadTaskRunner(worker_); |
| + } |
| // Sequence for all Tasks posted through this TaskRunner. |
| const scoped_refptr<Sequence> sequence_ = new Sequence; |
| const TaskTraits traits_; |
| - SchedulerWorkerPool* const worker_pool_; |
| + SchedulerWorkerPoolImpl* const worker_pool_; |
| SchedulerWorker* const worker_; |
| DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
| @@ -187,6 +199,7 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| // the pool name to label the underlying worker threads. |
| SchedulerWorkerDelegateImpl( |
| SchedulerWorkerPoolImpl* outer, |
| + const TimeDelta& suggested_reclaim_time, |
| const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| const PriorityQueue* shared_priority_queue, |
| int index); |
| @@ -203,8 +216,17 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| TimeDelta GetSleepTimeout() override; |
| bool CanDetach(SchedulerWorker* worker) override; |
| + void RegisterSingleThreadTaskRunner() { |
| + subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, 1); |
| + } |
| + |
| + void UnregisterSingleThreadTaskRunner() { |
| + subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, -1); |
| + } |
| + |
| private: |
| SchedulerWorkerPoolImpl* outer_; |
| + const TimeDelta suggested_reclaim_time_; |
| const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
| // Single-threaded PriorityQueue for the worker. |
| @@ -214,6 +236,8 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| // |single_threaded_priority_queue_|. |
| bool last_sequence_is_single_threaded_ = false; |
| + subtle::Atomic32 num_single_threaded_runners_ = 0; |
| + |
| const int index_; |
| DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
| @@ -231,6 +255,7 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( |
| ThreadPriority thread_priority, |
| size_t max_threads, |
| IORestriction io_restriction, |
| + const TimeDelta& suggested_reclaim_time, |
| const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| TaskTracker* task_tracker, |
| DelayedTaskManager* delayed_task_manager) { |
| @@ -238,6 +263,7 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( |
| new SchedulerWorkerPoolImpl(name, io_restriction, task_tracker, |
| delayed_task_manager)); |
| if (worker_pool->Initialize(thread_priority, max_threads, |
| + suggested_reclaim_time, |
| re_enqueue_sequence_callback)) { |
| return worker_pool; |
| } |
| @@ -251,6 +277,10 @@ void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { |
| } |
| void SchedulerWorkerPoolImpl::JoinForTesting() { |
| + { |
| + AutoSchedulerLock auto_lock(join_for_testing_called_lock_); |
| + join_for_testing_called_ = true; |
| + } |
| for (const auto& worker : workers_) |
| worker->JoinForTesting(); |
| @@ -368,13 +398,29 @@ void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( |
| } |
| } |
| +void SchedulerWorkerPoolImpl::RegisterSingleThreadTaskRunner( |
| + SchedulerWorker* worker) { |
| + auto delegate = |
| + static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate()); |
| + delegate->RegisterSingleThreadTaskRunner(); |
| +} |
| + |
| +void SchedulerWorkerPoolImpl::UnregisterSingleThreadTaskRunner( |
| + SchedulerWorker* worker) { |
| + auto delegate = |
| + static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate()); |
| + delegate->UnregisterSingleThreadTaskRunner(); |
| +} |
| + |
| SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| SchedulerWorkerDelegateImpl( |
| SchedulerWorkerPoolImpl* outer, |
| + const TimeDelta& suggested_reclaim_time, |
| const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| const PriorityQueue* shared_priority_queue, |
| int index) |
| : outer_(outer), |
| + suggested_reclaim_time_(suggested_reclaim_time), |
| re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
| single_threaded_priority_queue_(shared_priority_queue), |
| index_(index) {} |
| @@ -476,12 +522,14 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| GetSleepTimeout() { |
| - return TimeDelta::Max(); |
| + return suggested_reclaim_time_; |
|
fdoray
2016/07/04 19:41:32
From scheduler_worker.cc:
while (...) {
scoped_
robliao
2016/07/07 17:49:16
Nice catch! Nothing at the moment. Fixed with an i
|
| } |
| bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
| SchedulerWorker* worker) { |
| - return false; |
| + return worker != outer_->PeekAtIdleWorkersStack() && |
| + !subtle::Release_Load(&num_single_threaded_runners_) && |
|
fdoray
2016/07/04 19:41:32
Add comment explaining why it doesn't matter if |n
gab
2016/07/07 15:58:30
Why do you need Release_Load() here?
robliao
2016/07/07 17:49:16
Done.
gab
2016/07/13 18:36:31
Hmmm, I don't think this matters, idle thread acco
robliao
2016/07/13 20:19:46
In either case, it's the safest thing to do here.
gab
2016/07/13 21:07:55
I disagree. It's not safer. It's equivalent (plus
robliao
2016/07/19 22:03:47
In light of the atomic ops discussion, any strong
gab
2016/07/20 01:45:21
From the conclusion we derived, I think the inc/de
robliao
2016/07/20 19:44:00
I've my understanding is right, the atomic ops are
gab
2016/07/20 20:26:31
We have different understandings on a few things (
robliao
2016/07/20 22:18:03
data types (the whole point of atomic RMW is to ma
gab
2016/07/21 13:43:21
I'm not convinced this is true. The RMW will add a
robliao
2016/07/21 18:44:23
Digging into this deeper, the RMW will happen atom
gab
2016/07/21 21:24:24
Refcount is different, no one ever wants to just "
robliao
2016/07/22 16:44:06
Fenced the load.
|
| + !outer_->HasJoinedForTesting(); |
| } |
| SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
| @@ -496,6 +544,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
| idle_workers_stack_lock_.CreateConditionVariable()), |
| join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, |
| WaitableEvent::InitialState::NOT_SIGNALED), |
| + join_for_testing_called_(false), |
| #if DCHECK_IS_ON() |
| workers_created_(WaitableEvent::ResetPolicy::MANUAL, |
| WaitableEvent::InitialState::NOT_SIGNALED), |
| @@ -509,6 +558,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
| bool SchedulerWorkerPoolImpl::Initialize( |
| ThreadPriority thread_priority, |
| size_t max_threads, |
| + const TimeDelta& suggested_reclaim_time, |
| const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { |
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| @@ -518,10 +568,13 @@ bool SchedulerWorkerPoolImpl::Initialize( |
| std::unique_ptr<SchedulerWorker> worker = |
| SchedulerWorker::Create( |
| thread_priority, WrapUnique(new SchedulerWorkerDelegateImpl( |
| - this, re_enqueue_sequence_callback, |
| + this, suggested_reclaim_time, |
| + re_enqueue_sequence_callback, |
| &shared_priority_queue_, static_cast<int>(i))), |
| task_tracker_, |
| - SchedulerWorker::InitialState::ALIVE); |
| + i == 0 |
| + ? SchedulerWorker::InitialState::ALIVE |
| + : SchedulerWorker::InitialState::DETACHED); |
| if (!worker) |
| break; |
| idle_workers_stack_.Push(worker.get()); |
| @@ -548,18 +601,34 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
| void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( |
| SchedulerWorker* worker) { |
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| - idle_workers_stack_.Push(worker); |
| + // Detachment may cause multiple attempts to add because the delegate cannot |
| + // determine who woke it up. As a result, when it wakes up, it may conclude |
| + // there's no work to be done and attempt to add itself to the idle stack |
| + // again. |
| + if (!idle_workers_stack_.Contains(worker)) |
|
fdoray
2016/07/04 19:41:32
A detached worker is only re-created when its Wake
robliao
2016/07/07 17:49:16
Prior to this change, GetWork had two precondition
fdoray
2016/07/07 20:45:56
You're right. Thanks!
|
| + idle_workers_stack_.Push(worker); |
| + |
| DCHECK_LE(idle_workers_stack_.Size(), workers_.size()); |
| if (idle_workers_stack_.Size() == workers_.size()) |
| idle_workers_stack_cv_for_testing_->Broadcast(); |
| } |
| +const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const { |
| + AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| + return idle_workers_stack_.Peek(); |
| +} |
| + |
| void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack( |
| SchedulerWorker* worker) { |
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| idle_workers_stack_.Remove(worker); |
| } |
| +bool SchedulerWorkerPoolImpl::HasJoinedForTesting() { |
| + AutoSchedulerLock auto_lock(join_for_testing_called_lock_); |
| + return join_for_testing_called_; |
| +} |
| + |
| } // namespace internal |
| } // namespace base |