Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1612)

Unified Diff: base/task_scheduler/scheduler_worker_thread.cc

Issue 2044023003: Virtualize The Existence of a Scheduler Worker Thread (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@detach
Patch Set: CR Feedback Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: base/task_scheduler/scheduler_worker_thread.cc
diff --git a/base/task_scheduler/scheduler_worker_thread.cc b/base/task_scheduler/scheduler_worker_thread.cc
index afc70ec360ba136aee7b0e1dc719facaf678d284..8a8e1d6ad2304a766a8eda7eb8664f43926aaffa 100644
--- a/base/task_scheduler/scheduler_worker_thread.cc
+++ b/base/task_scheduler/scheduler_worker_thread.cc
@@ -14,25 +14,145 @@
namespace base {
namespace internal {
+class SchedulerWorkerThread::Worker : public PlatformThread::Delegate {
+ public:
+ ~Worker() override = default;
+
+ static std::unique_ptr<Worker> Create(SchedulerWorkerThread* outer) {
+ std::unique_ptr<Worker> worker(new Worker(outer));
+ worker->Initialize();
+ if (worker->thread_handle_.is_null())
+ return nullptr;
+ return worker;
+ }
+
+ // PlatformThread::Delegate.
+ void ThreadMain() override {
+ // Set if this thread was detached.
+ std::unique_ptr<Worker> detached_worker;
+
+ outer_->delegate_->OnMainEntry(outer_);
+
+ // A SchedulerWorkerThread starts out waiting for work.
+ WaitForWork();
+
+ while (!outer_->task_tracker_->shutdown_completed() &&
+ !outer_->ShouldExitForTesting()) {
+ DCHECK(outer_);
+ // Get the sequence containing the next task to execute.
+ scoped_refptr<Sequence> sequence = outer_->delegate_->GetWork(outer_);
+ if (!sequence) {
+ if (outer_->delegate_->CanDetach(outer_)) {
+ detached_worker = outer_->Detach();
+ if (detached_worker) {
+ DCHECK_EQ(detached_worker.get(), this);
+ PlatformThread::Detach(thread_handle_);
+ outer_ = nullptr;
+ break;
+ }
+ }
+ WaitForWork();
+ continue;
+ }
+
+ outer_->task_tracker_->RunTask(sequence->PeekTask());
+
+ const bool sequence_became_empty = sequence->PopTask();
+
+ // If |sequence| isn't empty immediately after the pop, re-enqueue it to
+ // maintain the invariant that a non-empty Sequence is always referenced
+ // by either a PriorityQueue or a SchedulerWorkerThread. If it is empty
+ // andthere are live references to it, it will be enqueued when a Task is
gab 2016/06/10 16:15:20 "and there"
robliao 2016/06/10 18:03:41 Done.
+ // added to it. Otherwise, it will be destroyed at the end of this scope.
+ if (!sequence_became_empty)
+ outer_->delegate_->ReEnqueueSequence(std::move(sequence));
+
+ // Calling WakeUp() guarantees that this SchedulerWorkerThread will run
+ // Tasks from Sequences returned by the GetWork() method of |delegate_|
+ // until it returns nullptr. Resetting |wake_up_event_| here doesn't break
+ // this invariant and avoids a useless loop iteration before going to
+ // sleep if WakeUp() is called while this SchedulerWorkerThread is awake.
+ wake_up_event_.Reset();
+ }
+
+ DCHECK(!detached_worker || !IsWakeUpPending()) <<
+ "This thread was detached and woken up at the same time.";
gab 2016/06/10 16:15:20 Is this bad (i.e. someone did something wrong) or
robliao 2016/06/10 18:03:41 Yep, very bad. If the thread is signaled after the
+ }
+
+ void Join() { PlatformThread::Join(thread_handle_); }
+
+ void WakeUp() { wake_up_event_.Signal(); }
+
+ bool IsWakeUpPending() { return wake_up_event_.IsSignaled(); }
+
+ private:
+ Worker(SchedulerWorkerThread* outer)
+ : outer_(outer),
+ wake_up_event_(WaitableEvent::ResetPolicy::MANUAL,
+ WaitableEvent::InitialState::NOT_SIGNALED) {
+ DCHECK(outer_);
+ }
+
+ void Initialize() {
+ const size_t kDefaultStackSize = 0;
+ PlatformThread::CreateWithPriority(kDefaultStackSize, this,
+ &thread_handle_,
+ outer_->thread_priority_);
+ }
+
+ void WaitForWork() {
+ DCHECK(outer_);
+ const TimeDelta sleep_time = outer_->delegate_->GetSleepTimeout();
+ if (sleep_time.is_max()) {
+ // Calling TimedWait with TimeDelta::Max is not recommended per
+ // http://crbug.com/465948.
+ wake_up_event_.Wait();
+ } else {
+ wake_up_event_.TimedWait(sleep_time);
+ }
+ wake_up_event_.Reset();
gab 2016/06/10 16:15:20 Why not keep this event AUTOMATIC if you're going
robliao 2016/06/10 18:03:41 IsSignaled apparently resets the WaitableEvent! It
+ }
+
+ PlatformThreadHandle thread_handle_;
+
+ SchedulerWorkerThread* outer_;
+
+ // Event signaled to wake up this SchedulerWorkerThread.
+ WaitableEvent wake_up_event_;
+
+ DISALLOW_COPY_AND_ASSIGN(Worker);
+};
+
std::unique_ptr<SchedulerWorkerThread> SchedulerWorkerThread::Create(
ThreadPriority thread_priority,
std::unique_ptr<Delegate> delegate,
- TaskTracker* task_tracker) {
+ TaskTracker* task_tracker,
+ InitialWorkerState worker_state) {
std::unique_ptr<SchedulerWorkerThread> worker_thread(
new SchedulerWorkerThread(thread_priority, std::move(delegate),
task_tracker));
+ // Creation is single-threaded, so no synchronization is necessary.
gab 2016/06/10 16:15:20 I think what you mean is: // Creation happens bef
robliao 2016/06/10 18:03:41 Yep, exactly what I mean. I was going for the one-
+ if (worker_state == SchedulerWorkerThread::InitialWorkerState::ALIVE) {
+ worker_thread->CreateWorker();
+ if (!worker_thread->worker_) {
+ return nullptr;
+ }
+ }
- if (worker_thread->thread_handle_.is_null())
- return nullptr;
return worker_thread;
}
SchedulerWorkerThread::~SchedulerWorkerThread() {
- DCHECK(ShouldExitForTesting());
+ DCHECK(ShouldExitForTesting() || !worker_);
gab 2016/06/10 16:15:20 So I thought we said the SWT object itself would a
robliao 2016/06/10 18:03:41 I don't recall a case where we would want the Sche
gab 2016/06/13 19:23:05 Feels we want to keep the ShouldExitForTesting() D
robliao 2016/06/13 23:23:28 It seemed redundant to me as the goal of ShouldExi
gab 2016/06/15 19:40:42 Wasn't it also because the rest of the scheduler a
}
void SchedulerWorkerThread::WakeUp() {
- wake_up_event_.Signal();
+ AutoSchedulerLock auto_lock(worker_lock_);
+ if (!worker_)
+ CreateWorkerAssertSynchronized();
+
+ if (worker_)
+ worker_->WakeUp();
}
void SchedulerWorkerThread::JoinForTesting() {
@@ -41,65 +161,47 @@ void SchedulerWorkerThread::JoinForTesting() {
should_exit_for_testing_ = true;
}
WakeUp();
- PlatformThread::Join(thread_handle_);
+
+ // Normally holding a lock and joining is dangerous. However, since this is
+ // only for testing, we're okay since the only scenario that could impact this
+ // is a call to Detach, which is disallowed by having the delegate always
+ // return false for the CanDetach call.
+ AutoSchedulerLock auto_lock(worker_lock_);
+ if (worker_)
+ worker_->Join();
+}
+
+bool SchedulerWorkerThread::WorkerAliveForTesting() const {
+ AutoSchedulerLock auto_lock(worker_lock_);
+ return !!worker_;
}
SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority,
std::unique_ptr<Delegate> delegate,
TaskTracker* task_tracker)
- : wake_up_event_(WaitableEvent::ResetPolicy::AUTOMATIC,
- WaitableEvent::InitialState::NOT_SIGNALED),
+ : thread_priority_(thread_priority),
delegate_(std::move(delegate)),
task_tracker_(task_tracker) {
DCHECK(delegate_);
DCHECK(task_tracker_);
-
- const size_t kDefaultStackSize = 0;
- PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_,
- thread_priority);
}
-void SchedulerWorkerThread::ThreadMain() {
- delegate_->OnMainEntry(this);
-
- // A SchedulerWorkerThread starts out sleeping.
- wake_up_event_.Wait();
-
- while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) {
- // Get the sequence containing the next task to execute.
- scoped_refptr<Sequence> sequence = delegate_->GetWork(this);
-
- if (!sequence) {
- TimeDelta sleep_time = delegate_->GetSleepTimeout();
- if (sleep_time.is_max()) {
- // Calling TimedWait with TimeDelta::Max is not recommended per
- // http://crbug.com/465948.
- wake_up_event_.Wait();
- } else {
- wake_up_event_.TimedWait(sleep_time);
- }
- continue;
- }
-
- task_tracker_->RunTask(sequence->PeekTask());
-
- const bool sequence_became_empty = sequence->PopTask();
+std::unique_ptr<SchedulerWorkerThread::Worker> SchedulerWorkerThread::Detach() {
+ DCHECK(!ShouldExitForTesting()) << "Worker was already joined";
+ AutoSchedulerLock auto_lock(worker_lock_);
+ // If a wakeup is pending, then a WakeUp came in while we were deciding to
+ // detach. This means we can't go away anymore since we would break the
+ // guarantee that we call GetWork after a successful wakeup.
+ return worker_->IsWakeUpPending() ? nullptr : std::move(worker_);
+}
- // If |sequence| isn't empty immediately after the pop, re-enqueue it to
- // maintain the invariant that a non-empty Sequence is always referenced by
- // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and
- // there are live references to it, it will be enqueued when a Task is added
- // to it. Otherwise, it will be destroyed at the end of this scope.
- if (!sequence_became_empty)
- delegate_->ReEnqueueSequence(std::move(sequence));
+void SchedulerWorkerThread::CreateWorker() {
+ worker_ = Worker::Create(this);
+}
- // Calling WakeUp() guarantees that this SchedulerWorkerThread will run
- // Tasks from Sequences returned by the GetWork() method of |delegate_|
- // until it returns nullptr. Resetting |wake_up_event_| here doesn't break
- // this invariant and avoids a useless loop iteration before going to sleep
- // if WakeUp() is called while this SchedulerWorkerThread is awake.
- wake_up_event_.Reset();
- }
+void SchedulerWorkerThread::CreateWorkerAssertSynchronized() {
+ worker_lock_.AssertAcquired();
+ CreateWorker();
}
bool SchedulerWorkerThread::ShouldExitForTesting() const {

Powered by Google App Engine
This is Rietveld 408576698