Chromium Code Reviews| Index: base/task_scheduler/scheduler_worker_pool_impl.cc |
| diff --git a/base/task_scheduler/scheduler_worker_pool_impl.cc b/base/task_scheduler/scheduler_worker_pool_impl.cc |
| index 9f521e229cabb9dd7c52abcc7bc92be9d7b9aa69..93de94913af0b05ac9ea0f64764b116c8ab7a452 100644 |
| --- a/base/task_scheduler/scheduler_worker_pool_impl.cc |
| +++ b/base/task_scheduler/scheduler_worker_pool_impl.cc |
| @@ -9,6 +9,7 @@ |
| #include <algorithm> |
| #include <utility> |
| +#include "base/atomicops.h" |
| #include "base/bind.h" |
| #include "base/bind_helpers.h" |
| #include "base/lazy_instance.h" |
| @@ -21,6 +22,7 @@ |
| #include "base/threading/platform_thread.h" |
| #include "base/threading/thread_local.h" |
| #include "base/threading/thread_restrictions.h" |
| +#include "base/time/time.h" |
| namespace base { |
| namespace internal { |
| @@ -43,7 +45,9 @@ class SchedulerParallelTaskRunner : public TaskRunner { |
| // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
| SchedulerParallelTaskRunner(const TaskTraits& traits, |
| SchedulerWorkerPool* worker_pool) |
| - : traits_(traits), worker_pool_(worker_pool) {} |
| + : traits_(traits), worker_pool_(worker_pool) { |
| + DCHECK(worker_pool_); |
| + } |
| // TaskRunner: |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| @@ -76,7 +80,9 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
| // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
| SchedulerSequencedTaskRunner(const TaskTraits& traits, |
| SchedulerWorkerPool* worker_pool) |
| - : traits_(traits), worker_pool_(worker_pool) {} |
| + : traits_(traits), worker_pool_(worker_pool) { |
| + DCHECK(worker_pool_); |
| + } |
| // SequencedTaskRunner: |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| @@ -113,8 +119,22 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
| DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
| }; |
| +// Only used in DCHECKs. |
| +bool ContainsWorker( |
| + const std::vector<std::unique_ptr<SchedulerWorker>>& workers, |
| + const SchedulerWorker* worker) { |
| + auto it = std::find_if(workers.begin(), workers.end(), |
| + [worker](const std::unique_ptr<SchedulerWorker>& i) { |
| + return i.get() == worker; |
| + }); |
| + return it != workers.end(); |
| +} |
| + |
| +} // namespace |
| + |
| // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. |
| -class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
| +class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner : |
| + public SingleThreadTaskRunner { |
| public: |
| // Constructs a SchedulerSingleThreadTaskRunner which can be used to post |
| // tasks so long as |worker_pool| and |worker| are alive. |
| @@ -122,10 +142,7 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
| // and |worker|. |
| SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
| SchedulerWorkerPool* worker_pool, |
| - SchedulerWorker* worker) |
| - : traits_(traits), |
| - worker_pool_(worker_pool), |
| - worker_(worker) {} |
| + SchedulerWorker* worker); |
| // SingleThreadTaskRunner: |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| @@ -151,7 +168,7 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
| } |
| private: |
| - ~SchedulerSingleThreadTaskRunner() override = default; |
| + ~SchedulerSingleThreadTaskRunner() override; |
| // Sequence for all Tasks posted through this TaskRunner. |
| const scoped_refptr<Sequence> sequence_ = new Sequence; |
| @@ -163,19 +180,6 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
| DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
| }; |
| -// Only used in DCHECKs. |
| -bool ContainsWorker( |
| - const std::vector<std::unique_ptr<SchedulerWorker>>& workers, |
| - const SchedulerWorker* worker) { |
| - auto it = std::find_if(workers.begin(), workers.end(), |
| - [worker](const std::unique_ptr<SchedulerWorker>& i) { |
| - return i.get() == worker; |
| - }); |
| - return it != workers.end(); |
| -} |
| - |
| -} // namespace |
| - |
| class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| : public SchedulerWorker::Delegate { |
| public: |
| @@ -203,6 +207,14 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| TimeDelta GetSleepTimeout() override; |
| bool CanDetach(SchedulerWorker* worker) override; |
| + void RegisterSingleThreadTaskRunner() { |
| + subtle::Barrier_AtomicIncrement(&num_single_threaded_runners_, 1); |
| + } |
| + |
| + void UnregisterSingleThreadTaskRunner() { |
| + subtle::Barrier_AtomicIncrement(&num_single_threaded_runners_, -1); |
| + } |
| + |
| private: |
| SchedulerWorkerPoolImpl* outer_; |
| const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
| @@ -214,6 +226,11 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| // |single_threaded_priority_queue_|. |
| bool last_sequence_is_single_threaded_ = false; |
| + // Time when GetWork() first returned nullptr. |
| + Time idle_start_time_; |
| + |
| + subtle::Atomic32 num_single_threaded_runners_ = 0; |
| + |
| const int index_; |
| DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
| @@ -234,6 +251,7 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( |
| std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool( |
| new SchedulerWorkerPoolImpl(params.name(), |
| params.io_restriction(), |
| + params.suggested_reclaim_time(), |
| task_tracker, delayed_task_manager)); |
| if (worker_pool->Initialize(params.thread_priority(), |
| params.max_threads(), |
| @@ -250,6 +268,8 @@ void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { |
| } |
| void SchedulerWorkerPoolImpl::JoinForTesting() { |
| + DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) << |
| + "Workers can detach during join."; |
| for (const auto& worker : workers_) |
| worker->JoinForTesting(); |
| @@ -257,6 +277,11 @@ void SchedulerWorkerPoolImpl::JoinForTesting() { |
| join_for_testing_returned_.Signal(); |
| } |
| +void SchedulerWorkerPoolImpl::DisallowWorkerDetachmentForTesting() { |
| + AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_); |
| + worker_detachment_allowed_ = false; |
| +} |
| + |
| scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( |
| const TaskTraits& traits, |
| ExecutionMode execution_mode) { |
| @@ -367,6 +392,25 @@ void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( |
| } |
| } |
| +SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: |
| + SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
| + SchedulerWorkerPool* worker_pool, |
| + SchedulerWorker* worker) |
| + : traits_(traits), |
| + worker_pool_(worker_pool), |
| + worker_(worker) { |
| + DCHECK(worker_pool_); |
| + DCHECK(worker_); |
| + static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
| + RegisterSingleThreadTaskRunner(); |
| +} |
| + |
| +SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: |
| + ~SchedulerSingleThreadTaskRunner() { |
| + static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
| + UnregisterSingleThreadTaskRunner(); |
| +} |
| + |
| SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| SchedulerWorkerDelegateImpl( |
| SchedulerWorkerPoolImpl* outer, |
| @@ -398,6 +442,9 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( |
| tls_current_worker.Get().Set(worker); |
| tls_current_worker_pool.Get().Set(outer_); |
| + // New threads haven't run GetWork() yet, so reset the idle_start_time_. |
| + idle_start_time_ = Time(); |
| + |
| ThreadRestrictions::SetIOAllowed( |
| outer_->io_restriction_ == |
| SchedulerWorkerPoolParams::IORestriction::ALLOWED); |
| @@ -432,6 +479,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( |
| // 4. This thread adds itself to |idle_workers_stack_| and goes to sleep. |
| // No thread runs the Sequence inserted in step 2. |
| outer_->AddToIdleWorkersStack(worker); |
| + if (idle_start_time_.is_null()) |
| + idle_start_time_ = Time::Now(); |
| return nullptr; |
| } |
| @@ -455,6 +504,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( |
| } |
| DCHECK(sequence); |
| + idle_start_time_ = Time(); |
| + |
| outer_->RemoveFromIdleWorkersStack(worker); |
| return sequence; |
| } |
| @@ -476,21 +527,33 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| GetSleepTimeout() { |
| - return TimeDelta::Max(); |
| + return outer_->suggested_reclaim_time_; |
| } |
| bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
| SchedulerWorker* worker) { |
| - return false; |
| + // It's not an issue if |num_single_threaded_runners_| is incremented after |
| + // this because the newly created TaskRunner (from which no task has run yet) |
| + // will simply run all its tasks on the next physical thread created by the |
| + // worker. |
| + const bool can_detach = |
| + !idle_start_time_.is_null() && |
| + (Time::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ && |
|
gab
2016/07/21 13:43:22
Use TimeTicks? IIRC they're more reliable (and may
fdoray
2016/07/21 13:59:08
Yes, use TimeTicks.
Time = use it to get an absol
robliao
2016/07/21 18:44:23
Done.
|
| + worker != outer_->PeekAtIdleWorkersStack() && |
| + !subtle::NoBarrier_Load(&num_single_threaded_runners_) && |
| + outer_->CanWorkerDetachForTesting(); |
| + return can_detach; |
| } |
| SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
| StringPiece name, |
| SchedulerWorkerPoolParams::IORestriction io_restriction, |
| + const TimeDelta& suggested_reclaim_time, |
| TaskTracker* task_tracker, |
| DelayedTaskManager* delayed_task_manager) |
| : name_(name.as_string()), |
| io_restriction_(io_restriction), |
| + suggested_reclaim_time_(suggested_reclaim_time), |
| idle_workers_stack_lock_(shared_priority_queue_.container_lock()), |
| idle_workers_stack_cv_for_testing_( |
| idle_workers_stack_lock_.CreateConditionVariable()), |
| @@ -521,7 +584,9 @@ bool SchedulerWorkerPoolImpl::Initialize( |
| this, re_enqueue_sequence_callback, |
| &shared_priority_queue_, static_cast<int>(i))), |
| task_tracker_, |
| - SchedulerWorker::InitialState::ALIVE); |
| + i == 0 |
| + ? SchedulerWorker::InitialState::ALIVE |
| + : SchedulerWorker::InitialState::DETACHED); |
| if (!worker) |
| break; |
| idle_workers_stack_.Push(worker.get()); |
| @@ -548,18 +613,34 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
| void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( |
| SchedulerWorker* worker) { |
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| - idle_workers_stack_.Push(worker); |
| + // Detachment may cause multiple attempts to add because the delegate cannot |
| + // determine who woke it up. As a result, when it wakes up, it may conclude |
| + // there's no work to be done and attempt to add itself to the idle stack |
| + // again. |
| + if (!idle_workers_stack_.Contains(worker)) |
| + idle_workers_stack_.Push(worker); |
| + |
| DCHECK_LE(idle_workers_stack_.Size(), workers_.size()); |
| if (idle_workers_stack_.Size() == workers_.size()) |
| idle_workers_stack_cv_for_testing_->Broadcast(); |
| } |
| +const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const { |
| + AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| + return idle_workers_stack_.Peek(); |
| +} |
| + |
| void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack( |
| SchedulerWorker* worker) { |
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| idle_workers_stack_.Remove(worker); |
| } |
| +bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { |
| + AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_); |
| + return worker_detachment_allowed_; |
| +} |
| + |
| } // namespace internal |
| } // namespace base |