Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2232)

Unified Diff: base/task_scheduler/scheduler_worker_pool_impl.cc

Issue 2116163002: Add Lazy Creation and Thread Detachment Support in the Scheduler Worker Pool (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: CR Feedback fdoray@ Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: base/task_scheduler/scheduler_worker_pool_impl.cc
diff --git a/base/task_scheduler/scheduler_worker_pool_impl.cc b/base/task_scheduler/scheduler_worker_pool_impl.cc
index 06933eb32db62386195ba3619fd71c03914346fd..677e53358866f5787b549eaab00bcf8467c443a7 100644
--- a/base/task_scheduler/scheduler_worker_pool_impl.cc
+++ b/base/task_scheduler/scheduler_worker_pool_impl.cc
@@ -9,6 +9,7 @@
#include <algorithm>
#include <utility>
+#include "base/atomicops.h"
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/lazy_instance.h"
@@ -21,6 +22,7 @@
#include "base/threading/platform_thread.h"
#include "base/threading/thread_local.h"
#include "base/threading/thread_restrictions.h"
+#include "base/time/time.h"
namespace base {
namespace internal {
@@ -43,7 +45,9 @@ class SchedulerParallelTaskRunner : public TaskRunner {
// TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
SchedulerParallelTaskRunner(const TaskTraits& traits,
SchedulerWorkerPool* worker_pool)
- : traits_(traits), worker_pool_(worker_pool) {}
+ : traits_(traits), worker_pool_(worker_pool) {
+ DCHECK(worker_pool_);
+ }
// TaskRunner:
bool PostDelayedTask(const tracked_objects::Location& from_here,
@@ -76,7 +80,9 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
// TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
SchedulerSequencedTaskRunner(const TaskTraits& traits,
SchedulerWorkerPool* worker_pool)
- : traits_(traits), worker_pool_(worker_pool) {}
+ : traits_(traits), worker_pool_(worker_pool) {
+ DCHECK(worker_pool_);
+ }
// SequencedTaskRunner:
bool PostDelayedTask(const tracked_objects::Location& from_here,
@@ -113,56 +119,6 @@ class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
};
-// A task runner that runs tasks with the SINGLE_THREADED ExecutionMode.
-class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
- public:
- // Constructs a SchedulerSingleThreadTaskRunner which can be used to post
- // tasks so long as |worker_pool| and |worker| are alive.
- // TODO(robliao): Find a concrete way to manage the memory of |worker_pool|
- // and |worker|.
- SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
- SchedulerWorkerPool* worker_pool,
- SchedulerWorker* worker)
- : traits_(traits),
- worker_pool_(worker_pool),
- worker_(worker) {}
-
- // SingleThreadTaskRunner:
- bool PostDelayedTask(const tracked_objects::Location& from_here,
- const Closure& closure,
- TimeDelta delay) override {
- std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay));
- task->single_thread_task_runner_ref = this;
-
- // Post the task to be executed by |worker_| as part of |sequence_|.
- return worker_pool_->PostTaskWithSequence(std::move(task), sequence_,
- worker_);
- }
-
- bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
- const Closure& closure,
- base::TimeDelta delay) override {
- // Tasks are never nested within the task scheduler.
- return PostDelayedTask(from_here, closure, delay);
- }
-
- bool RunsTasksOnCurrentThread() const override {
- return tls_current_worker.Get().Get() == worker_;
- }
-
- private:
- ~SchedulerSingleThreadTaskRunner() override = default;
-
- // Sequence for all Tasks posted through this TaskRunner.
- const scoped_refptr<Sequence> sequence_ = new Sequence;
-
- const TaskTraits traits_;
- SchedulerWorkerPool* const worker_pool_;
- SchedulerWorker* const worker_;
-
- DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
-};
-
// Only used in DCHECKs.
bool ContainsWorker(
const std::vector<std::unique_ptr<SchedulerWorker>>& workers,
@@ -187,6 +143,7 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
// the pool name to label the underlying worker threads.
SchedulerWorkerDelegateImpl(
SchedulerWorkerPoolImpl* outer,
+ const TimeDelta& suggested_reclaim_time,
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
const PriorityQueue* shared_priority_queue,
int index);
@@ -203,8 +160,17 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
TimeDelta GetSleepTimeout() override;
bool CanDetach(SchedulerWorker* worker) override;
+ void RegisterSingleThreadTaskRunner() {
+ subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, 1);
+ }
+
+ void UnregisterSingleThreadTaskRunner() {
+ subtle::NoBarrier_AtomicIncrement(&num_single_threaded_runners_, -1);
+ }
+
private:
SchedulerWorkerPoolImpl* outer_;
+ const TimeDelta suggested_reclaim_time_;
const ReEnqueueSequenceCallback re_enqueue_sequence_callback_;
// Single-threaded PriorityQueue for the worker.
@@ -214,11 +180,75 @@ class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
// |single_threaded_priority_queue_|.
bool last_sequence_is_single_threaded_ = false;
+ // True if the worker performed and idle cycle. Workers start out idle.
fdoray 2016/07/08 18:58:06 an* idle cycle
robliao 2016/07/08 21:17:03 Done.
+ bool performed_idle_cycle_ = true;
+
+ subtle::Atomic32 num_single_threaded_runners_ = 0;
+
const int index_;
DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl);
};
+// A task runner that runs tasks with the SINGLE_THREADED ExecutionMode.
+class SchedulerWorkerPoolImpl::SchedulerSingleThreadTaskRunner :
+ public SingleThreadTaskRunner {
+ public:
+ // Constructs a SchedulerSingleThreadTaskRunner which can be used to post
+ // tasks so long as |worker_pool| and |worker| are alive.
+ // TODO(robliao): Find a concrete way to manage the memory of |worker_pool|
+ // and |worker|.
+ SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
+ SchedulerWorkerPoolImpl* worker_pool,
+ SchedulerWorker* worker)
+ : traits_(traits),
+ worker_pool_(worker_pool),
+ worker_(worker) {
+ DCHECK(worker_pool_);
+ DCHECK(worker_);
+ static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())->
+ RegisterSingleThreadTaskRunner();
+ }
+
+ // SingleThreadTaskRunner:
+ bool PostDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& closure,
+ TimeDelta delay) override {
+ std::unique_ptr<Task> task(new Task(from_here, closure, traits_, delay));
+ task->single_thread_task_runner_ref = this;
+
+ // Post the task to be executed by |worker_| as part of |sequence_|.
+ return worker_pool_->PostTaskWithSequence(std::move(task), sequence_,
+ worker_);
+ }
+
+ bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
+ const Closure& closure,
+ base::TimeDelta delay) override {
+ // Tasks are never nested within the task scheduler.
+ return PostDelayedTask(from_here, closure, delay);
+ }
+
+ bool RunsTasksOnCurrentThread() const override {
+ return tls_current_worker.Get().Get() == worker_;
+ }
+
+ private:
+ ~SchedulerSingleThreadTaskRunner() override {
+ static_cast<SchedulerWorkerDelegateImpl*>(worker_->delegate())->
+ UnregisterSingleThreadTaskRunner();
+ }
+
+ // Sequence for all Tasks posted through this TaskRunner.
+ const scoped_refptr<Sequence> sequence_ = new Sequence;
+
+ const TaskTraits traits_;
+ SchedulerWorkerPoolImpl* const worker_pool_;
+ SchedulerWorker* const worker_;
+
+ DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
+};
+
SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() {
// SchedulerWorkerPool should never be deleted in production unless its
// initialization failed.
@@ -231,6 +261,7 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create(
ThreadPriority thread_priority,
size_t max_threads,
IORestriction io_restriction,
+ const TimeDelta& suggested_reclaim_time,
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
TaskTracker* task_tracker,
DelayedTaskManager* delayed_task_manager) {
@@ -238,6 +269,7 @@ std::unique_ptr<SchedulerWorkerPoolImpl> SchedulerWorkerPoolImpl::Create(
new SchedulerWorkerPoolImpl(name, io_restriction, task_tracker,
delayed_task_manager));
if (worker_pool->Initialize(thread_priority, max_threads,
+ suggested_reclaim_time,
re_enqueue_sequence_callback)) {
return worker_pool;
}
@@ -251,6 +283,10 @@ void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() {
}
void SchedulerWorkerPoolImpl::JoinForTesting() {
+ {
+ AutoSchedulerLock auto_lock(join_for_testing_called_lock_);
+ join_for_testing_called_ = true;
+ }
for (const auto& worker : workers_)
worker->JoinForTesting();
@@ -371,10 +407,12 @@ void SchedulerWorkerPoolImpl::PostTaskWithSequenceNow(
SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
SchedulerWorkerDelegateImpl(
SchedulerWorkerPoolImpl* outer,
+ const TimeDelta& suggested_reclaim_time,
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback,
const PriorityQueue* shared_priority_queue,
int index)
: outer_(outer),
+ suggested_reclaim_time_(suggested_reclaim_time),
re_enqueue_sequence_callback_(re_enqueue_sequence_callback),
single_threaded_priority_queue_(shared_priority_queue),
index_(index) {}
@@ -399,6 +437,9 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry(
tls_current_worker.Get().Set(worker);
tls_current_worker_pool.Get().Set(outer_);
+ // Workers start out idle, so this counts as an idle cycle.
+ performed_idle_cycle_ = true;
+
ThreadRestrictions::SetIOAllowed(outer_->io_restriction_ ==
IORestriction::ALLOWED);
}
@@ -455,6 +496,9 @@ SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
}
DCHECK(sequence);
+ // We're doing work, so this is not an idle cycle.
+ performed_idle_cycle_ = false;
+
outer_->RemoveFromIdleWorkersStack(worker);
return sequence;
}
@@ -476,12 +520,21 @@ void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
GetSleepTimeout() {
- return TimeDelta::Max();
+ return suggested_reclaim_time_;
}
bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CanDetach(
SchedulerWorker* worker) {
- return false;
+ // It's not an issue if num_single_threaded_runners_ is incremented after this
fdoray 2016/07/08 18:58:06 |num_single_threaded_runners_|
robliao 2016/07/08 21:17:03 Done.
+ // because the next single-threaded task will simply pick up a new physical
+ // thread.
+ bool can_detach = performed_idle_cycle_ &&
fdoray 2016/07/08 18:58:07 const bool
robliao 2016/07/08 21:17:03 Done.
+ worker != outer_->PeekAtIdleWorkersStack() &&
+ !subtle::Release_Load(&num_single_threaded_runners_) &&
+ !outer_->HasJoinedForTesting();
fdoray 2016/07/08 18:58:07 Sadly this is racy: - Thread A: SchedulerWorker::C
robliao 2016/07/08 21:17:03 Yup. This became the first test that flew in the f
fdoray 2016/07/20 14:15:43 Acknowledged.
+ // CanDetach is part of a SchedulerWorker's idle cycle.
+ performed_idle_cycle_ = true;
+ return can_detach;
}
SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
@@ -496,6 +549,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
idle_workers_stack_lock_.CreateConditionVariable()),
join_for_testing_returned_(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED),
+ join_for_testing_called_(false),
#if DCHECK_IS_ON()
workers_created_(WaitableEvent::ResetPolicy::MANUAL,
WaitableEvent::InitialState::NOT_SIGNALED),
@@ -509,6 +563,7 @@ SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
bool SchedulerWorkerPoolImpl::Initialize(
ThreadPriority thread_priority,
size_t max_threads,
+ const TimeDelta& suggested_reclaim_time,
const ReEnqueueSequenceCallback& re_enqueue_sequence_callback) {
AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
@@ -518,10 +573,13 @@ bool SchedulerWorkerPoolImpl::Initialize(
std::unique_ptr<SchedulerWorker> worker =
SchedulerWorker::Create(
thread_priority, WrapUnique(new SchedulerWorkerDelegateImpl(
- this, re_enqueue_sequence_callback,
+ this, suggested_reclaim_time,
+ re_enqueue_sequence_callback,
&shared_priority_queue_, static_cast<int>(i))),
task_tracker_,
- SchedulerWorker::InitialState::ALIVE);
+ i == 0
+ ? SchedulerWorker::InitialState::ALIVE
+ : SchedulerWorker::InitialState::DETACHED);
if (!worker)
break;
idle_workers_stack_.Push(worker.get());
@@ -548,18 +606,34 @@ void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
void SchedulerWorkerPoolImpl::AddToIdleWorkersStack(
SchedulerWorker* worker) {
AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
- idle_workers_stack_.Push(worker);
+ // Detachment may cause multiple attempts to add because the delegate cannot
+ // determine who woke it up. As a result, when it wakes up, it may conclude
+ // there's no work to be done and attempt to add itself to the idle stack
+ // again.
+ if (!idle_workers_stack_.Contains(worker))
+ idle_workers_stack_.Push(worker);
+
DCHECK_LE(idle_workers_stack_.Size(), workers_.size());
if (idle_workers_stack_.Size() == workers_.size())
idle_workers_stack_cv_for_testing_->Broadcast();
}
+const SchedulerWorker* SchedulerWorkerPoolImpl::PeekAtIdleWorkersStack() const {
+ AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
+ return idle_workers_stack_.Peek();
+}
+
void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStack(
SchedulerWorker* worker) {
AutoSchedulerLock auto_lock(idle_workers_stack_lock_);
idle_workers_stack_.Remove(worker);
}
+bool SchedulerWorkerPoolImpl::HasJoinedForTesting() {
+ AutoSchedulerLock auto_lock(join_for_testing_called_lock_);
+ return join_for_testing_called_;
+}
+
} // namespace internal
} // namespace base
« no previous file with comments | « base/task_scheduler/scheduler_worker_pool_impl.h ('k') | base/task_scheduler/scheduler_worker_pool_impl_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698