Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(355)

Unified Diff: base/task_scheduler/scheduler_worker_pool_impl.cc

Issue 2116163002: Add Lazy Creation and Thread Detachment Support in the Scheduler Worker Pool (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: CR Feedback Continuation Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: base/task_scheduler/scheduler_worker_pool_impl.cc
diff --git a/base/task_scheduler/scheduler_worker_pool_impl.cc b/base/task_scheduler/scheduler_worker_pool_impl.cc
index 9f521e229cabb9dd7c52abcc7bc92be9d7b9aa69..17529abed518d734faac2d697c8218d19bb69057 100644
--- a/base/task_scheduler/scheduler_worker_pool_impl.cc
+++ b/base/task_scheduler/scheduler_worker_pool_impl.cc
@@ -9,6 +9,7 @@
#include <algorithm>
#include <utility>
+#include "base/atomicops.h"
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/lazy_instance.h"
@@ -21,6 +22,7 @@
#include "base/threading/platform_thread.h"
#include "base/threading/thread_local.h"
#include "base/threading/thread_restrictions.h"
+#include "base/time/time.h"
namespace base {
namespace internal {
@@ -43,7 +45,9 @@ class SchedulerParallelTaskRunner : public TaskRunner {
// TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
SchedulerParallelTaskRunner(const TaskTraits& traits,
SchedulerWorkerPool* worker_pool)
- : traits_(traits), worker_pool_(worker_pool) {}
+ : traits_(traits), worker_pool_(worker_pool) {
+ DCHECK(worker_pool_);
+ }
// TaskRunner:
bool PostDelayedTask(const tracked_objects::Location& from_here,
@@ -76,7 +80,9 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
// TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
SchedulerSequencedTaskRunner(const TaskTraits& traits,
SchedulerWorkerPool* worker_pool)
- : traits_(traits), worker_pool_(worker_pool) {}
+ : traits_(traits), worker_pool_(worker_pool) {
+ DCHECK(worker_pool_);
+ }
// SequencedTaskRunner:
bool PostDelayedTask(const tracked_objects::Location& from_here,
@@ -113,19 +119,30 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
};
+// Only used in DCHECKs.
+bool ContainsWorker(
+ const std::vector<std::unique_ptr<SchedulerWorker>>& workers,
+ const SchedulerWorker* worker) {
+ auto it = std::find_if(workers.begin(), workers.end(),
+ [worker](const std::unique_ptr<SchedulerWorker>& i) {
+ return i.get() == worker;
+ });
+ return it != workers.end();
+}
+
+} // namespace
+
// A task runner that runs tasks with the SINGLE_THREADED ExecutionMode.
-class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
+class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner :
+ public SingleThreadTaskRunner {
public:
// Constructs a SchedulerSingleThreadTaskRunner which can be used to post
// tasks so long as |worker_pool| and |worker| are alive.
// TODO(robliao): Find a concrete way to manage the memory of |worker_pool|
// and |worker|.
SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
- SchedulerWorkerPool* worker_pool,
- SchedulerWorker* worker)
- : traits_(traits),
- worker_pool_(worker_pool),
- worker_(worker) {}
+ SchedulerWorkerPoolImpl* worker_pool,
fdoray 2016/07/20 14:15:43 Can stay SchedulerWorkerPool*
robliao 2016/07/20 19:44:01 Done.
+ SchedulerWorker* worker);
// SingleThreadTaskRunner:
bool PostDelayedTask(const tracked_objects::Location& from_here,
@@ -151,31 +168,18 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
}
private:
- ~SchedulerSingleThreadTaskRunner() override = default;
+ ~SchedulerSingleThreadTaskRunner() override;
// Sequence for all Tasks posted through this TaskRunner.
const scoped_refptr<Sequence> sequence_ = new Sequence;
const TaskTraits traits_;
- SchedulerWorkerPool* const worker_pool_;
+ SchedulerWorkerPoolImpl* const worker_pool_;
SchedulerWorker* const worker_;
DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
};
-// Only used in DCHECKs.
-bool ContainsWorker(
- const std::vector<std::unique_ptr<SchedulerWorker>>& workers,
- const SchedulerWorker* worker) {
- auto it = std::find_if(workers.begin(), workers.end(),
- [worker](const std::unique_ptr<SchedulerWorker>& i) {
- return i.get() == worker;
- });
- return it != workers.end();
-}
-
-} // namespace
-
class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
: public SchedulerWorker::Delegate {
public:
@@ -203,6 +207,14 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
TimeDelta GetSleepTimeout() override;
bool CanDetach(SchedulerWorker* worker) override;
+ void RegisterSingleThreadTaskRunner() {
+ subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, 1);
+ }
+
+ void UnregisterSingleThreadTaskRunner() {
+ subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, -1);
+ }
+
private:
SchedulerWorkerPoolImpl* outer_;
const ReEnqueueSequenceCallback re_enqueue_sequence_callback_;
@@ -214,6 +226,11 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
// |single_threaded_priority_queue_|.
bool last_sequence_is_single_threaded_ = false;
+ // True if the worker performed an idle cycle. Workers start out idle.
+ bool performed_idle_cycle_ = true;
+
+ subtle::Atomic32 num_single_threaded_runners_ = 0;
+
const int index_;
DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl);
@@ -234,6 +251,7 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create(
std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool(
new SchedulerWorkerPoolImpl(params.name(),
params.io_restriction(),
+ params.suggested_reclaim_time(),
task_tracker, delayed_task_manager));
if (worker_pool->Initialize(params.thread_priority(),
params.max_threads(),
@@ -250,6 +268,8 @@ void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() {
}
void SchedulerWorkerPoolImpl::JoinForTesting() {
+ DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) <<
+ "Workers can detach during join.";
for (const auto& worker : workers_)
worker->JoinForTesting();
@@ -257,6 +277,11 @@ void SchedulerWorkerPoolImpl::JoinForTesting() {
join_for_testing_returned_.Signal();
}
+void SchedulerWorkerPoolImpl::DisallowWorkerDetachmentForTesting() {
+ AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_);
+ worker_detachment_allowed_ = false;
+}
+
scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits(
const TaskTraits& traits,
ExecutionMode execution_mode) {
@@ -367,6 +392,25 @@ void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow(
}
}
+SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner::
+ SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
+ SchedulerWorkerPoolImpl* worker_pool,
+ SchedulerWorker* worker)
+ : traits_(traits),
+ worker_pool_(worker_pool),
+ worker_(worker) {
+ DCHECK(worker_pool_);
+ DCHECK(worker_);
+ static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())->
+ RegisterSingleThreadTaskRunner();
+}
+
+SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner::
+ ~SchedulerSingleThreadTaskRunner() {
+ static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())->
+ UnregisterSingleThreadTaskRunner();
+}
+
SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
SchedulerWorkerDelegateImpl(
SchedulerWorkerPoolImpl* outer,
@@ -398,6 +442,9 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry(
tls_current_worker.Get().Set(worker);
tls_current_worker_pool.Get().Set(outer_);
+ // Workers start out idle, so this counts as an idle cycle.
+ performed_idle_cycle_ = true;
gab 2016/07/20 01:45:21 So if we create/wake one too many thread (i.e. 2 t
robliao 2016/07/20 19:44:01 Yes, if we wake one too many threads with no work,
gab 2016/07/20 20:26:31 It's not necessarily a bug. It can today under a s
robliao 2016/07/20 22:18:03 Fair enough. I've moved this to an idle_start_time
gab 2016/07/21 13:43:21 I like the new version very much, easier to read a
+
ThreadRestrictions::SetIOAllowed(
outer_->io_restriction_ ==
SchedulerWorkerPoolParams::IORestriction::ALLOWED);
@@ -455,6 +502,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
}
DCHECK(sequence);
+ performed_idle_cycle_ = false;
+
outer_->RemoveFromIdleWorkersStack(worker);
return sequence;
}
@@ -476,26 +525,43 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
GetSleepTimeout() {
- return TimeDelta::Max();
+ return outer_->suggested_reclaim_time_;
}
bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach(
SchedulerWorker* worker) {
- return false;
+ // It's not an issue if |num_single_threaded_runners_| is incremented after
fdoray 2016/07/20 14:15:43 [...] after this because the newly created TaskRun
robliao 2016/07/20 19:44:00 Done.
+ // this because the next single-threaded task will simply pick up a new
+ // physical thread.
+ const bool can_detach =
+ performed_idle_cycle_ &&
+ worker != outer_->PeekAtIdleWorkersStack() &&
+ !subtle::Release_Load(&num_single_threaded_runners_) &&
+ outer_->CanWorkerDetachForTesting();
+ // Mark that we've performed an idle cycle here. This helps distinguish
+ // between a thread that had GetWork() return nullptr and a thread that woke
+ // up and received nullptr again from GetWork(). We can't simply track if
+ // we're idle because it would make detachment happen too early since
+ // the worker calls CanDetach() immediately after GetWork() returns nullptr.
+ performed_idle_cycle_ = true;
+ return can_detach;
}
SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
StringPiece name,
SchedulerWorkerPoolParams::IORestriction io_restriction,
+ const TimeDelta& suggested_reclaim_time,
TaskTracker* task_tracker,
DelayedTaskManager* delayed_task_manager)
: name_(name.as_string()),
io_restriction_(io_restriction),
+ suggested_reclaim_time_(suggested_reclaim_time),
idle_workers_stack_lock_(shared_priority_queue_.container_lock()),
idle_workers_stack_cv_for_testing_(
idle_workers_stack_lock_.CreateConditionVariable()),
join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED),
+ worker_detachment_allowed_(true),
gab 2016/07/20 01:45:21 (bool should be replaced by flag as mentioned in p
robliao 2016/07/20 19:44:00 Moved initialization.
#if DCHECK_IS_ON()
workers_created_(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED),
@@ -521,7 +587,9 @@ bool SchedulerWorkerPoolImpl::Initialize(
this, re_enqueue_sequence_callback,
&shared_priority_queue_, static_cast<int>(i))),
task_tracker_,
- SchedulerWorker::InitialState::ALIVE);
+ i == 0
+ ? SchedulerWorker::InitialState::ALIVE
+ : SchedulerWorker::InitialState::DETACHED);
if (!worker)
break;
idle_workers_stack_.Push(worker.get());
@@ -548,18 +616,34 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
void SchedulerWorkerPoolImpl::AddToIdleWorkersStack(
SchedulerWorker* worker) {
AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
- idle_workers_stack_.Push(worker);
+ // Detachment may cause multiple attempts to add because the delegate cannot
+ // determine who woke it up. As a result, when it wakes up, it may conclude
+ // there's no work to be done and attempt to add itself to the idle stack
+ // again.
+ if (!idle_workers_stack_.Contains(worker))
+ idle_workers_stack_.Push(worker);
+
DCHECK_LE(idle_workers_stack_.Size(), workers_.size());
if (idle_workers_stack_.Size() == workers_.size())
idle_workers_stack_cv_for_testing_->Broadcast();
}
+const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const {
+ AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
+ return idle_workers_stack_.Peek();
+}
+
void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack(
SchedulerWorker* worker) {
AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
idle_workers_stack_.Remove(worker);
}
+bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() {
+ AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_);
+ return worker_detachment_allowed_;
+}
+
} // namespace internal
} // namespace base

Powered by Google App Engine
This is Rietveld 408576698