Chromium Code Reviews| Index: base/task_scheduler/scheduler_worker_pool_impl.cc |
| diff --git a/base/task_scheduler/scheduler_worker_pool_impl.cc b/base/task_scheduler/scheduler_worker_pool_impl.cc |
| index 06933eb32db62386195ba3619fd71c03914346fd..677e53358866f5787b549eaab00bcf8467c443a7 100644 |
| --- a/base/task_scheduler/scheduler_worker_pool_impl.cc |
| +++ b/base/task_scheduler/scheduler_worker_pool_impl.cc |
| @@ -9,6 +9,7 @@ |
| #include <algorithm> |
| #include <utility> |
| +#include "base/atomicops.h" |
| #include "base/bind.h" |
| #include "base/bind_helpers.h" |
| #include "base/lazy_instance.h" |
| @@ -21,6 +22,7 @@ |
| #include "base/threading/platform_thread.h" |
| #include "base/threading/thread_local.h" |
| #include "base/threading/thread_restrictions.h" |
| +#include "base/time/time.h" |
| namespace base { |
| namespace internal { |
| @@ -43,7 +45,9 @@ class SchedulerParallelTaskRunner : public TaskRunner { |
| // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
| SchedulerParallelTaskRunner(const TaskTraits& traits, |
| SchedulerWorkerPool* worker_pool) |
| - : traits_(traits), worker_pool_(worker_pool) {} |
| + : traits_(traits), worker_pool_(worker_pool) { |
| + DCHECK(worker_pool_); |
| + } |
| // TaskRunner: |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| @@ -76,7 +80,9 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
| // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
| SchedulerSequencedTaskRunner(const TaskTraits& traits, |
| SchedulerWorkerPool* worker_pool) |
| - : traits_(traits), worker_pool_(worker_pool) {} |
| + : traits_(traits), worker_pool_(worker_pool) { |
| + DCHECK(worker_pool_); |
| + } |
| // SequencedTaskRunner: |
| bool PostDelayedTask(const tracked_objects::Location& from_here, |
| @@ -113,56 +119,6 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
| DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
| }; |
| -// A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. |
| -class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
| - public: |
| - // Constructs a SchedulerSingleThreadTaskRunner which can be used to post |
| - // tasks so long as |worker_pool| and |worker| are alive. |
| - // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| |
| - // and |worker|. |
| - SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
| - SchedulerWorkerPool* worker_pool, |
| - SchedulerWorker* worker) |
| - : traits_(traits), |
| - worker_pool_(worker_pool), |
| - worker_(worker) {} |
| - |
| - // SingleThreadTaskRunner: |
| - bool PostDelayedTask(const tracked_objects::Location& from_here, |
| - const Closure& closure, |
| - TimeDelta delay) override { |
| - std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); |
| - task->single_thread_task_runner_ref = this; |
| - |
| - // Post the task to be executed by |worker_| as part of |sequence_|. |
| - return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, |
| - worker_); |
| - } |
| - |
| - bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| - const Closure& closure, |
| - base::TimeDelta delay) override { |
| - // Tasks are never nested within the task scheduler. |
| - return PostDelayedTask(from_here, closure, delay); |
| - } |
| - |
| - bool RunsTasksOnCurrentThread() const override { |
| - return tls_current_worker.Get().Get() == worker_; |
| - } |
| - |
| - private: |
| - ~SchedulerSingleThreadTaskRunner() override = default; |
| - |
| - // Sequence for all Tasks posted through this TaskRunner. |
| - const scoped_refptr<Sequence> sequence_ = new Sequence; |
| - |
| - const TaskTraits traits_; |
| - SchedulerWorkerPool* const worker_pool_; |
| - SchedulerWorker* const worker_; |
| - |
| - DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
| -}; |
| - |
| // Only used in DCHECKs. |
| bool ContainsWorker( |
| const std::vector<std::unique_ptr<SchedulerWorker>>& workers, |
| @@ -187,6 +143,7 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| // the pool name to label the underlying worker threads. |
| SchedulerWorkerDelegateImpl( |
| SchedulerWorkerPoolImpl* outer, |
| + const TimeDelta& suggested_reclaim_time, |
| const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| const PriorityQueue* shared_priority_queue, |
| int index); |
| @@ -203,8 +160,17 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| TimeDelta GetSleepTimeout() override; |
| bool CanDetach(SchedulerWorker* worker) override; |
| + void RegisterSingleThreadTaskRunner() { |
| + subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, 1); |
| + } |
| + |
| + void UnregisterSingleThreadTaskRunner() { |
| + subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, -1); |
| + } |
| + |
| private: |
| SchedulerWorkerPoolImpl* outer_; |
| + const TimeDelta suggested_reclaim_time_; |
| const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
| // Single-threaded PriorityQueue for the worker. |
| @@ -214,11 +180,75 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
| // |single_threaded_priority_queue_|. |
| bool last_sequence_is_single_threaded_ = false; |
| + // True if the worker performed and idle cycle. Workers start out idle. |
|
fdoray
2016/07/08 18:58:06
an* idle cycle
robliao
2016/07/08 21:17:03
Done.
|
| + bool performed_idle_cycle_ = true; |
| + |
| + subtle::Atomic32 num_single_threaded_runners_ = 0; |
| + |
| const int index_; |
| DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
| }; |
| +// A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. |
| +class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner : |
| + public SingleThreadTaskRunner { |
| + public: |
| + // Constructs a SchedulerSingleThreadTaskRunner which can be used to post |
| + // tasks so long as |worker_pool| and |worker| are alive. |
| + // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| |
| + // and |worker|. |
| + SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
| + SchedulerWorkerPoolImpl* worker_pool, |
| + SchedulerWorker* worker) |
| + : traits_(traits), |
| + worker_pool_(worker_pool), |
| + worker_(worker) { |
| + DCHECK(worker_pool_); |
| + DCHECK(worker_); |
| + static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
| + RegisterSingleThreadTaskRunner(); |
| + } |
| + |
| + // SingleThreadTaskRunner: |
| + bool PostDelayedTask(const tracked_objects::Location& from_here, |
| + const Closure& closure, |
| + TimeDelta delay) override { |
| + std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); |
| + task->single_thread_task_runner_ref = this; |
| + |
| + // Post the task to be executed by |worker_| as part of |sequence_|. |
| + return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, |
| + worker_); |
| + } |
| + |
| + bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
| + const Closure& closure, |
| + base::TimeDelta delay) override { |
| + // Tasks are never nested within the task scheduler. |
| + return PostDelayedTask(from_here, closure, delay); |
| + } |
| + |
| + bool RunsTasksOnCurrentThread() const override { |
| + return tls_current_worker.Get().Get() == worker_; |
| + } |
| + |
| + private: |
| + ~SchedulerSingleThreadTaskRunner() override { |
| + static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
| + UnregisterSingleThreadTaskRunner(); |
| + } |
| + |
| + // Sequence for all Tasks posted through this TaskRunner. |
| + const scoped_refptr<Sequence> sequence_ = new Sequence; |
| + |
| + const TaskTraits traits_; |
| + SchedulerWorkerPoolImpl* const worker_pool_; |
| + SchedulerWorker* const worker_; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
| +}; |
| + |
| SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { |
| // SchedulerWorkerPool should never be deleted in production unless its |
| // initialization failed. |
| @@ -231,6 +261,7 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( |
| ThreadPriority thread_priority, |
| size_t max_threads, |
| IORestriction io_restriction, |
| + const TimeDelta& suggested_reclaim_time, |
| const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| TaskTracker* task_tracker, |
| DelayedTaskManager* delayed_task_manager) { |
| @@ -238,6 +269,7 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( |
| new SchedulerWorkerPoolImpl(name, io_restriction, task_tracker, |
| delayed_task_manager)); |
| if (worker_pool->Initialize(thread_priority, max_threads, |
| + suggested_reclaim_time, |
| re_enqueue_sequence_callback)) { |
| return worker_pool; |
| } |
| @@ -251,6 +283,10 @@ void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { |
| } |
| void SchedulerWorkerPoolImpl::JoinForTesting() { |
| + { |
| + AutoSchedulerLock auto_lock(join_for_testing_called_lock_); |
| + join_for_testing_called_ = true; |
| + } |
| for (const auto& worker : workers_) |
| worker->JoinForTesting(); |
| @@ -371,10 +407,12 @@ void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( |
| SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| SchedulerWorkerDelegateImpl( |
| SchedulerWorkerPoolImpl* outer, |
| + const TimeDelta& suggested_reclaim_time, |
| const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
| const PriorityQueue* shared_priority_queue, |
| int index) |
| : outer_(outer), |
| + suggested_reclaim_time_(suggested_reclaim_time), |
| re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
| single_threaded_priority_queue_(shared_priority_queue), |
| index_(index) {} |
| @@ -399,6 +437,9 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( |
| tls_current_worker.Get().Set(worker); |
| tls_current_worker_pool.Get().Set(outer_); |
| + // Workers start out idle, so this counts as an idle cycle. |
| + performed_idle_cycle_ = true; |
| + |
| ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ == |
| IORestriction::ALLOWED); |
| } |
| @@ -455,6 +496,9 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( |
| } |
| DCHECK(sequence); |
| + // We're doing work, so this is not an idle cycle. |
| + performed_idle_cycle_ = false; |
| + |
| outer_->RemoveFromIdleWorkersStack(worker); |
| return sequence; |
| } |
| @@ -476,12 +520,21 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
| GetSleepTimeout() { |
| - return TimeDelta::Max(); |
| + return suggested_reclaim_time_; |
| } |
| bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
| SchedulerWorker* worker) { |
| - return false; |
| + // It's not an issue if num_single_threaded_runners_ is incremented after this |
|
fdoray
2016/07/08 18:58:06
|num_single_threaded_runners_|
robliao
2016/07/08 21:17:03
Done.
|
| + // because the next single-threaded task will simply pick up a new physical |
| + // thread. |
| + bool can_detach = performed_idle_cycle_ && |
|
fdoray
2016/07/08 18:58:07
const bool
robliao
2016/07/08 21:17:03
Done.
|
| + worker != outer_->PeekAtIdleWorkersStack() && |
| + !subtle::Release_Load(&num_single_threaded_runners_) && |
| + !outer_->HasJoinedForTesting(); |
|
fdoray
2016/07/08 18:58:07
Sadly this is racy:
- Thread A: SchedulerWorker::C
robliao
2016/07/08 21:17:03
Yup. This became the first test that flew in the f
fdoray
2016/07/20 14:15:43
Acknowledged.
|
| + // CanDetach is part of a SchedulerWorker's idle cycle. |
| + performed_idle_cycle_ = true; |
| + return can_detach; |
| } |
| SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
| @@ -496,6 +549,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
| idle_workers_stack_lock_.CreateConditionVariable()), |
| join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, |
| WaitableEvent::InitialState::NOT_SIGNALED), |
| + join_for_testing_called_(false), |
| #if DCHECK_IS_ON() |
| workers_created_(WaitableEvent::ResetPolicy::MANUAL, |
| WaitableEvent::InitialState::NOT_SIGNALED), |
| @@ -509,6 +563,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
| bool SchedulerWorkerPoolImpl::Initialize( |
| ThreadPriority thread_priority, |
| size_t max_threads, |
| + const TimeDelta& suggested_reclaim_time, |
| const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { |
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| @@ -518,10 +573,13 @@ bool SchedulerWorkerPoolImpl::Initialize( |
| std::unique_ptr<SchedulerWorker> worker = |
| SchedulerWorker::Create( |
| thread_priority, WrapUnique(new SchedulerWorkerDelegateImpl( |
| - this, re_enqueue_sequence_callback, |
| + this, suggested_reclaim_time, |
| + re_enqueue_sequence_callback, |
| &shared_priority_queue_, static_cast<int>(i))), |
| task_tracker_, |
| - SchedulerWorker::InitialState::ALIVE); |
| + i == 0 |
| + ? SchedulerWorker::InitialState::ALIVE |
| + : SchedulerWorker::InitialState::DETACHED); |
| if (!worker) |
| break; |
| idle_workers_stack_.Push(worker.get()); |
| @@ -548,18 +606,34 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
| void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( |
| SchedulerWorker* worker) { |
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| - idle_workers_stack_.Push(worker); |
| + // Detachment may cause multiple attempts to add because the delegate cannot |
| + // determine who woke it up. As a result, when it wakes up, it may conclude |
| + // there's no work to be done and attempt to add itself to the idle stack |
| + // again. |
| + if (!idle_workers_stack_.Contains(worker)) |
| + idle_workers_stack_.Push(worker); |
| + |
| DCHECK_LE(idle_workers_stack_.Size(), workers_.size()); |
| if (idle_workers_stack_.Size() == workers_.size()) |
| idle_workers_stack_cv_for_testing_->Broadcast(); |
| } |
| +const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const { |
| + AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| + return idle_workers_stack_.Peek(); |
| +} |
| + |
| void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack( |
| SchedulerWorker* worker) { |
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
| idle_workers_stack_.Remove(worker); |
| } |
| +bool SchedulerWorkerPoolImpl::HasJoinedForTesting() { |
| + AutoSchedulerLock auto_lock(join_for_testing_called_lock_); |
| + return join_for_testing_called_; |
| +} |
| + |
| } // namespace internal |
| } // namespace base |