| Index: base/task_scheduler/scheduler_worker_pool_impl.cc
|
| diff --git a/base/task_scheduler/scheduler_worker_pool_impl.cc b/base/task_scheduler/scheduler_worker_pool_impl.cc
|
| index 9f521e229cabb9dd7c52abcc7bc92be9d7b9aa69..cd50c95ca2634db8921e2a49e1692662767561e3 100644
|
| --- a/base/task_scheduler/scheduler_worker_pool_impl.cc
|
| +++ b/base/task_scheduler/scheduler_worker_pool_impl.cc
|
| @@ -9,6 +9,7 @@
|
| #include <algorithm>
|
| #include <utility>
|
|
|
| +#include "base/atomicops.h"
|
| #include "base/bind.h"
|
| #include "base/bind_helpers.h"
|
| #include "base/lazy_instance.h"
|
| @@ -21,6 +22,7 @@
|
| #include "base/threading/platform_thread.h"
|
| #include "base/threading/thread_local.h"
|
| #include "base/threading/thread_restrictions.h"
|
| +#include "base/time/time.h"
|
|
|
| namespace base {
|
| namespace internal {
|
| @@ -43,7 +45,9 @@ class SchedulerParallelTaskRunner : public TaskRunner {
|
| // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
|
| SchedulerParallelTaskRunner(const TaskTraits& traits,
|
| SchedulerWorkerPool* worker_pool)
|
| - : traits_(traits), worker_pool_(worker_pool) {}
|
| + : traits_(traits), worker_pool_(worker_pool) {
|
| + DCHECK(worker_pool_);
|
| + }
|
|
|
| // TaskRunner:
|
| bool PostDelayedTask(const tracked_objects::Location& from_here,
|
| @@ -76,7 +80,9 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
|
| // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
|
| SchedulerSequencedTaskRunner(const TaskTraits& traits,
|
| SchedulerWorkerPool* worker_pool)
|
| - : traits_(traits), worker_pool_(worker_pool) {}
|
| + : traits_(traits), worker_pool_(worker_pool) {
|
| + DCHECK(worker_pool_);
|
| + }
|
|
|
| // SequencedTaskRunner:
|
| bool PostDelayedTask(const tracked_objects::Location& from_here,
|
| @@ -113,8 +119,22 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
|
| DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
|
| };
|
|
|
| +// Only used in DCHECKs.
|
| +bool ContainsWorker(
|
| + const std::vector<std::unique_ptr<SchedulerWorker>>& workers,
|
| + const SchedulerWorker* worker) {
|
| + auto it = std::find_if(workers.begin(), workers.end(),
|
| + [worker](const std::unique_ptr<SchedulerWorker>& i) {
|
| + return i.get() == worker;
|
| + });
|
| + return it != workers.end();
|
| +}
|
| +
|
| +} // namespace
|
| +
|
| // A task runner that runs tasks with the SINGLE_THREADED ExecutionMode.
|
| -class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
|
| +class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner :
|
| + public SingleThreadTaskRunner {
|
| public:
|
| // Constructs a SchedulerSingleThreadTaskRunner which can be used to post
|
| // tasks so long as |worker_pool| and |worker| are alive.
|
| @@ -122,10 +142,7 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
|
| // and |worker|.
|
| SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
|
| SchedulerWorkerPool* worker_pool,
|
| - SchedulerWorker* worker)
|
| - : traits_(traits),
|
| - worker_pool_(worker_pool),
|
| - worker_(worker) {}
|
| + SchedulerWorker* worker);
|
|
|
| // SingleThreadTaskRunner:
|
| bool PostDelayedTask(const tracked_objects::Location& from_here,
|
| @@ -151,7 +168,7 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
|
| }
|
|
|
| private:
|
| - ~SchedulerSingleThreadTaskRunner() override = default;
|
| + ~SchedulerSingleThreadTaskRunner() override;
|
|
|
| // Sequence for all Tasks posted through this TaskRunner.
|
| const scoped_refptr<Sequence> sequence_ = new Sequence;
|
| @@ -163,19 +180,6 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
|
| DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
|
| };
|
|
|
| -// Only used in DCHECKs.
|
| -bool ContainsWorker(
|
| - const std::vector<std::unique_ptr<SchedulerWorker>>& workers,
|
| - const SchedulerWorker* worker) {
|
| - auto it = std::find_if(workers.begin(), workers.end(),
|
| - [worker](const std::unique_ptr<SchedulerWorker>& i) {
|
| - return i.get() == worker;
|
| - });
|
| - return it != workers.end();
|
| -}
|
| -
|
| -} // namespace
|
| -
|
| class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
|
| : public SchedulerWorker::Delegate {
|
| public:
|
| @@ -203,6 +207,14 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
|
| TimeDelta GetSleepTimeout() override;
|
| bool CanDetach(SchedulerWorker* worker) override;
|
|
|
| + void RegisterSingleThreadTaskRunner() {
|
| + subtle::Barrier_AtomicIncrement(&num_single_threaded_runners_, 1);
|
| + }
|
| +
|
| + void UnregisterSingleThreadTaskRunner() {
|
| + subtle::Barrier_AtomicIncrement(&num_single_threaded_runners_, -1);
|
| + }
|
| +
|
| private:
|
| SchedulerWorkerPoolImpl* outer_;
|
| const ReEnqueueSequenceCallback re_enqueue_sequence_callback_;
|
| @@ -214,6 +226,11 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
|
| // |single_threaded_priority_queue_|.
|
| bool last_sequence_is_single_threaded_ = false;
|
|
|
| + // Time when GetWork() first returned nullptr.
|
| + TimeTicks idle_start_time_;
|
| +
|
| + subtle::Atomic32 num_single_threaded_runners_ = 0;
|
| +
|
| const int index_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl);
|
| @@ -234,6 +251,7 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create(
|
| std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool(
|
| new SchedulerWorkerPoolImpl(params.name(),
|
| params.io_restriction(),
|
| + params.suggested_reclaim_time(),
|
| task_tracker, delayed_task_manager));
|
| if (worker_pool->Initialize(params.thread_priority(),
|
| params.max_threads(),
|
| @@ -250,6 +268,8 @@ void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() {
|
| }
|
|
|
| void SchedulerWorkerPoolImpl::JoinForTesting() {
|
| + DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) <<
|
| + "Workers can detach during join.";
|
| for (const auto& worker : workers_)
|
| worker->JoinForTesting();
|
|
|
| @@ -257,6 +277,11 @@ void SchedulerWorkerPoolImpl::JoinForTesting() {
|
| join_for_testing_returned_.Signal();
|
| }
|
|
|
| +void SchedulerWorkerPoolImpl::DisallowWorkerDetachmentForTesting() {
|
| + AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_);
|
| + worker_detachment_allowed_ = false;
|
| +}
|
| +
|
| scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits(
|
| const TaskTraits& traits,
|
| ExecutionMode execution_mode) {
|
| @@ -367,6 +392,25 @@ void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow(
|
| }
|
| }
|
|
|
| +SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner::
|
| + SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
|
| + SchedulerWorkerPool* worker_pool,
|
| + SchedulerWorker* worker)
|
| + : traits_(traits),
|
| + worker_pool_(worker_pool),
|
| + worker_(worker) {
|
| + DCHECK(worker_pool_);
|
| + DCHECK(worker_);
|
| + static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())->
|
| + RegisterSingleThreadTaskRunner();
|
| +}
|
| +
|
| +SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner::
|
| + ~SchedulerSingleThreadTaskRunner() {
|
| + static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())->
|
| + UnregisterSingleThreadTaskRunner();
|
| +}
|
| +
|
| SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
|
| SchedulerWorkerDelegateImpl(
|
| SchedulerWorkerPoolImpl* outer,
|
| @@ -398,6 +442,9 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry(
|
| tls_current_worker.Get().Set(worker);
|
| tls_current_worker_pool.Get().Set(outer_);
|
|
|
| + // New threads haven't run GetWork() yet, so reset the idle_start_time_.
|
| + idle_start_time_ = TimeTicks();
|
| +
|
| ThreadRestrictions::SetIOAllowed(
|
| outer_->io_restriction_ ==
|
| SchedulerWorkerPoolParams::IORestriction::ALLOWED);
|
| @@ -432,6 +479,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
|
| // 4. This thread adds itself to |idle_workers_stack_| and goes to sleep.
|
| // No thread runs the Sequence inserted in step 2.
|
| outer_->AddToIdleWorkersStack(worker);
|
| + if (idle_start_time_.is_null())
|
| + idle_start_time_ = TimeTicks::Now();
|
| return nullptr;
|
| }
|
|
|
| @@ -455,6 +504,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
|
| }
|
| DCHECK(sequence);
|
|
|
| + idle_start_time_ = TimeTicks();
|
| +
|
| outer_->RemoveFromIdleWorkersStack(worker);
|
| return sequence;
|
| }
|
| @@ -476,21 +527,33 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
|
|
|
| TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
|
| GetSleepTimeout() {
|
| - return TimeDelta::Max();
|
| + return outer_->suggested_reclaim_time_;
|
| }
|
|
|
| bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach(
|
| SchedulerWorker* worker) {
|
| - return false;
|
| + // It's not an issue if |num_single_threaded_runners_| is incremented after
|
| + // this because the newly created TaskRunner (from which no task has run yet)
|
| + // will simply run all its tasks on the next physical thread created by the
|
| + // worker.
|
| + const bool can_detach =
|
| + !idle_start_time_.is_null() &&
|
| + (TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ &&
|
| + worker != outer_->PeekAtIdleWorkersStack() &&
|
| + !subtle::Acquire_Load(&num_single_threaded_runners_) &&
|
| + outer_->CanWorkerDetachForTesting();
|
| + return can_detach;
|
| }
|
|
|
| SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
|
| StringPiece name,
|
| SchedulerWorkerPoolParams::IORestriction io_restriction,
|
| + const TimeDelta& suggested_reclaim_time,
|
| TaskTracker* task_tracker,
|
| DelayedTaskManager* delayed_task_manager)
|
| : name_(name.as_string()),
|
| io_restriction_(io_restriction),
|
| + suggested_reclaim_time_(suggested_reclaim_time),
|
| idle_workers_stack_lock_(shared_priority_queue_.container_lock()),
|
| idle_workers_stack_cv_for_testing_(
|
| idle_workers_stack_lock_.CreateConditionVariable()),
|
| @@ -521,7 +584,9 @@ bool SchedulerWorkerPoolImpl::Initialize(
|
| this, re_enqueue_sequence_callback,
|
| &shared_priority_queue_, static_cast<int>(i))),
|
| task_tracker_,
|
| - SchedulerWorker::InitialState::ALIVE);
|
| + i == 0
|
| + ? SchedulerWorker::InitialState::ALIVE
|
| + : SchedulerWorker::InitialState::DETACHED);
|
| if (!worker)
|
| break;
|
| idle_workers_stack_.Push(worker.get());
|
| @@ -548,18 +613,34 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
|
| void SchedulerWorkerPoolImpl::AddToIdleWorkersStack(
|
| SchedulerWorker* worker) {
|
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
|
| - idle_workers_stack_.Push(worker);
|
| + // Detachment may cause multiple attempts to add because the delegate cannot
|
| + // determine who woke it up. As a result, when it wakes up, it may conclude
|
| + // there's no work to be done and attempt to add itself to the idle stack
|
| + // again.
|
| + if (!idle_workers_stack_.Contains(worker))
|
| + idle_workers_stack_.Push(worker);
|
| +
|
| DCHECK_LE(idle_workers_stack_.Size(), workers_.size());
|
|
|
| if (idle_workers_stack_.Size() == workers_.size())
|
| idle_workers_stack_cv_for_testing_->Broadcast();
|
| }
|
|
|
| +const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const {
|
| + AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
|
| + return idle_workers_stack_.Peek();
|
| +}
|
| +
|
| void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack(
|
| SchedulerWorker* worker) {
|
| AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
|
| idle_workers_stack_.Remove(worker);
|
| }
|
|
|
| +bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() {
|
| + AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_);
|
| + return worker_detachment_allowed_;
|
| +}
|
| +
|
| } // namespace internal
|
| } // namespace base
|
|
|