Index: base/task_scheduler/scheduler_worker_pool_impl.cc |
diff --git a/base/task_scheduler/scheduler_worker_pool_impl.cc b/base/task_scheduler/scheduler_worker_pool_impl.cc |
index 06933eb32db62386195ba3619fd71c03914346fd..677e53358866f5787b549eaab00bcf8467c443a7 100644 |
--- a/base/task_scheduler/scheduler_worker_pool_impl.cc |
+++ b/base/task_scheduler/scheduler_worker_pool_impl.cc |
@@ -9,6 +9,7 @@ |
#include <algorithm> |
#include <utility> |
+#include "base/atomicops.h" |
#include "base/bind.h" |
#include "base/bind_helpers.h" |
#include "base/lazy_instance.h" |
@@ -21,6 +22,7 @@ |
#include "base/threading/platform_thread.h" |
#include "base/threading/thread_local.h" |
#include "base/threading/thread_restrictions.h" |
+#include "base/time/time.h" |
namespace base { |
namespace internal { |
@@ -43,7 +45,9 @@ class SchedulerParallelTaskRunner : public TaskRunner { |
// TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
SchedulerParallelTaskRunner(const TaskTraits& traits, |
SchedulerWorkerPool* worker_pool) |
- : traits_(traits), worker_pool_(worker_pool) {} |
+ : traits_(traits), worker_pool_(worker_pool) { |
+ DCHECK(worker_pool_); |
+ } |
// TaskRunner: |
bool PostDelayedTask(const tracked_objects::Location& from_here, |
@@ -76,7 +80,9 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
// TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
SchedulerSequencedTaskRunner(const TaskTraits& traits, |
SchedulerWorkerPool* worker_pool) |
- : traits_(traits), worker_pool_(worker_pool) {} |
+ : traits_(traits), worker_pool_(worker_pool) { |
+ DCHECK(worker_pool_); |
+ } |
// SequencedTaskRunner: |
bool PostDelayedTask(const tracked_objects::Location& from_here, |
@@ -113,56 +119,6 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); |
}; |
-// A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. |
-class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
- public: |
- // Constructs a SchedulerSingleThreadTaskRunner which can be used to post |
- // tasks so long as |worker_pool| and |worker| are alive. |
- // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| |
- // and |worker|. |
- SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
- SchedulerWorkerPool* worker_pool, |
- SchedulerWorker* worker) |
- : traits_(traits), |
- worker_pool_(worker_pool), |
- worker_(worker) {} |
- |
- // SingleThreadTaskRunner: |
- bool PostDelayedTask(const tracked_objects::Location& from_here, |
- const Closure& closure, |
- TimeDelta delay) override { |
- std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); |
- task->single_thread_task_runner_ref = this; |
- |
- // Post the task to be executed by |worker_| as part of |sequence_|. |
- return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, |
- worker_); |
- } |
- |
- bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
- const Closure& closure, |
- base::TimeDelta delay) override { |
- // Tasks are never nested within the task scheduler. |
- return PostDelayedTask(from_here, closure, delay); |
- } |
- |
- bool RunsTasksOnCurrentThread() const override { |
- return tls_current_worker.Get().Get() == worker_; |
- } |
- |
- private: |
- ~SchedulerSingleThreadTaskRunner() override = default; |
- |
- // Sequence for all Tasks posted through this TaskRunner. |
- const scoped_refptr<Sequence> sequence_ = new Sequence; |
- |
- const TaskTraits traits_; |
- SchedulerWorkerPool* const worker_pool_; |
- SchedulerWorker* const worker_; |
- |
- DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
-}; |
- |
// Only used in DCHECKs. |
bool ContainsWorker( |
const std::vector<std::unique_ptr<SchedulerWorker>>& workers, |
@@ -187,6 +143,7 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
// the pool name to label the underlying worker threads. |
SchedulerWorkerDelegateImpl( |
SchedulerWorkerPoolImpl* outer, |
+ const TimeDelta& suggested_reclaim_time, |
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
const PriorityQueue* shared_priority_queue, |
int index); |
@@ -203,8 +160,17 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
TimeDelta GetSleepTimeout() override; |
bool CanDetach(SchedulerWorker* worker) override; |
+ void RegisterSingleThreadTaskRunner() { |
+ subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, 1); |
+ } |
+ |
+ void UnregisterSingleThreadTaskRunner() { |
+ subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, -1); |
+ } |
+ |
private: |
SchedulerWorkerPoolImpl* outer_; |
+ const TimeDelta suggested_reclaim_time_; |
const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
// Single-threaded PriorityQueue for the worker. |
@@ -214,11 +180,75 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
// |single_threaded_priority_queue_|. |
bool last_sequence_is_single_threaded_ = false; |
+  // True if the worker performed an idle cycle. Workers start out idle. |
fdoray
2016/07/08 18:58:06
an* idle cycle
robliao
2016/07/08 21:17:03
Done.
|
+ bool performed_idle_cycle_ = true; |
+ |
+ subtle::Atomic32 num_single_threaded_runners_ = 0; |
+ |
const int index_; |
DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
}; |
+// A task runner that runs tasks with the SINGLE_THREADED ExecutionMode. |
+class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner : |
+ public SingleThreadTaskRunner { |
+ public: |
+ // Constructs a SchedulerSingleThreadTaskRunner which can be used to post |
+ // tasks so long as |worker_pool| and |worker| are alive. |
+ // TODO(robliao): Find a concrete way to manage the memory of |worker_pool| |
+ // and |worker|. |
+ SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
+ SchedulerWorkerPoolImpl* worker_pool, |
+ SchedulerWorker* worker) |
+ : traits_(traits), |
+ worker_pool_(worker_pool), |
+ worker_(worker) { |
+ DCHECK(worker_pool_); |
+ DCHECK(worker_); |
+ static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
+ RegisterSingleThreadTaskRunner(); |
+ } |
+ |
+ // SingleThreadTaskRunner: |
+ bool PostDelayedTask(const tracked_objects::Location& from_here, |
+ const Closure& closure, |
+ TimeDelta delay) override { |
+ std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay)); |
+ task->single_thread_task_runner_ref = this; |
+ |
+ // Post the task to be executed by |worker_| as part of |sequence_|. |
+ return worker_pool_->PostTaskWithSequence(std::move(task), sequence_, |
+ worker_); |
+ } |
+ |
+ bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, |
+ const Closure& closure, |
+ base::TimeDelta delay) override { |
+ // Tasks are never nested within the task scheduler. |
+ return PostDelayedTask(from_here, closure, delay); |
+ } |
+ |
+ bool RunsTasksOnCurrentThread() const override { |
+ return tls_current_worker.Get().Get() == worker_; |
+ } |
+ |
+ private: |
+ ~SchedulerSingleThreadTaskRunner() override { |
+ static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())-> |
+ UnregisterSingleThreadTaskRunner(); |
+ } |
+ |
+ // Sequence for all Tasks posted through this TaskRunner. |
+ const scoped_refptr<Sequence> sequence_ = new Sequence; |
+ |
+ const TaskTraits traits_; |
+ SchedulerWorkerPoolImpl* const worker_pool_; |
+ SchedulerWorker* const worker_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
+}; |
+ |
SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { |
// SchedulerWorkerPool should never be deleted in production unless its |
// initialization failed. |
@@ -231,6 +261,7 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( |
ThreadPriority thread_priority, |
size_t max_threads, |
IORestriction io_restriction, |
+ const TimeDelta& suggested_reclaim_time, |
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
TaskTracker* task_tracker, |
DelayedTaskManager* delayed_task_manager) { |
@@ -238,6 +269,7 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( |
new SchedulerWorkerPoolImpl(name, io_restriction, task_tracker, |
delayed_task_manager)); |
if (worker_pool->Initialize(thread_priority, max_threads, |
+ suggested_reclaim_time, |
re_enqueue_sequence_callback)) { |
return worker_pool; |
} |
@@ -251,6 +283,10 @@ void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { |
} |
void SchedulerWorkerPoolImpl::JoinForTesting() { |
+ { |
+ AutoSchedulerLock auto_lock(join_for_testing_called_lock_); |
+ join_for_testing_called_ = true; |
+ } |
for (const auto& worker : workers_) |
worker->JoinForTesting(); |
@@ -371,10 +407,12 @@ void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( |
SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
SchedulerWorkerDelegateImpl( |
SchedulerWorkerPoolImpl* outer, |
+ const TimeDelta& suggested_reclaim_time, |
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
const PriorityQueue* shared_priority_queue, |
int index) |
: outer_(outer), |
+ suggested_reclaim_time_(suggested_reclaim_time), |
re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
single_threaded_priority_queue_(shared_priority_queue), |
index_(index) {} |
@@ -399,6 +437,9 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( |
tls_current_worker.Get().Set(worker); |
tls_current_worker_pool.Get().Set(outer_); |
+ // Workers start out idle, so this counts as an idle cycle. |
+ performed_idle_cycle_ = true; |
+ |
ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ == |
IORestriction::ALLOWED); |
} |
@@ -455,6 +496,9 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( |
} |
DCHECK(sequence); |
+ // We're doing work, so this is not an idle cycle. |
+ performed_idle_cycle_ = false; |
+ |
outer_->RemoveFromIdleWorkersStack(worker); |
return sequence; |
} |
@@ -476,12 +520,21 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
GetSleepTimeout() { |
- return TimeDelta::Max(); |
+ return suggested_reclaim_time_; |
} |
bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
SchedulerWorker* worker) { |
- return false; |
+  // It's not an issue if |num_single_threaded_runners_| is incremented after this |
fdoray
2016/07/08 18:58:06
|num_single_threaded_runners_|
robliao
2016/07/08 21:17:03
Done.
|
+ // because the next single-threaded task will simply pick up a new physical |
+ // thread. |
+ bool can_detach = performed_idle_cycle_ && |
fdoray
2016/07/08 18:58:07
const bool
robliao
2016/07/08 21:17:03
Done.
|
+ worker != outer_->PeekAtIdleWorkersStack() && |
+ !subtle::Release_Load(&num_single_threaded_runners_) && |
+ !outer_->HasJoinedForTesting(); |
fdoray
2016/07/08 18:58:07
Sadly this is racy:
- Thread A: SchedulerWorker::C
robliao
2016/07/08 21:17:03
Yup. This became the first test that flew in the f
fdoray
2016/07/20 14:15:43
Acknowledged.
|
+ // CanDetach is part of a SchedulerWorker's idle cycle. |
+ performed_idle_cycle_ = true; |
+ return can_detach; |
} |
SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
@@ -496,6 +549,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
idle_workers_stack_lock_.CreateConditionVariable()), |
join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, |
WaitableEvent::InitialState::NOT_SIGNALED), |
+ join_for_testing_called_(false), |
#if DCHECK_IS_ON() |
workers_created_(WaitableEvent::ResetPolicy::MANUAL, |
WaitableEvent::InitialState::NOT_SIGNALED), |
@@ -509,6 +563,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
bool SchedulerWorkerPoolImpl::Initialize( |
ThreadPriority thread_priority, |
size_t max_threads, |
+ const TimeDelta& suggested_reclaim_time, |
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { |
AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
@@ -518,10 +573,13 @@ bool SchedulerWorkerPoolImpl::Initialize( |
std::unique_ptr<SchedulerWorker> worker = |
SchedulerWorker::Create( |
thread_priority, WrapUnique(new SchedulerWorkerDelegateImpl( |
- this, re_enqueue_sequence_callback, |
+ this, suggested_reclaim_time, |
+ re_enqueue_sequence_callback, |
&shared_priority_queue_, static_cast<int>(i))), |
task_tracker_, |
- SchedulerWorker::InitialState::ALIVE); |
+ i == 0 |
+ ? SchedulerWorker::InitialState::ALIVE |
+ : SchedulerWorker::InitialState::DETACHED); |
if (!worker) |
break; |
idle_workers_stack_.Push(worker.get()); |
@@ -548,18 +606,34 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( |
SchedulerWorker* worker) { |
AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
- idle_workers_stack_.Push(worker); |
+ // Detachment may cause multiple attempts to add because the delegate cannot |
+ // determine who woke it up. As a result, when it wakes up, it may conclude |
+ // there's no work to be done and attempt to add itself to the idle stack |
+ // again. |
+ if (!idle_workers_stack_.Contains(worker)) |
+ idle_workers_stack_.Push(worker); |
+ |
DCHECK_LE(idle_workers_stack_.Size(), workers_.size()); |
if (idle_workers_stack_.Size() == workers_.size()) |
idle_workers_stack_cv_for_testing_->Broadcast(); |
} |
+const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const { |
+ AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
+ return idle_workers_stack_.Peek(); |
+} |
+ |
void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack( |
SchedulerWorker* worker) { |
AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
idle_workers_stack_.Remove(worker); |
} |
+bool SchedulerWorkerPoolImpl::HasJoinedForTesting() { |
+ AutoSchedulerLock auto_lock(join_for_testing_called_lock_); |
+ return join_for_testing_called_; |
+} |
+ |
} // namespace internal |
} // namespace base |