Index: base/task_scheduler/scheduler_worker_pool_impl.cc |
diff --git a/base/task_scheduler/scheduler_worker_pool_impl.cc b/base/task_scheduler/scheduler_worker_pool_impl.cc |
index 9f521e229cabb9dd7c52abcc7bc92be9d7b9aa69..cd50c95ca2634db8921e2a49e1692662767561e3 100644 |
--- a/base/task_scheduler/scheduler_worker_pool_impl.cc |
+++ b/base/task_scheduler/scheduler_worker_pool_impl.cc |
@@ -9,6 +9,7 @@ |
#include <algorithm> |
#include <utility> |
+#include "base/atomicops.h" |
#include "base/bind.h" |
#include "base/bind_helpers.h" |
#include "base/lazy_instance.h" |
@@ -21,6 +22,7 @@ |
#include "base/threading/platform_thread.h" |
#include "base/threading/thread_local.h" |
#include "base/threading/thread_restrictions.h" |
+#include "base/time/time.h" |
namespace base { |
namespace internal { |
@@ -43,7 +45,9 @@ class SchedulerParallelTaskRunner : public TaskRunner { |
// TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
SchedulerParallelTaskRunner(const TaskTraits& traits, |
SchedulerWorkerPool* worker_pool) |
- : traits_(traits), worker_pool_(worker_pool) {} |
+ : traits_(traits), worker_pool_(worker_pool) { |
+ DCHECK(worker_pool_); |
+ } |
// TaskRunner: |
bool PostDelayedTask(const tracked_objects::Location& from_here, |
@@ -76,7 +80,9 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
// TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
SchedulerSequencedTaskRunner(const TaskTraits& traits, |
SchedulerWorkerPool* worker_pool) |
- : traits_(traits), worker_pool_(worker_pool) {} |
+ : traits_(traits), worker_pool_(worker_pool) { |
+ DCHECK(worker_pool_); |
+ } |
// SequencedTaskRunner: |
bool PostDelayedTask(const tracked_objects::Location& from_here, |
@@ -113,8 +119,22 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
}; |
+// Only used in DCHECKs. |
+bool ContainsWorker( |
+ const std::vector<std::unique_ptr<SchedulerWorker>>& workers, |
+ const SchedulerWorker* worker) { |
+ auto it = std::find_if(workers.begin(), workers.end(), |
+ [worker](const std::unique_ptr<SchedulerWorker>& i) { |
+ return i.get() == worker; |
+ }); |
+ return it != workers.end(); |
+} |
+ |
+} // namespace |
+ |
// A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. |
-class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
+class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner : |
+ public SingleThreadTaskRunner { |
public: |
// Constructs a SchedulerSingleThreadTaskRunner which can be used to post |
// tasks so long as |worker_pool| and |worker| are alive. |
@@ -122,10 +142,7 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
// and |worker|. |
SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
SchedulerWorkerPool* worker_pool, |
- SchedulerWorker* worker) |
- : traits_(traits), |
- worker_pool_(worker_pool), |
- worker_(worker) {} |
+ SchedulerWorker* worker); |
// SingleThreadTaskRunner: |
bool PostDelayedTask(const tracked_objects::Location& from_here, |
@@ -151,7 +168,7 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
} |
private: |
- ~SchedulerSingleThreadTaskRunner() override = default; |
+ ~SchedulerSingleThreadTaskRunner() override; |
// Sequence for all Tasks posted through this TaskRunner. |
const scoped_refptr<Sequence> sequence_ = new Sequence; |
@@ -163,19 +180,6 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
}; |
-// Only used in DCHECKs. |
-bool ContainsWorker( |
- const std::vector<std::unique_ptr<SchedulerWorker>>& workers, |
- const SchedulerWorker* worker) { |
- auto it = std::find_if(workers.begin(), workers.end(), |
- [worker](const std::unique_ptr<SchedulerWorker>& i) { |
- return i.get() == worker; |
- }); |
- return it != workers.end(); |
-} |
- |
-} // namespace |
- |
class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
: public SchedulerWorker::Delegate { |
public: |
@@ -203,6 +207,14 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
TimeDelta GetSleepTimeout() override; |
bool CanDetach(SchedulerWorker* worker) override; |
+ void RegisterSingleThreadTaskRunner() { |
+ subtle::Barrier_AtomicIncrement(&num_single_threaded_runners_, 1); |
+ } |
+ |
+ void UnregisterSingleThreadTaskRunner() { |
+ subtle::Barrier_AtomicIncrement(&num_single_threaded_runners_, -1); |
+ } |
+ |
private: |
SchedulerWorkerPoolImpl* outer_; |
const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
@@ -214,6 +226,11 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
// |single_threaded_priority_queue_|. |
bool last_sequence_is_single_threaded_ = false; |
+ // Time when GetWork() first returned nullptr. |
+ TimeTicks idle_start_time_; |
+ |
+ subtle::Atomic32 num_single_threaded_runners_ = 0; |
+ |
const int index_; |
DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
@@ -234,6 +251,7 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( |
std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool( |
new SchedulerWorkerPoolImpl(params.name(), |
params.io_restriction(), |
+ params.suggested_reclaim_time(), |
task_tracker, delayed_task_manager)); |
if (worker_pool->Initialize(params.thread_priority(), |
params.max_threads(), |
@@ -250,6 +268,8 @@ void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { |
} |
void SchedulerWorkerPoolImpl::JoinForTesting() { |
+ DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) << |
+ "Workers can detach during join."; |
for (const auto& worker : workers_) |
worker->JoinForTesting(); |
@@ -257,6 +277,11 @@ void SchedulerWorkerPoolImpl::JoinForTesting() { |
join_for_testing_returned_.Signal(); |
} |
+void SchedulerWorkerPoolImpl::DisallowWorkerDetachmentForTesting() { |
+ AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_); |
+ worker_detachment_allowed_ = false; |
+} |
+ |
scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits( |
const TaskTraits& traits, |
ExecutionMode execution_mode) { |
@@ -367,6 +392,25 @@ void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( |
} |
} |
+SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: |
+ SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
+ SchedulerWorkerPool* worker_pool, |
+ SchedulerWorker* worker) |
+ : traits_(traits), |
+ worker_pool_(worker_pool), |
+ worker_(worker) { |
+ DCHECK(worker_pool_); |
+ DCHECK(worker_); |
+ static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
+ RegisterSingleThreadTaskRunner(); |
+} |
+ |
+SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner:: |
+ ~SchedulerSingleThreadTaskRunner() { |
+ static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
+ UnregisterSingleThreadTaskRunner(); |
+} |
+ |
SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
SchedulerWorkerDelegateImpl( |
SchedulerWorkerPoolImpl* outer, |
@@ -398,6 +442,9 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( |
tls_current_worker.Get().Set(worker); |
tls_current_worker_pool.Get().Set(outer_); |
+ // New threads haven't run GetWork() yet, so reset the idle_start_time_. |
+ idle_start_time_ = TimeTicks(); |
+ |
ThreadRestrictions::SetIOAllowed( |
outer_->io_restriction_ == |
SchedulerWorkerPoolParams::IORestriction::ALLOWED); |
@@ -432,6 +479,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( |
// 4. This thread adds itself to |idle_workers_stack_| and goes to sleep. |
// No thread runs the Sequence inserted in step 2. |
outer_->AddToIdleWorkersStack(worker); |
+ if (idle_start_time_.is_null()) |
+ idle_start_time_ = TimeTicks::Now(); |
return nullptr; |
} |
@@ -455,6 +504,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( |
} |
DCHECK(sequence); |
+ idle_start_time_ = TimeTicks(); |
+ |
outer_->RemoveFromIdleWorkersStack(worker); |
return sequence; |
} |
@@ -476,21 +527,33 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
GetSleepTimeout() { |
- return TimeDelta::Max(); |
+ return outer_->suggested_reclaim_time_; |
} |
bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
SchedulerWorker* worker) { |
- return false; |
+ // It's not an issue if |num_single_threaded_runners_| is incremented after |
+ // this because the newly created TaskRunner (from which no task has run yet) |
+ // will simply run all its tasks on the next physical thread created by the |
+ // worker. |
+ const bool can_detach = |
+ !idle_start_time_.is_null() && |
+ (TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ && |
+ worker != outer_->PeekAtIdleWorkersStack() && |
+ !subtle::Acquire_Load(&num_single_threaded_runners_) && |
+ outer_->CanWorkerDetachForTesting(); |
+ return can_detach; |
} |
SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
StringPiece name, |
SchedulerWorkerPoolParams::IORestriction io_restriction, |
+ const TimeDelta& suggested_reclaim_time, |
TaskTracker* task_tracker, |
DelayedTaskManager* delayed_task_manager) |
: name_(name.as_string()), |
io_restriction_(io_restriction), |
+ suggested_reclaim_time_(suggested_reclaim_time), |
idle_workers_stack_lock_(shared_priority_queue_.container_lock()), |
idle_workers_stack_cv_for_testing_( |
idle_workers_stack_lock_.CreateConditionVariable()), |
@@ -521,7 +584,9 @@ bool SchedulerWorkerPoolImpl::Initialize( |
this, re_enqueue_sequence_callback, |
&shared_priority_queue_, static_cast<int>(i))), |
task_tracker_, |
- SchedulerWorker::InitialState::ALIVE); |
+ i == 0 |
+ ? SchedulerWorker::InitialState::ALIVE |
+ : SchedulerWorker::InitialState::DETACHED); |
if (!worker) |
break; |
idle_workers_stack_.Push(worker.get()); |
@@ -548,18 +613,34 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( |
SchedulerWorker* worker) { |
AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
- idle_workers_stack_.Push(worker); |
+ // Detachment may cause multiple attempts to add because the delegate cannot |
+ // determine who woke it up. As a result, when it wakes up, it may conclude |
+ // there's no work to be done and attempt to add itself to the idle stack |
+ // again. |
+ if (!idle_workers_stack_.Contains(worker)) |
+ idle_workers_stack_.Push(worker); |
+ |
DCHECK_LE(idle_workers_stack_.Size(), workers_.size()); |
if (idle_workers_stack_.Size() == workers_.size()) |
idle_workers_stack_cv_for_testing_->Broadcast(); |
} |
+const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const { |
+ AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
+ return idle_workers_stack_.Peek(); |
+} |
+ |
void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack( |
SchedulerWorker* worker) { |
AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
idle_workers_stack_.Remove(worker); |
} |
+bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() { |
+ AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_); |
+ return worker_detachment_allowed_; |
+} |
+ |
} // namespace internal |
} // namespace base |