Chromium Code Reviews

Unified Diff: base/task_scheduler/scheduler_worker_pool_impl.cc

Issue 2116163002: Add Lazy Creation and Thread Detachment Support in the Scheduler Worker Pool (Closed)
Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 6 months ago
Index: base/task_scheduler/scheduler_worker_pool_impl.cc
diff --git a/base/task_scheduler/scheduler_worker_pool_impl.cc b/base/task_scheduler/scheduler_worker_pool_impl.cc
index 06933eb32db62386195ba3619fd71c03914346fd..954bdfab3b905e2a6653fe5db7ecaacd4247fba3 100644
--- a/base/task_scheduler/scheduler_worker_pool_impl.cc
+++ b/base/task_scheduler/scheduler_worker_pool_impl.cc
@@ -9,6 +9,7 @@
#include <algorithm>
#include <utility>
+#include "base/atomicops.h"
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/lazy_instance.h"
@@ -21,6 +22,7 @@
#include "base/threading/platform_thread.h"
#include "base/threading/thread_local.h"
#include "base/threading/thread_restrictions.h"
+#include "base/time/time.h"
namespace base {
namespace internal {
@@ -43,7 +45,9 @@ class SchedulerParallelTaskRunner : public TaskRunner {
// TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
SchedulerParallelTaskRunner(const TaskTraits& traits,
SchedulerWorkerPool* worker_pool)
- : traits_(traits), worker_pool_(worker_pool) {}
+ : traits_(traits), worker_pool_(worker_pool) {
+ DCHECK(worker_pool_);
+ }
// TaskRunner:
bool PostDelayedTask(const tracked_objects::Location& from_here,
@@ -76,7 +80,9 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
// TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
SchedulerSequencedTaskRunner(const TaskTraits& traits,
SchedulerWorkerPool* worker_pool)
- : traits_(traits), worker_pool_(worker_pool) {}
+ : traits_(traits), worker_pool_(worker_pool) {
+ DCHECK(worker_pool_);
+ }
// SequencedTaskRunner:
bool PostDelayedTask(const tracked_objects::Location& from_here,
@@ -121,11 +127,15 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
// TODO(robliao): Find a concrete way to manage the memory of |worker_pool|
// and |worker|.
SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
- SchedulerWorkerPool* worker_pool,
+ SchedulerWorkerPoolImpl* worker_pool,
SchedulerWorker* worker)
: traits_(traits),
worker_pool_(worker_pool),
- worker_(worker) {}
+ worker_(worker) {
+ DCHECK(worker_pool_);
+ DCHECK(worker_);
+ worker_pool_->RegisterSingleThreadTaskRunner(worker_);
+ }
// SingleThreadTaskRunner:
bool PostDelayedTask(const tracked_objects::Location& from_here,
@@ -151,13 +161,15 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
}
private:
- ~SchedulerSingleThreadTaskRunner() override = default;
+ ~SchedulerSingleThreadTaskRunner() override {
+ worker_pool_->UnregisterSingleThreadTaskRunner(worker_);
+ }
// Sequence for all Tasks posted through this TaskRunner.
const scoped_refptr<Sequence> sequence_ = new Sequence;
const TaskTraits traits_;
- SchedulerWorkerPool* const worker_pool_;
+ SchedulerWorkerPoolImpl* const worker_pool_;
SchedulerWorker* const worker_;
DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
@@ -187,6 +199,7 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
// the pool name to label the underlying worker threads.
SchedulerWorkerDelegateImpl(
SchedulerWorkerPoolImpl* outer,
+ const TimeDelta& suggested_reclaim_time,
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
const PriorityQueue* shared_priority_queue,
int index);
@@ -203,8 +216,17 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
TimeDelta GetSleepTimeout() override;
bool CanDetach(SchedulerWorker* worker) override;
+ void RegisterSingleThreadTaskRunner() {
+ subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, 1);
+ }
+
+ void UnregisterSingleThreadTaskRunner() {
+ subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, -1);
+ }
+
private:
SchedulerWorkerPoolImpl* outer_;
+ const TimeDelta suggested_reclaim_time_;
const ReEnqueueSequenceCallback re_enqueue_sequence_callback_;
// Single-threaded PriorityQueue for the worker.
@@ -214,6 +236,8 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
// |single_threaded_priority_queue_|.
bool last_sequence_is_single_threaded_ = false;
+ subtle::Atomic32 num_single_threaded_runners_ = 0;
+
const int index_;
DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl);
@@ -231,6 +255,7 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create(
ThreadPriority thread_priority,
size_t max_threads,
IORestriction io_restriction,
+ const TimeDelta& suggested_reclaim_time,
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
TaskTracker* task_tracker,
DelayedTaskManager* delayed_task_manager) {
@@ -238,6 +263,7 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create(
new SchedulerWorkerPoolImpl(name, io_restriction, task_tracker,
delayed_task_manager));
if (worker_pool->Initialize(thread_priority, max_threads,
+ suggested_reclaim_time,
re_enqueue_sequence_callback)) {
return worker_pool;
}
@@ -251,6 +277,10 @@ void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() {
}
void SchedulerWorkerPoolImpl::JoinForTesting() {
+ {
+ AutoSchedulerLock auto_lock(join_for_testing_called_lock_);
+ join_for_testing_called_ = true;
+ }
for (const auto& worker : workers_)
worker->JoinForTesting();
@@ -368,13 +398,29 @@ void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow(
}
}
+void SchedulerWorkerPoolImpl::RegisterSingleThreadTaskRunner(
+ SchedulerWorker* worker) {
+ auto delegate =
+ static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate());
+ delegate->RegisterSingleThreadTaskRunner();
+}
+
+void SchedulerWorkerPoolImpl::UnregisterSingleThreadTaskRunner(
+ SchedulerWorker* worker) {
+ auto delegate =
+ static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate());
+ delegate->UnregisterSingleThreadTaskRunner();
+}
+
SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
SchedulerWorkerDelegateImpl(
SchedulerWorkerPoolImpl* outer,
+ const TimeDelta& suggested_reclaim_time,
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
const PriorityQueue* shared_priority_queue,
int index)
: outer_(outer),
+ suggested_reclaim_time_(suggested_reclaim_time),
re_enqueue_sequence_callback_(re_enqueue_sequence_callback),
single_threaded_priority_queue_(shared_priority_queue),
index_(index) {}
@@ -476,12 +522,14 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
GetSleepTimeout() {
- return TimeDelta::Max();
+ return suggested_reclaim_time_;
fdoray 2016/07/04 19:41:32 From scheduler_worker.cc: while (...) { scoped_
robliao 2016/07/07 17:49:16 Nice catch! Nothing at the moment. Fixed with an i
}
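
For context on the exchange above: GetSleepTimeout() is consumed by the worker's wait loop, so returning |suggested_reclaim_time_| instead of TimeDelta::Max() is what lets an idle worker wake up and offer to detach. Below is a minimal, self-contained sketch of that wait-then-maybe-detach pattern in standard C++; the DelegateSketch interface and loop body are simplified stand-ins for illustration, not the actual scheduler_worker.cc code.

#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical, simplified stand-in for SchedulerWorker::Delegate.
struct DelegateSketch {
  virtual ~DelegateSketch() = default;
  virtual bool GetWorkAndRun() = 0;  // Returns false when there is no work.
  virtual std::chrono::milliseconds GetSleepTimeout() = 0;
  virtual bool CanDetach() = 0;
};

// Models the worker main loop: run work while available; otherwise wait for a
// wake-up, bounded by the delegate-provided timeout, and consider detaching.
void WorkerLoopSketch(DelegateSketch* delegate,
                      std::mutex* wake_up_mutex,
                      std::condition_variable* wake_up_cv,
                      bool* woken_up) {
  for (;;) {
    if (delegate->GetWorkAndRun())
      continue;

    // With an infinite timeout (the old TimeDelta::Max() behavior) this wait
    // only ends on an explicit wake-up, so CanDetach() would never be
    // consulted for an idle worker; a finite suggested reclaim time makes
    // detachment reachable.
    std::unique_lock<std::mutex> lock(*wake_up_mutex);
    wake_up_cv->wait_for(lock, delegate->GetSleepTimeout(),
                         [woken_up] { return *woken_up; });
    *woken_up = false;

    if (delegate->CanDetach())
      return;  // The thread exits; the worker can be re-created on demand.
  }
}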
bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach(
SchedulerWorker* worker) {
- return false;
+ return worker != outer_->PeekAtIdleWorkersStack() &&
+ !subtle::Release_Load(&num_single_threaded_runners_) &&
fdoray 2016/07/04 19:41:32 Add comment explaining why it doesn't matter if |n
gab 2016/07/07 15:58:30 Why do you need Release_Load() here?
robliao 2016/07/07 17:49:16 Done.
gab 2016/07/13 18:36:31 Hmmm, I don't think this matters, idle thread acco
robliao 2016/07/13 20:19:46 In either case, it's the safest thing to do here.
gab 2016/07/13 21:07:55 I disagree. It's not safer. It's equivalent (plus
robliao 2016/07/19 22:03:47 In light of the atomic ops discussion, any strong
gab 2016/07/20 01:45:21 From the conclusion we derived, I think the inc/de
robliao 2016/07/20 19:44:00 If my understanding is right, the atomic ops are
gab 2016/07/20 20:26:31 We have different understandings on a few things (
robliao 2016/07/20 22:18:03 data types (the whole point of atomic RMW is to ma
gab 2016/07/21 13:43:21 I'm not convinced this is true. The RMW will add a
robliao 2016/07/21 18:44:23 Digging into this deeper, the RMW will happen atom
gab 2016/07/21 21:24:24 Refcount is different, no one ever wants to just "
robliao 2016/07/22 16:44:06 Fenced the load.
+ !outer_->HasJoinedForTesting();
}
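
The thread above concerns what memory ordering |num_single_threaded_runners_| actually needs. As a point of reference, here is a minimal standard-C++ model of the pattern the discussion converged on (plain atomic increments/decrements from the task runner's constructor/destructor, and a fenced/acquire read in the detach check); the class and method names are illustrative and std::atomic stands in for base/atomicops.

#include <atomic>

// Illustrative model of the registration counter. The increments/decrements
// only need atomicity (the count acts as a gate, not as a publisher of other
// data), while the read in the detach check is given acquire ordering,
// mirroring the "fenced the load" resolution above.
class SingleThreadRunnerGate {
 public:
  void Register() { count_.fetch_add(1, std::memory_order_relaxed); }
  void Unregister() { count_.fetch_sub(1, std::memory_order_relaxed); }

  // True only when no SingleThreadTaskRunner currently pins the worker. A
  // slightly stale value is tolerable here: the worst case is that a
  // detachment is deferred until the next idle period.
  bool AllowsDetach() const {
    return count_.load(std::memory_order_acquire) == 0;
  }

 private:
  std::atomic<int> count_{0};
};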
SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
@@ -496,6 +544,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
idle_workers_stack_lock_.CreateConditionVariable()),
join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED),
+ join_for_testing_called_(false),
#if DCHECK_IS_ON()
workers_created_(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED),
@@ -509,6 +558,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
bool SchedulerWorkerPoolImpl::Initialize(
ThreadPriority thread_priority,
size_t max_threads,
+ const TimeDelta& suggested_reclaim_time,
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) {
AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
@@ -518,10 +568,13 @@ bool SchedulerWorkerPoolImpl::Initialize(
std::unique_ptr<SchedulerWorker> worker =
SchedulerWorker::Create(
thread_priority, WrapUnique(new SchedulerWorkerDelegateImpl(
- this, re_enqueue_sequence_callback,
+ this, suggested_reclaim_time,
+ re_enqueue_sequence_callback,
&shared_priority_queue_, static_cast<int>(i))),
task_tracker_,
- SchedulerWorker::InitialState::ALIVE);
+ i == 0
+ ? SchedulerWorker::InitialState::ALIVE
+ : SchedulerWorker::InitialState::DETACHED);
if (!worker)
break;
idle_workers_stack_.Push(worker.get());
@@ -548,18 +601,34 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
void SchedulerWorkerPoolImpl::AddToIdleWorkersStack(
SchedulerWorker* worker) {
AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
- idle_workers_stack_.Push(worker);
+ // Detachment may cause multiple attempts to add because the delegate cannot
+ // determine who woke it up. As a result, when it wakes up, it may conclude
+ // there's no work to be done and attempt to add itself to the idle stack
+ // again.
+ if (!idle_workers_stack_.Contains(worker))
fdoray 2016/07/04 19:41:32 A detached worker is only re-created when its Wake
robliao 2016/07/07 17:49:16 Prior to this change, GetWork had two precondition
fdoray 2016/07/07 20:45:56 You're right. Thanks!
+ idle_workers_stack_.Push(worker);
+
DCHECK_LE(idle_workers_stack_.Size(), workers_.size());
if (idle_workers_stack_.Size() == workers_.size())
idle_workers_stack_cv_for_testing_->Broadcast();
}
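
To make the guard above concrete: after a detachment, the worker may wake up, find no work, and try to push itself even though it is already on the idle stack, so the push has to be idempotent. A tiny stand-in container (not the real SchedulerWorkerStack) showing the Contains-guarded push:

#include <algorithm>
#include <cstddef>
#include <vector>

// Stand-in for the idle workers stack: Push() is a no-op when the item is
// already present, so a repeated AddToIdleWorkersStack() leaves it unchanged.
template <typename T>
class IdempotentStack {
 public:
  bool Contains(const T* item) const {
    return std::find(items_.begin(), items_.end(), item) != items_.end();
  }
  void Push(T* item) {
    if (!Contains(item))
      items_.push_back(item);
  }
  T* Peek() const { return items_.empty() ? nullptr : items_.back(); }
  std::size_t Size() const { return items_.size(); }

 private:
  std::vector<T*> items_;
};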
+const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const {
+ AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
+ return idle_workers_stack_.Peek();
+}
+
void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack(
SchedulerWorker* worker) {
AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
idle_workers_stack_.Remove(worker);
}
+bool SchedulerWorkerPoolImpl::HasJoinedForTesting() {
+ AutoSchedulerLock auto_lock(join_for_testing_called_lock_);
+ return join_for_testing_called_;
+}
+
} // namespace internal
} // namespace base
