Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(371)

Unified Diff: base/task_scheduler/scheduler_worker_pool_impl.cc

Issue 2116163002: Add Lazy Creation and Thread Detachment Support in the Scheduler Worker Pool (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Fenced Load Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: base/task_scheduler/scheduler_worker_pool_impl.cc
diff --git a/base/task_scheduler/scheduler_worker_pool_impl.cc b/base/task_scheduler/scheduler_worker_pool_impl.cc
index 9f521e229cabb9dd7c52abcc7bc92be9d7b9aa69..cd50c95ca2634db8921e2a49e1692662767561e3 100644
--- a/base/task_scheduler/scheduler_worker_pool_impl.cc
+++ b/base/task_scheduler/scheduler_worker_pool_impl.cc
@@ -9,6 +9,7 @@
#include <algorithm>
#include <utility>
+#include "base/atomicops.h"
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/lazy_instance.h"
@@ -21,6 +22,7 @@
#include "base/threading/platform_thread.h"
#include "base/threading/thread_local.h"
#include "base/threading/thread_restrictions.h"
+#include "base/time/time.h"
namespace base {
namespace internal {
@@ -43,7 +45,9 @@ class SchedulerParallelTaskRunner : public TaskRunner {
// TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
SchedulerParallelTaskRunner(const TaskTraits& traits,
SchedulerWorkerPool* worker_pool)
- : traits_(traits), worker_pool_(worker_pool) {}
+ : traits_(traits), worker_pool_(worker_pool) {
+ DCHECK(worker_pool_);
+ }
// TaskRunner:
bool PostDelayedTask(const tracked_objects::Location& from_here,
@@ -76,7 +80,9 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
// TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
SchedulerSequencedTaskRunner(const TaskTraits& traits,
SchedulerWorkerPool* worker_pool)
- : traits_(traits), worker_pool_(worker_pool) {}
+ : traits_(traits), worker_pool_(worker_pool) {
+ DCHECK(worker_pool_);
+ }
// SequencedTaskRunner:
bool PostDelayedTask(const tracked_objects::Location& from_here,
@@ -113,8 +119,22 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
};
+// Only used in DCHECKs.
+bool ContainsWorker(
+ const std::vector<std::unique_ptr<SchedulerWorker>>& workers,
+ const SchedulerWorker* worker) {
+ auto it = std::find_if(workers.begin(), workers.end(),
+ [worker](const std::unique_ptr<SchedulerWorker>& i) {
+ return i.get() == worker;
+ });
+ return it != workers.end();
+}
+
+} // namespace
+
// A task runner that runs tasks with the SINGLE_THREADED ExecutionMode.
-class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
+class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner :
+ public SingleThreadTaskRunner {
public:
// Constructs a SchedulerSingleThreadTaskRunner which can be used to post
// tasks so long as |worker_pool| and |worker| are alive.
@@ -122,10 +142,7 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
// and |worker|.
SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
SchedulerWorkerPool* worker_pool,
- SchedulerWorker* worker)
- : traits_(traits),
- worker_pool_(worker_pool),
- worker_(worker) {}
+ SchedulerWorker* worker);
// SingleThreadTaskRunner:
bool PostDelayedTask(const tracked_objects::Location& from_here,
@@ -151,7 +168,7 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
}
private:
- ~SchedulerSingleThreadTaskRunner() override = default;
+ ~SchedulerSingleThreadTaskRunner() override;
// Sequence for all Tasks posted through this TaskRunner.
const scoped_refptr<Sequence> sequence_ = new Sequence;
@@ -163,19 +180,6 @@ class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
};
-// Only used in DCHECKs.
-bool ContainsWorker(
- const std::vector<std::unique_ptr<SchedulerWorker>>& workers,
- const SchedulerWorker* worker) {
- auto it = std::find_if(workers.begin(), workers.end(),
- [worker](const std::unique_ptr<SchedulerWorker>& i) {
- return i.get() == worker;
- });
- return it != workers.end();
-}
-
-} // namespace
-
class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
: public SchedulerWorker::Delegate {
public:
@@ -203,6 +207,14 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
TimeDelta GetSleepTimeout() override;
bool CanDetach(SchedulerWorker* worker) override;
+ void RegisterSingleThreadTaskRunner() {
+ subtle::Barrier_AtomicIncrement(&num_single_threaded_runners_, 1);
+ }
+
+ void UnregisterSingleThreadTaskRunner() {
+ subtle::Barrier_AtomicIncrement(&num_single_threaded_runners_, -1);
+ }
+
private:
SchedulerWorkerPoolImpl* outer_;
const ReEnqueueSequenceCallback re_enqueue_sequence_callback_;
@@ -214,6 +226,11 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
// |single_threaded_priority_queue_|.
bool last_sequence_is_single_threaded_ = false;
+ // Time when GetWork() first returned nullptr.
+ TimeTicks idle_start_time_;
+
+ subtle::Atomic32 num_single_threaded_runners_ = 0;
+
const int index_;
DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl);
@@ -234,6 +251,7 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create(
std::unique_ptr<SchedulerWorkerPoolImpl> worker_pool(
new SchedulerWorkerPoolImpl(params.name(),
params.io_restriction(),
+ params.suggested_reclaim_time(),
task_tracker, delayed_task_manager));
if (worker_pool->Initialize(params.thread_priority(),
params.max_threads(),
@@ -250,6 +268,8 @@ void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() {
}
void SchedulerWorkerPoolImpl::JoinForTesting() {
+ DCHECK(!CanWorkerDetachForTesting() || suggested_reclaim_time_.is_max()) <<
+ "Workers can detach during join.";
for (const auto& worker : workers_)
worker->JoinForTesting();
@@ -257,6 +277,11 @@ void SchedulerWorkerPoolImpl::JoinForTesting() {
join_for_testing_returned_.Signal();
}
+void SchedulerWorkerPoolImpl::DisallowWorkerDetachmentForTesting() {
+ AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_);
+ worker_detachment_allowed_ = false;
+}
+
scoped_refptr<TaskRunner> SchedulerWorkerPoolImpl::CreateTaskRunnerWithTraits(
const TaskTraits& traits,
ExecutionMode execution_mode) {
@@ -367,6 +392,25 @@ void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow(
}
}
+SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner::
+ SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
+ SchedulerWorkerPool* worker_pool,
+ SchedulerWorker* worker)
+ : traits_(traits),
+ worker_pool_(worker_pool),
+ worker_(worker) {
+ DCHECK(worker_pool_);
+ DCHECK(worker_);
+ static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())->
+ RegisterSingleThreadTaskRunner();
+}
+
+SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner::
+ ~SchedulerSingleThreadTaskRunner() {
+ static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())->
+ UnregisterSingleThreadTaskRunner();
+}
+
SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
SchedulerWorkerDelegateImpl(
SchedulerWorkerPoolImpl* outer,
@@ -398,6 +442,9 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry(
tls_current_worker.Get().Set(worker);
tls_current_worker_pool.Get().Set(outer_);
+ // New threads haven't run GetWork() yet, so reset the idle_start_time_.
+ idle_start_time_ = TimeTicks();
+
ThreadRestrictions::SetIOAllowed(
outer_->io_restriction_ ==
SchedulerWorkerPoolParams::IORestriction::ALLOWED);
@@ -432,6 +479,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
// 4. This thread adds itself to |idle_workers_stack_| and goes to sleep.
// No thread runs the Sequence inserted in step 2.
outer_->AddToIdleWorkersStack(worker);
+ if (idle_start_time_.is_null())
+ idle_start_time_ = TimeTicks::Now();
return nullptr;
}
@@ -455,6 +504,8 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
}
DCHECK(sequence);
+ idle_start_time_ = TimeTicks();
+
outer_->RemoveFromIdleWorkersStack(worker);
return sequence;
}
@@ -476,21 +527,33 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
GetSleepTimeout() {
- return TimeDelta::Max();
+ return outer_->suggested_reclaim_time_;
}
bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach(
SchedulerWorker* worker) {
- return false;
+ // It's not an issue if |num_single_threaded_runners_| is incremented after
+ // this because the newly created TaskRunner (from which no task has run yet)
+ // will simply run all its tasks on the next physical thread created by the
+ // worker.
+ const bool can_detach =
+ !idle_start_time_.is_null() &&
+ (TimeTicks::Now() - idle_start_time_) > outer_->suggested_reclaim_time_ &&
+ worker != outer_->PeekAtIdleWorkersStack() &&
+ !subtle::Acquire_Load(&num_single_threaded_runners_) &&
+ outer_->CanWorkerDetachForTesting();
+ return can_detach;
}
SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
StringPiece name,
SchedulerWorkerPoolParams::IORestriction io_restriction,
+ const TimeDelta& suggested_reclaim_time,
TaskTracker* task_tracker,
DelayedTaskManager* delayed_task_manager)
: name_(name.as_string()),
io_restriction_(io_restriction),
+ suggested_reclaim_time_(suggested_reclaim_time),
idle_workers_stack_lock_(shared_priority_queue_.container_lock()),
idle_workers_stack_cv_for_testing_(
idle_workers_stack_lock_.CreateConditionVariable()),
@@ -521,7 +584,9 @@ bool SchedulerWorkerPoolImpl::Initialize(
this, re_enqueue_sequence_callback,
&shared_priority_queue_, static_cast<int>(i))),
task_tracker_,
- SchedulerWorker::InitialState::ALIVE);
+ i == 0
+ ? SchedulerWorker::InitialState::ALIVE
+ : SchedulerWorker::InitialState::DETACHED);
if (!worker)
break;
idle_workers_stack_.Push(worker.get());
@@ -548,18 +613,34 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
void SchedulerWorkerPoolImpl::AddToIdleWorkersStack(
SchedulerWorker* worker) {
AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
- idle_workers_stack_.Push(worker);
+ // Detachment may cause multiple attempts to add because the delegate cannot
+ // determine who woke it up. As a result, when it wakes up, it may conclude
+ // there's no work to be done and attempt to add itself to the idle stack
+ // again.
+ if (!idle_workers_stack_.Contains(worker))
+ idle_workers_stack_.Push(worker);
+
DCHECK_LE(idle_workers_stack_.Size(), workers_.size());
if (idle_workers_stack_.Size() == workers_.size())
idle_workers_stack_cv_for_testing_->Broadcast();
}
+const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const {
+ AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
+ return idle_workers_stack_.Peek();
+}
+
void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack(
SchedulerWorker* worker) {
AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
idle_workers_stack_.Remove(worker);
}
+bool SchedulerWorkerPoolImpl::CanWorkerDetachForTesting() {
+ AutoSchedulerLock auto_lock(worker_detachment_allowed_lock_);
+ return worker_detachment_allowed_;
+}
+
} // namespace internal
} // namespace base

Powered by Google App Engine
This is Rietveld 408576698