Index: base/task_scheduler/scheduler_worker_pool_impl.cc |
diff --git a/base/task_scheduler/scheduler_worker_pool_impl.cc b/base/task_scheduler/scheduler_worker_pool_impl.cc |
index 06933eb32db62386195ba3619fd71c03914346fd..954bdfab3b905e2a6653fe5db7ecaacd4247fba3 100644 |
--- a/base/task_scheduler/scheduler_worker_pool_impl.cc |
+++ b/base/task_scheduler/scheduler_worker_pool_impl.cc |
@@ -9,6 +9,7 @@ |
#include <algorithm> |
#include <utility> |
+#include "base/atomicops.h" |
#include "base/bind.h" |
#include "base/bind_helpers.h" |
#include "base/lazy_instance.h" |
@@ -21,6 +22,7 @@ |
#include "base/threading/platform_thread.h" |
#include "base/threading/thread_local.h" |
#include "base/threading/thread_restrictions.h" |
+#include "base/time/time.h" |
namespace base { |
namespace internal { |
@@ -43,7 +45,9 @@ class SchedulerParallelTaskRunner : public TaskRunner { |
// TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
SchedulerParallelTaskRunner(const TaskTraits& traits, |
SchedulerWorkerPool* worker_pool) |
- : traits_(traits), worker_pool_(worker_pool) {} |
+ : traits_(traits), worker_pool_(worker_pool) { |
+ DCHECK(worker_pool_); |
+ } |
// TaskRunner: |
bool PostDelayedTask(const tracked_objects::Location& from_here, |
@@ -76,7 +80,9 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner { |
// TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. |
SchedulerSequencedTaskRunner(const TaskTraits& traits, |
SchedulerWorkerPool* worker_pool) |
- : traits_(traits), worker_pool_(worker_pool) {} |
+ : traits_(traits), worker_pool_(worker_pool) { |
+ DCHECK(worker_pool_); |
+ } |
// SequencedTaskRunner: |
bool PostDelayedTask(const tracked_objects::Location& from_here, |
@@ -121,11 +127,15 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
// TODO(robliao): Find a concrete way to manage the memory of |worker_pool| |
// and |worker|. |
SchedulerSingleThreadTaskRunner(const TaskTraits& traits, |
- SchedulerWorkerPool* worker_pool, |
+ SchedulerWorkerPoolImpl* worker_pool, |
SchedulerWorker* worker) |
: traits_(traits), |
worker_pool_(worker_pool), |
- worker_(worker) {} |
+ worker_(worker) { |
+ DCHECK(worker_pool_); |
+ DCHECK(worker_); |
+ worker_pool_->RegisterSingleThreadTaskRunner(worker_); |
+ } |
// SingleThreadTaskRunner: |
bool PostDelayedTask(const tracked_objects::Location& from_here, |
@@ -151,13 +161,15 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { |
} |
private: |
- ~SchedulerSingleThreadTaskRunner() override = default; |
+ ~SchedulerSingleThreadTaskRunner() override { |
+ worker_pool_->UnregisterSingleThreadTaskRunner(worker_); |
+ } |
// Sequence for all Tasks posted through this TaskRunner. |
const scoped_refptr<Sequence> sequence_ = new Sequence; |
const TaskTraits traits_; |
- SchedulerWorkerPool* const worker_pool_; |
+ SchedulerWorkerPoolImpl* const worker_pool_; |
SchedulerWorker* const worker_; |
DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); |
@@ -187,6 +199,7 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
// the pool name to label the underlying worker threads. |
SchedulerWorkerDelegateImpl( |
SchedulerWorkerPoolImpl* outer, |
+ const TimeDelta& suggested_reclaim_time, |
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
const PriorityQueue* shared_priority_queue, |
int index); |
@@ -203,8 +216,17 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
TimeDelta GetSleepTimeout() override; |
bool CanDetach(SchedulerWorker* worker) override; |
+ void RegisterSingleThreadTaskRunner() { |
+ subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, 1); |
+ } |
+ |
+ void UnregisterSingleThreadTaskRunner() { |
+ subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, -1); |
+ } |
+ |
private: |
SchedulerWorkerPoolImpl* outer_; |
+ const TimeDelta suggested_reclaim_time_; |
const ReEnqueueSequenceCallback re_enqueue_sequence_callback_; |
// Single-threaded PriorityQueue for the worker. |
@@ -214,6 +236,8 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl |
// |single_threaded_priority_queue_|. |
bool last_sequence_is_single_threaded_ = false; |
+ subtle::Atomic32 num_single_threaded_runners_ = 0; |
+ |
const int index_; |
DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); |
@@ -231,6 +255,7 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( |
ThreadPriority thread_priority, |
size_t max_threads, |
IORestriction io_restriction, |
+ const TimeDelta& suggested_reclaim_time, |
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
TaskTracker* task_tracker, |
DelayedTaskManager* delayed_task_manager) { |
@@ -238,6 +263,7 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create( |
new SchedulerWorkerPoolImpl(name, io_restriction, task_tracker, |
delayed_task_manager)); |
if (worker_pool->Initialize(thread_priority, max_threads, |
+ suggested_reclaim_time, |
re_enqueue_sequence_callback)) { |
return worker_pool; |
} |
@@ -251,6 +277,10 @@ void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { |
} |
void SchedulerWorkerPoolImpl::JoinForTesting() { |
+ { |
+ AutoSchedulerLock auto_lock(join_for_testing_called_lock_); |
+ join_for_testing_called_ = true; |
+ } |
for (const auto& worker : workers_) |
worker->JoinForTesting(); |
@@ -368,13 +398,29 @@ void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow( |
} |
} |
+void SchedulerWorkerPoolImpl::RegisterSingleThreadTaskRunner( |
+ SchedulerWorker* worker) { |
+ auto delegate = |
+ static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate()); |
+ delegate->RegisterSingleThreadTaskRunner(); |
+} |
+ |
+void SchedulerWorkerPoolImpl::UnregisterSingleThreadTaskRunner( |
+ SchedulerWorker* worker) { |
+ auto delegate = |
+ static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate()); |
+ delegate->UnregisterSingleThreadTaskRunner(); |
+} |
+ |
SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
SchedulerWorkerDelegateImpl( |
SchedulerWorkerPoolImpl* outer, |
+ const TimeDelta& suggested_reclaim_time, |
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback, |
const PriorityQueue* shared_priority_queue, |
int index) |
: outer_(outer), |
+ suggested_reclaim_time_(suggested_reclaim_time), |
re_enqueue_sequence_callback_(re_enqueue_sequence_callback), |
single_threaded_priority_queue_(shared_priority_queue), |
index_(index) {} |
@@ -476,12 +522,14 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: |
GetSleepTimeout() { |
- return TimeDelta::Max(); |
+ return suggested_reclaim_time_; |
fdoray
2016/07/04 19:41:32
From scheduler_worker.cc:
while (...) {
scoped_
robliao
2016/07/07 17:49:16
Nice catch! Nothing at the moment. Fixed with an i
|
} |
bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach( |
SchedulerWorker* worker) { |
- return false; |
+ return worker != outer_->PeekAtIdleWorkersStack() && |
+ !subtle::Release_Load(&num_single_threaded_runners_) && |
fdoray
2016/07/04 19:41:32
Add comment explaining why it doesn't matter if |n
gab
2016/07/07 15:58:30
Why do you need Release_Load() here?
robliao
2016/07/07 17:49:16
Done.
gab
2016/07/13 18:36:31
Hmmm, I don't think this matters, idle thread acco
robliao
2016/07/13 20:19:46
In either case, it's the safest thing to do here.
gab
2016/07/13 21:07:55
I disagree. It's not safer. It's equivalent (plus
robliao
2016/07/19 22:03:47
In light of the atomic ops discussion, any strong
gab
2016/07/20 01:45:21
From the conclusion we derived, I think the inc/de
robliao
2016/07/20 19:44:00
I've my understanding is right, the atomic ops are
gab
2016/07/20 20:26:31
We have different understandings on a few things (
robliao
2016/07/20 22:18:03
data types (the whole point of atomic RMW is to ma
gab
2016/07/21 13:43:21
I'm not convinced this is true. The RMW will add a
robliao
2016/07/21 18:44:23
Digging into this deeper, the RMW will happen atom
gab
2016/07/21 21:24:24
Refcount is different, no one ever wants to just "
robliao
2016/07/22 16:44:06
Fenced the load.
|
+ !outer_->HasJoinedForTesting(); |
} |
SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
@@ -496,6 +544,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
idle_workers_stack_lock_.CreateConditionVariable()), |
join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL, |
WaitableEvent::InitialState::NOT_SIGNALED), |
+ join_for_testing_called_(false), |
#if DCHECK_IS_ON() |
workers_created_(WaitableEvent::ResetPolicy::MANUAL, |
WaitableEvent::InitialState::NOT_SIGNALED), |
@@ -509,6 +558,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( |
bool SchedulerWorkerPoolImpl::Initialize( |
ThreadPriority thread_priority, |
size_t max_threads, |
+ const TimeDelta& suggested_reclaim_time, |
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) { |
AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
@@ -518,10 +568,13 @@ bool SchedulerWorkerPoolImpl::Initialize( |
std::unique_ptr<SchedulerWorker> worker = |
SchedulerWorker::Create( |
thread_priority, WrapUnique(new SchedulerWorkerDelegateImpl( |
- this, re_enqueue_sequence_callback, |
+ this, suggested_reclaim_time, |
+ re_enqueue_sequence_callback, |
&shared_priority_queue_, static_cast<int>(i))), |
task_tracker_, |
- SchedulerWorker::InitialState::ALIVE); |
+ i == 0 |
+ ? SchedulerWorker::InitialState::ALIVE |
+ : SchedulerWorker::InitialState::DETACHED); |
if (!worker) |
break; |
idle_workers_stack_.Push(worker.get()); |
@@ -548,18 +601,34 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() { |
void SchedulerWorkerPoolImpl::AddToIdleWorkersStack( |
SchedulerWorker* worker) { |
AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
- idle_workers_stack_.Push(worker); |
+ // Detachment may cause multiple attempts to add because the delegate cannot |
+ // determine who woke it up. As a result, when it wakes up, it may conclude |
+ // there's no work to be done and attempt to add itself to the idle stack |
+ // again. |
+ if (!idle_workers_stack_.Contains(worker)) |
fdoray
2016/07/04 19:41:32
A detached worker is only re-created when its Wake
robliao
2016/07/07 17:49:16
Prior to this change, GetWork had two precondition
fdoray
2016/07/07 20:45:56
You're right. Thanks!
|
+ idle_workers_stack_.Push(worker); |
+ |
DCHECK_LE(idle_workers_stack_.Size(), workers_.size()); |
if (idle_workers_stack_.Size() == workers_.size()) |
idle_workers_stack_cv_for_testing_->Broadcast(); |
} |
+const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const { |
+ AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
+ return idle_workers_stack_.Peek(); |
+} |
+ |
void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack( |
SchedulerWorker* worker) { |
AutoSchedulerLock auto_lock(idle_workers_stack_lock_); |
idle_workers_stack_.Remove(worker); |
} |
+bool SchedulerWorkerPoolImpl::HasJoinedForTesting() { |
+ AutoSchedulerLock auto_lock(join_for_testing_called_lock_); |
+ return join_for_testing_called_; |
+} |
+ |
} // namespace internal |
} // namespace base |