Chromium Code Reviews| Index: base/task_scheduler/scheduler_worker_pool_impl.cc |
| diff --git a/base/task_scheduler/scheduler_worker_pool_impl.cc b/base/task_scheduler/scheduler_worker_pool_impl.cc |
| index 9f521e229cabb9dd7c52abcc7bc92be9d7b9aa69..17529abed518d734faac2d697c8218d19bb69057 100644 |
| --- a/base/task_scheduler/scheduler_worker_pool_impl.cc |
| +++ b/base/task_scheduler/scheduler_worker_pool_impl.cc |
| @@ -9,6 +9,7 @@ |
| #include <algorithm> |
| #include <utility> |
| +#include "base/atomicops.h" |
| #include "base/bind.h" |
| #include "base/bind_helpers.h" |
| #include "base/lazy_instance.h" |
| @@ -21,6 +22,7 @@ |
| #include "base/threading/platform_thread.h" |
| #include "base/threading/thread_local.h" |
| #include "base/threading/thread_restrictions.h" |
| +#include "base/time/time.h" |
| namespace base { |
| namespace internal { |
| @@ -43,7 +45,9 @@ class SchedulerParallelTaskRunner : public TaskRunner { |
| // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
| SchedulerParallelTaskRunner(const TaskTraits& traits, |
| SchedulerWorkerPool* worker_pool) |
| - : traits_(traits), worker_pool_(worker_pool) {} |
| + : traits_(traits), worker_pool_(worker_pool) { |
| + DCHECK(worker_pool_); |
| + } |
| // TaskRunner: |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| @@ -76,7 +80,9 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
| // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
| SchedulerSequencedTaskRunner(const TaskTraits& traits, |
| SchedulerWorkerPool* worker_pool) |
| - : traits_(traits), worker_pool_(worker_pool) {} |
| + : traits_(traits), worker_pool_(worker_pool) { |
| + DCHECK(worker_pool_); |
| + } |
| // SequencedTaskRunner: |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| @@ -113,19 +119,30 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
| DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
| }; |
| +// Only used in DCHECKs. |
| +bool ContainsWorker( |
| + const std::vector<std::unique_ptr<SchedulerWorker>>& workers, |
| + const SchedulerWorker* worker) { |
| + auto it = std::find_if(workers.begin(), workers.end(), |
| + [worker](const std::unique_ptr<SchedulerWorker>& i) { |
| + return i.get() == worker; |
| + }); |
| + return it != workers.end(); |
| +} |
| + |
| +} // namespace |
| + |
| // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. |
| -class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
| +class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner : |
| + public SingleThreadTaskRunner { |
| public: |
| // Constructs a SchedulerSingleThreadTaskRunner which can be used to post |
| // tasks so long as |worker_pool| and |worker| are alive. |
| // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| |
| // and |worker|. |
| SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
| - SchedulerWorkerPool* worker_pool, |
| - SchedulerWorker* worker) |
| - : traits_(traits), |
| - worker_pool_(worker_pool), |
| - worker_(worker) {} |
| + SchedulerWorkerPoolImpl* worker_pool, |
|
fdoray
2016/07/20 14:15:43
Can stay SchedulerWorkerPool*
robliao
2016/07/20 19:44:01
Done.
|
| + SchedulerWorker* worker); |
| // SingleThreadTaskRunner: |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| @@ -151,31 +168,18 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
| } |
| private: |
| - ~SchedulerSingleThreadTaskRunner() override = default; |
| + ~SchedulerSingleThreadTaskRunner() override; |
| // Sequence for all Tasks posted through this TaskRunner. |
| const scoped_refptr<Sequence> sequence_ = new Sequence; |
| const TaskTraits traits_; |
| - SchedulerWorkerPool* const worker_pool_; |
| + SchedulerWorkerPoolImpl* const worker_pool_; |
| SchedulerWorker* const worker_; |
| DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
| }; |
| -// Only used in DCHECKs. |
| -bool ContainsWorker( |
| - const std::vector<std::unique_ptr<SchedulerWorker>>& workers, |
| - const SchedulerWorker* worker) { |
| - auto it = std::find_if(workers.begin(), workers.end(), |
| - [worker](const std::unique_ptr<SchedulerWorker>& i) { |
| - return i.get() == worker; |
| - }); |
| - return it != workers.end(); |
| -} |
| - |
| -} // namespace |
| - |
| class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| : public SchedulerWorker::Delegate { |
| public: |
| @@ -203,6 +207,14 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| TimeDelta GetSleepTimeout() override; |
| bool CanDetach(SchedulerWorker* worker) override; |
| + void RegisterSingleThreadTaskRunner() { |
| + subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, 1); |
| + } |
| + |
| + void UnregisterSingleThreadTaskRunner() { |
| + subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, -1); |
| + } |
| + |
| private: |
| SchedulerWorkerPoolImpl* outer_; |
| const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
| @@ -214,6 +226,11 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| // |single_threaded_priority_queue_|. |
| bool last_sequence_is_single_threaded_ = false; |
| + // True if the worker performed an idle cycle. Workers start out idle. |
| + bool performed_idle_cycle_ = true; |
| + |
| + subtle::Atomic32 num_single_threaded_runners_ = 0; |
| + |
| const int index_; |
| DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
| @@ -234,6 +251,7 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( |
| std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool( |
| new SchedulerWorkerPoolImpl(params.name(), |
| params.io_restriction(), |
| + params.suggested_reclaim_time(), |
| task_tracker, delayed_task_manager)); |
| if (worker_pool->Initialize(params.thread_priority(), |
| params.max_threads(), |
| @@ -250,6 +268,8 @@ void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { |
| } |
| void SchedulerWorkerPoolImpl::JoinForTesting() { |
| + DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) << |
| + "Workers can detach during join."; |
| for (const auto& worker : workers_) |
| worker->JoinForTesting(); |
| @@ -257,6 +277,11 @@ void SchedulerWorkerPoolImpl::JoinForTesting() { |
| join_for_testing_returned_.Signal(); |
| } |
| +void SchedulerWorkerPoolImpl::DisallowWorkerDetachmentForTesting() { |
| + AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_); |
| + worker_detachment_allowed_ = false; |
| +} |
| + |
| scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( |
| const TaskTraits& traits, |
| ExecutionMode execution_mode) { |
| @@ -367,6 +392,25 @@ void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( |
| } |
| } |
| +SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: |
| + SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
| + SchedulerWorkerPoolImpl* worker_pool, |
| + SchedulerWorker* worker) |
| + : traits_(traits), |
| + worker_pool_(worker_pool), |
| + worker_(worker) { |
| + DCHECK(worker_pool_); |
| + DCHECK(worker_); |
| + static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
| + RegisterSingleThreadTaskRunner(); |
| +} |
| + |
| +SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: |
| + ~SchedulerSingleThreadTaskRunner() { |
| + static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
| + UnregisterSingleThreadTaskRunner(); |
| +} |
| + |
| SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| SchedulerWorkerDelegateImpl( |
| SchedulerWorkerPoolImpl* outer, |
| @@ -398,6 +442,9 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( |
| tls_current_worker.Get().Set(worker); |
| tls_current_worker_pool.Get().Set(outer_); |
| + // Workers start out idle, so this counts as an idle cycle. |
| + performed_idle_cycle_ = true; |
|
gab
2016/07/20 01:45:21
So if we create/wake one too many thread (i.e. 2 t
robliao
2016/07/20 19:44:01
Yes, if we wake one too many threads with no work,
gab
2016/07/20 20:26:31
It's not necessarily a bug. It can today under a s
robliao
2016/07/20 22:18:03
Fair enough. I've moved this to an idle_start_time
gab
2016/07/21 13:43:21
I like the new version very much, easier to read a
|
| + |
| ThreadRestrictions::SetIOAllowed( |
| outer_->io_restriction_ == |
| SchedulerWorkerPoolParams::IORestriction::ALLOWED); |
| @@ -455,6 +502,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( |
| } |
| DCHECK(sequence); |
| + performed_idle_cycle_ = false; |
| + |
| outer_->RemoveFromIdleWorkersStack(worker); |
| return sequence; |
| } |
| @@ -476,26 +525,43 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| GetSleepTimeout() { |
| - return TimeDelta::Max(); |
| + return outer_->suggested_reclaim_time_; |
| } |
| bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
| SchedulerWorker* worker) { |
| - return false; |
| + // It's not an issue if |num_single_threaded_runners_| is incremented after |
|
fdoray
2016/07/20 14:15:43
[...] after this because the newly created TaskRun
robliao
2016/07/20 19:44:00
Done.
|
| + // this because the next single-threaded task will simply pick up a new |
| + // physical thread. |
| + const bool can_detach = |
| + performed_idle_cycle_ && |
| + worker != outer_->PeekAtIdleWorkersStack() && |
| + !subtle::Release_Load(&num_single_threaded_runners_) && |
| + outer_->CanWorkerDetachForTesting(); |
| + // Mark that we've performed an idle cycle here. This helps distinguish |
| + // between a thread that had GetWork() return nullptr and a thread that woke |
| + // up and received nullptr again from GetWork(). We can't simply track if |
| + // we're idle because it would make detachment happen too early since |
| + // the worker calls CanDetach() immediately after GetWork() returns nullptr. |
| + performed_idle_cycle_ = true; |
| + return can_detach; |
| } |
| SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
| StringPiece name, |
| SchedulerWorkerPoolParams::IORestriction io_restriction, |
| + const TimeDelta& suggested_reclaim_time, |
| TaskTracker* task_tracker, |
| DelayedTaskManager* delayed_task_manager) |
| : name_(name.as_string()), |
| io_restriction_(io_restriction), |
| + suggested_reclaim_time_(suggested_reclaim_time), |
| idle_workers_stack_lock_(shared_priority_queue_.container_lock()), |
| idle_workers_stack_cv_for_testing_( |
| idle_workers_stack_lock_.CreateConditionVariable()), |
| join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, |
| WaitableEvent::InitialState::NOT_SIGNALED), |
| + worker_detachment_allowed_(true), |
|
gab
2016/07/20 01:45:21
(bool should be replaced by flag as mentioned in p
robliao
2016/07/20 19:44:00
Moved initialization.
|
| #if DCHECK_IS_ON() |
| workers_created_(WaitableEvent::ResetPolicy::MANUAL, |
| WaitableEvent::InitialState::NOT_SIGNALED), |
| @@ -521,7 +587,9 @@ bool SchedulerWorkerPoolImpl::Initialize( |
| this, re_enqueue_sequence_callback, |
| &shared_priority_queue_, static_cast<int>(i))), |
| task_tracker_, |
| - SchedulerWorker::InitialState::ALIVE); |
| + i == 0 |
| + ? SchedulerWorker::InitialState::ALIVE |
| + : SchedulerWorker::InitialState::DETACHED); |
| if (!worker) |
| break; |
| idle_workers_stack_.Push(worker.get()); |
| @@ -548,18 +616,34 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
| void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( |
| SchedulerWorker* worker) { |
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| - idle_workers_stack_.Push(worker); |
| + // Detachment may cause multiple attempts to add because the delegate cannot |
| + // determine who woke it up. As a result, when it wakes up, it may conclude |
| + // there's no work to be done and attempt to add itself to the idle stack |
| + // again. |
| + if (!idle_workers_stack_.Contains(worker)) |
| + idle_workers_stack_.Push(worker); |
| + |
| DCHECK_LE(idle_workers_stack_.Size(), workers_.size()); |
| if (idle_workers_stack_.Size() == workers_.size()) |
| idle_workers_stack_cv_for_testing_->Broadcast(); |
| } |
| +const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const { |
| + AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| + return idle_workers_stack_.Peek(); |
| +} |
| + |
| void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack( |
| SchedulerWorker* worker) { |
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| idle_workers_stack_.Remove(worker); |
| } |
| +bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { |
| + AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_); |
| + return worker_detachment_allowed_; |
| +} |
| + |
| } // namespace internal |
| } // namespace base |