Chromium Code Reviews| Index: base/task_scheduler/scheduler_worker_pool_impl.cc |
| diff --git a/base/task_scheduler/scheduler_worker_pool_impl.cc b/base/task_scheduler/scheduler_worker_pool_impl.cc |
| index 06933eb32db62386195ba3619fd71c03914346fd..6c1140ae39f9eecb7c32a15d12487cb571dee99d 100644 |
| --- a/base/task_scheduler/scheduler_worker_pool_impl.cc |
| +++ b/base/task_scheduler/scheduler_worker_pool_impl.cc |
| @@ -9,6 +9,7 @@ |
| #include <algorithm> |
| #include <utility> |
| +#include "base/atomicops.h" |
| #include "base/bind.h" |
| #include "base/bind_helpers.h" |
| #include "base/lazy_instance.h" |
| @@ -21,6 +22,7 @@ |
| #include "base/threading/platform_thread.h" |
| #include "base/threading/thread_local.h" |
| #include "base/threading/thread_restrictions.h" |
| +#include "base/time/time.h" |
| namespace base { |
| namespace internal { |
| @@ -43,7 +45,9 @@ class SchedulerParallelTaskRunner : public TaskRunner { |
| // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
| SchedulerParallelTaskRunner(const TaskTraits& traits, |
| SchedulerWorkerPool* worker_pool) |
| - : traits_(traits), worker_pool_(worker_pool) {} |
| + : traits_(traits), worker_pool_(worker_pool) { |
| + DCHECK(worker_pool_); |
| + } |
| // TaskRunner: |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| @@ -76,7 +80,9 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
| // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
| SchedulerSequencedTaskRunner(const TaskTraits& traits, |
| SchedulerWorkerPool* worker_pool) |
| - : traits_(traits), worker_pool_(worker_pool) {} |
| + : traits_(traits), worker_pool_(worker_pool) { |
| + DCHECK(worker_pool_); |
| + } |
| // SequencedTaskRunner: |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| @@ -113,19 +119,30 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
| DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
| }; |
| +// Only used in DCHECKs. |
| +bool ContainsWorker( |
| + const std::vector<std::unique_ptr<SchedulerWorker>>& workers, |
| + const SchedulerWorker* worker) { |
| + auto it = std::find_if(workers.begin(), workers.end(), |
| + [worker](const std::unique_ptr<SchedulerWorker>& i) { |
| + return i.get() == worker; |
| + }); |
| + return it != workers.end(); |
| +} |
| + |
| +} // namespace |
| + |
| // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. |
| -class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
| +class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner : |
| + public SingleThreadTaskRunner { |
| public: |
| // Constructs a SchedulerSingleThreadTaskRunner which can be used to post |
| // tasks so long as |worker_pool| and |worker| are alive. |
| // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| |
| // and |worker|. |
| SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
| - SchedulerWorkerPool* worker_pool, |
| - SchedulerWorker* worker) |
| - : traits_(traits), |
| - worker_pool_(worker_pool), |
| - worker_(worker) {} |
| + SchedulerWorkerPoolImpl* worker_pool, |
| + SchedulerWorker* worker); |
| // SingleThreadTaskRunner: |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| @@ -151,31 +168,18 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
| } |
| private: |
| - ~SchedulerSingleThreadTaskRunner() override = default; |
| + ~SchedulerSingleThreadTaskRunner() override; |
| // Sequence for all Tasks posted through this TaskRunner. |
| const scoped_refptr<Sequence> sequence_ = new Sequence; |
| const TaskTraits traits_; |
| - SchedulerWorkerPool* const worker_pool_; |
| + SchedulerWorkerPoolImpl* const worker_pool_; |
| SchedulerWorker* const worker_; |
| DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
| }; |
| -// Only used in DCHECKs. |
| -bool ContainsWorker( |
| - const std::vector<std::unique_ptr<SchedulerWorker>>& workers, |
| - const SchedulerWorker* worker) { |
| - auto it = std::find_if(workers.begin(), workers.end(), |
| - [worker](const std::unique_ptr<SchedulerWorker>& i) { |
| - return i.get() == worker; |
| - }); |
| - return it != workers.end(); |
| -} |
| - |
| -} // namespace |
| - |
| class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| : public SchedulerWorker::Delegate { |
| public: |
| @@ -203,6 +207,14 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| TimeDelta GetSleepTimeout() override; |
| bool CanDetach(SchedulerWorker* worker) override; |
| + void RegisterSingleThreadTaskRunner() { |
| + subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, 1); |
|
gab
2016/07/13 18:36:31
Let's wait until Dana/Francois settle the barrier/
robliao
2016/07/13 20:19:46
sgtm. As we are all well aware, atomics are tricky
|
| + } |
| + |
| + void UnregisterSingleThreadTaskRunner() { |
| + subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, -1); |
| + } |
| + |
| private: |
| SchedulerWorkerPoolImpl* outer_; |
| const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
| @@ -214,6 +226,11 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| // |single_threaded_priority_queue_|. |
| bool last_sequence_is_single_threaded_ = false; |
| + // True if the worker performed an idle cycle. Workers start out idle. |
| + bool performed_idle_cycle_ = true; |
|
gab
2016/07/13 18:36:31
is_idle_ ? Then it's true whether it's a new worke
robliao
2016/07/13 20:19:46
The delegate needs to distinguish between a thread
gab
2016/07/13 21:07:55
Ah!! Got it :-). Hadn't fully understood the logic
robliao
2016/07/19 22:03:47
I like performed_idle_cycle_ since it lets us expr
|
| + |
| + subtle::Atomic32 num_single_threaded_runners_ = 0; |
| + |
| const int index_; |
| DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
| @@ -231,12 +248,13 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( |
| ThreadPriority thread_priority, |
| size_t max_threads, |
| IORestriction io_restriction, |
| + const TimeDelta& suggested_reclaim_time, |
| const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| TaskTracker* task_tracker, |
| DelayedTaskManager* delayed_task_manager) { |
| std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool( |
| - new SchedulerWorkerPoolImpl(name, io_restriction, task_tracker, |
| - delayed_task_manager)); |
| + new SchedulerWorkerPoolImpl(name, io_restriction, suggested_reclaim_time, |
| + task_tracker, delayed_task_manager)); |
| if (worker_pool->Initialize(thread_priority, max_threads, |
| re_enqueue_sequence_callback)) { |
| return worker_pool; |
| @@ -251,6 +269,8 @@ void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { |
| } |
| void SchedulerWorkerPoolImpl::JoinForTesting() { |
| + DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) << |
| + "Workers can detach during join."; |
|
gab
2016/07/13 18:36:31
CanWorker...() versus "Workers" (plural versus sin
robliao
2016/07/13 20:19:46
The act of detaching applies to a single worker, m
|
| for (const auto& worker : workers_) |
| worker->JoinForTesting(); |
| @@ -258,6 +278,11 @@ void SchedulerWorkerPoolImpl::JoinForTesting() { |
| join_for_testing_returned_.Signal(); |
| } |
| +void SchedulerWorkerPoolImpl::DisallowWorkerDetachmentForTesting() { |
| + AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_); |
| + worker_detachment_allowed_ = false; |
| +} |
| + |
| scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( |
| const TaskTraits& traits, |
| ExecutionMode execution_mode) { |
| @@ -368,6 +393,25 @@ void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( |
| } |
| } |
| +SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: |
| + SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
| + SchedulerWorkerPoolImpl* worker_pool, |
| + SchedulerWorker* worker) |
|
gab
2016/07/13 18:36:31
missing space: git cl format?
robliao
2016/07/13 20:19:46
Done.
|
| + : traits_(traits), |
| + worker_pool_(worker_pool), |
| + worker_(worker) { |
| + DCHECK(worker_pool_); |
| + DCHECK(worker_); |
| + static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
| + RegisterSingleThreadTaskRunner(); |
| +} |
| + |
| +SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: |
| + ~SchedulerSingleThreadTaskRunner() { |
| + static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
| + UnregisterSingleThreadTaskRunner(); |
| +} |
| + |
| SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| SchedulerWorkerDelegateImpl( |
| SchedulerWorkerPoolImpl* outer, |
| @@ -399,6 +443,9 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( |
| tls_current_worker.Get().Set(worker); |
| tls_current_worker_pool.Get().Set(outer_); |
| + // Workers start out idle, so this counts as an idle cycle. |
| + performed_idle_cycle_ = true; |
|
gab
2016/07/13 18:36:31
This is redundant per the value being initialized
robliao
2016/07/13 20:19:46
The delegate is reused across thread instances, so
|
| + |
| ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ == |
| IORestriction::ALLOWED); |
| } |
| @@ -455,6 +502,9 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( |
| } |
| DCHECK(sequence); |
| + // We're doing work, so this is not an idle cycle. |
| + performed_idle_cycle_ = false; |
|
gab
2016/07/13 18:36:31
is_idle_ = false;
with no comments is sufficient
robliao
2016/07/13 20:19:46
sgtm. Removed.
|
| + |
| outer_->RemoveFromIdleWorkersStack(worker); |
| return sequence; |
| } |
| @@ -476,26 +526,39 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| GetSleepTimeout() { |
| - return TimeDelta::Max(); |
| + return outer_->suggested_reclaim_time_; |
| } |
| bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
| SchedulerWorker* worker) { |
| - return false; |
| + // It's not an issue if |num_single_threaded_runners_| is incremented after |
| + // this because the next single-threaded task will simply pick up a new |
| + // physical thread. |
| + const bool can_detach = |
| + performed_idle_cycle_ && |
| + worker != outer_->PeekAtIdleWorkersStack() && |
| + !subtle::Release_Load(&num_single_threaded_runners_) && |
| + outer_->CanWorkerDetachForTesting(); |
| + // CanDetach is part of a SchedulerWorker's idle cycle. |
| + performed_idle_cycle_ = true; |
|
gab
2016/07/13 18:36:31
Shouldn't is_idle_ instead be set when about to re
robliao
2016/07/13 20:19:46
See is_idle_ comment.
gab
2016/07/13 21:07:55
Perhaps expand comment above here to explain the d
robliao
2016/07/19 22:03:47
Done.
|
| + return can_detach; |
| } |
| SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
| StringPiece name, |
| IORestriction io_restriction, |
| + const TimeDelta& suggested_reclaim_time, |
| TaskTracker* task_tracker, |
| DelayedTaskManager* delayed_task_manager) |
| : name_(name.as_string()), |
| io_restriction_(io_restriction), |
| + suggested_reclaim_time_(suggested_reclaim_time), |
| idle_workers_stack_lock_(shared_priority_queue_.container_lock()), |
| idle_workers_stack_cv_for_testing_( |
| idle_workers_stack_lock_.CreateConditionVariable()), |
| join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, |
| WaitableEvent::InitialState::NOT_SIGNALED), |
| + worker_detachment_allowed_(true), |
|
gab
2016/07/13 18:36:31
If keeping the bool (ref. WaitableEvent comment),
robliao
2016/07/13 20:19:46
See earlier comment.
|
| #if DCHECK_IS_ON() |
| workers_created_(WaitableEvent::ResetPolicy::MANUAL, |
| WaitableEvent::InitialState::NOT_SIGNALED), |
| @@ -521,7 +584,9 @@ bool SchedulerWorkerPoolImpl::Initialize( |
| this, re_enqueue_sequence_callback, |
| &shared_priority_queue_, static_cast<int>(i))), |
| task_tracker_, |
| - SchedulerWorker::InitialState::ALIVE); |
| + i == 0 |
| + ? SchedulerWorker::InitialState::ALIVE |
| + : SchedulerWorker::InitialState::DETACHED); |
| if (!worker) |
| break; |
| idle_workers_stack_.Push(worker.get()); |
| @@ -548,18 +613,34 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
| void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( |
| SchedulerWorker* worker) { |
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| - idle_workers_stack_.Push(worker); |
| + // Detachment may cause multiple attempts to add because the delegate cannot |
|
gab
2016/07/13 18:36:31
Shouldn't a thread that wakes up after its sleep
robliao
2016/07/13 20:19:46
We can't distinguish between sleep timeouts and ac
|
| + // determine who woke it up. As a result, when it wakes up, it may conclude |
| + // there's no work to be done and attempt to add itself to the idle stack |
| + // again. |
| + if (!idle_workers_stack_.Contains(worker)) |
| + idle_workers_stack_.Push(worker); |
| + |
| DCHECK_LE(idle_workers_stack_.Size(), workers_.size()); |
| if (idle_workers_stack_.Size() == workers_.size()) |
| idle_workers_stack_cv_for_testing_->Broadcast(); |
| } |
| +const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const { |
| + AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| + return idle_workers_stack_.Peek(); |
| +} |
| + |
| void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack( |
| SchedulerWorker* worker) { |
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| idle_workers_stack_.Remove(worker); |
| } |
| +bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { |
| + AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_); |
| + return worker_detachment_allowed_; |
| +} |
| + |
| } // namespace internal |
| } // namespace base |