Index: base/task_scheduler/scheduler_worker.cc |
diff --git a/base/task_scheduler/scheduler_worker.cc b/base/task_scheduler/scheduler_worker.cc |
index 2b6be08bafc8c63a9982e46339e2ae5ee52c5ea4..fcbc28382fa071ce56e0190cc5f03f57a30cb1a9 100644 |
--- a/base/task_scheduler/scheduler_worker.cc |
+++ b/base/task_scheduler/scheduler_worker.cc |
@@ -14,25 +14,152 @@ |
namespace base { |
namespace internal { |
+class SchedulerWorker::Thread : public PlatformThread::Delegate { |
+ public: |
+ ~Thread() override = default; |
+ |
+ static std::unique_ptr<Thread> Create(SchedulerWorker* outer) { |
+ std::unique_ptr<Thread> thread(new Thread(outer)); |
+ thread->Initialize(); |
+ if (thread->thread_handle_.is_null()) |
+ return nullptr; |
+ return thread; |
+ } |
+ |
+ // PlatformThread::Delegate. |
+ void ThreadMain() override { |
+ // Set if this thread was detached. |
+ std::unique_ptr<Thread> detached_thread; |
+ |
+ outer_->delegate_->OnMainEntry(outer_); |
+ |
+ // A SchedulerWorker starts out waiting for work. |
+ WaitForWork(); |
+ |
+ while (!outer_->task_tracker_->shutdown_completed() && |
+ !outer_->ShouldExitForTesting()) { |
+ DCHECK(outer_); |
+ // Get the sequence containing the next task to execute. |
+ scoped_refptr<Sequence> sequence = outer_->delegate_->GetWork(outer_); |
+ if (!sequence) { |
+ if (outer_->delegate_->CanDetach(outer_)) { |
+ detached_thread = outer_->Detach(); |
+ if (detached_thread) { |
+ DCHECK_EQ(detached_thread.get(), this); |
+ PlatformThread::Detach(thread_handle_); |
+ outer_ = nullptr; |
+ break; |
+ } |
+ } |
+ WaitForWork(); |
+ continue; |
+ } |
+ |
+ outer_->task_tracker_->RunTask(sequence->PeekTask()); |
+ |
+ const bool sequence_became_empty = sequence->PopTask(); |
+ |
+ // If |sequence| isn't empty immediately after the pop, re-enqueue it to |
+ // maintain the invariant that a non-empty Sequence is always referenced |
+ // by either a PriorityQueue or a SchedulerWorker. If it is empty |
+ // and there are live references to it, it will be enqueued when a Task is |
+ // added to it. Otherwise, it will be destroyed at the end of this scope. |
+ if (!sequence_became_empty) |
+ outer_->delegate_->ReEnqueueSequence(std::move(sequence)); |
+ |
+ // Calling WakeUp() guarantees that this SchedulerWorker will run |
+ // Tasks from Sequences returned by the GetWork() method of |delegate_| |
+ // until it returns nullptr. Resetting |wake_up_event_| here doesn't break |
+ // this invariant and avoids a useless loop iteration before going to |
+ // sleep if WakeUp() is called while this SchedulerWorker is awake. |
+ wake_up_event_.Reset(); |
+ } |
+ |
+ // If a wake up is pending and we successfully detached, somehow |outer_| |
+ // was able to signal us which means it probably thinks we're still alive. |
+ // This is bad as it will cause the WakeUp to no-op and |outer_| will be |
+ // stuck forever. |
+ DCHECK(!detached_thread || !IsWakeUpPending()) << |
+ "This thread was detached and woken up at the same time."; |
+ } |
+ |
+ void Join() { PlatformThread::Join(thread_handle_); } |
+ |
+ void WakeUp() { wake_up_event_.Signal(); } |
+ |
+ bool IsWakeUpPending() { return wake_up_event_.IsSignaled(); } |
+ |
+ private: |
+ Thread(SchedulerWorker* outer) |
+ : outer_(outer), |
+ wake_up_event_(WaitableEvent::ResetPolicy::MANUAL, |
+ WaitableEvent::InitialState::NOT_SIGNALED) { |
+ DCHECK(outer_); |
+ } |
+ |
+ void Initialize() { |
+ constexpr size_t kDefaultStackSize = 0; |
+ PlatformThread::CreateWithPriority(kDefaultStackSize, this, |
+ &thread_handle_, |
+ outer_->thread_priority_); |
+ } |
+ |
+ void WaitForWork() { |
+ DCHECK(outer_); |
+ const TimeDelta sleep_time = outer_->delegate_->GetSleepTimeout(); |
+ if (sleep_time.is_max()) { |
+ // Calling TimedWait with TimeDelta::Max is not recommended per |
+ // http://crbug.com/465948. |
+ wake_up_event_.Wait(); |
+ } else { |
+ wake_up_event_.TimedWait(sleep_time); |
+ } |
+ wake_up_event_.Reset(); |
+ } |
+ |
+ PlatformThreadHandle thread_handle_; |
+ |
+ SchedulerWorker* outer_; |
+ |
+ // Event signaled to wake up this thread. |
+ WaitableEvent wake_up_event_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(Thread); |
+}; |
+ |
std::unique_ptr<SchedulerWorker> SchedulerWorker::Create( |
ThreadPriority thread_priority, |
std::unique_ptr<Delegate> delegate, |
- TaskTracker* task_tracker) { |
+ TaskTracker* task_tracker, |
+ InitialState initial_state) { |
std::unique_ptr<SchedulerWorker> worker( |
- new SchedulerWorker(thread_priority, std::move(delegate), |
- task_tracker)); |
+ new SchedulerWorker(thread_priority, std::move(delegate), task_tracker)); |
+ // Creation happens before any other thread can reference this one, so no |
+ // synchronization is necessary. |
+ if (initial_state == SchedulerWorker::InitialState::ALIVE) { |
+ worker->CreateThread(); |
+ if (!worker->thread_) { |
+ return nullptr; |
+ } |
+ } |
- if (worker->thread_handle_.is_null()) |
- return nullptr; |
return worker; |
} |
SchedulerWorker::~SchedulerWorker() { |
- DCHECK(ShouldExitForTesting()); |
+ // It is unexpected for |thread_| to be alive and for SchedulerWorker to |
+ // destroy since SchedulerWorker owns the delegate needed by |thread_|. |
+ // For testing, this generally means JoinForTesting was not called. |
+ DCHECK(!thread_); |
} |
void SchedulerWorker::WakeUp() { |
- wake_up_event_.Signal(); |
+ AutoSchedulerLock auto_lock(thread_lock_); |
+ if (!thread_) |
+ CreateThreadAssertSynchronized(); |
+ |
+ if (thread_) |
+ thread_->WakeUp(); |
} |
void SchedulerWorker::JoinForTesting() { |
@@ -41,65 +168,49 @@ void SchedulerWorker::JoinForTesting() { |
should_exit_for_testing_ = true; |
} |
WakeUp(); |
- PlatformThread::Join(thread_handle_); |
+ |
+ // Normally holding a lock and joining is dangerous. However, since this is |
+ // only for testing, we're okay since the only scenario that could impact this |
+ // is a call to Detach, which is disallowed by having the delegate always |
+ // return false for the CanDetach call. |
+ AutoSchedulerLock auto_lock(thread_lock_); |
+ if (thread_) |
+ thread_->Join(); |
+ |
+ thread_.reset(); |
+} |
+ |
+bool SchedulerWorker::ThreadAliveForTesting() const { |
+ AutoSchedulerLock auto_lock(thread_lock_); |
+ return !!thread_; |
} |
SchedulerWorker::SchedulerWorker(ThreadPriority thread_priority, |
std::unique_ptr<Delegate> delegate, |
TaskTracker* task_tracker) |
- : wake_up_event_(WaitableEvent::ResetPolicy::AUTOMATIC, |
- WaitableEvent::InitialState::NOT_SIGNALED), |
+ : thread_priority_(thread_priority), |
delegate_(std::move(delegate)), |
task_tracker_(task_tracker) { |
DCHECK(delegate_); |
DCHECK(task_tracker_); |
- |
- const size_t kDefaultStackSize = 0; |
- PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, |
- thread_priority); |
} |
-void SchedulerWorker::ThreadMain() { |
- delegate_->OnMainEntry(this); |
- |
- // A SchedulerWorker starts out sleeping. |
- wake_up_event_.Wait(); |
- |
- while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) { |
- // Get the sequence containing the next task to execute. |
- scoped_refptr<Sequence> sequence = delegate_->GetWork(this); |
- |
- if (!sequence) { |
- TimeDelta sleep_time = delegate_->GetSleepTimeout(); |
- if (sleep_time.is_max()) { |
- // Calling TimedWait with TimeDelta::Max is not recommended per |
- // http://crbug.com/465948. |
- wake_up_event_.Wait(); |
- } else { |
- wake_up_event_.TimedWait(sleep_time); |
- } |
- continue; |
- } |
- |
- task_tracker_->RunTask(sequence->PeekTask()); |
- |
- const bool sequence_became_empty = sequence->PopTask(); |
+std::unique_ptr<SchedulerWorker::Thread> SchedulerWorker::Detach() { |
+ DCHECK(!ShouldExitForTesting()) << "Worker was already joined"; |
+ AutoSchedulerLock auto_lock(thread_lock_); |
+ // If a wakeup is pending, then a WakeUp() came in while we were deciding to |
+ // detach. This means we can't go away anymore since we would break the |
+ // guarantee that we call GetWork() after a successful wakeup. |
+ return thread_->IsWakeUpPending() ? nullptr : std::move(thread_); |
+} |
- // If |sequence| isn't empty immediately after the pop, re-enqueue it to |
- // maintain the invariant that a non-empty Sequence is always referenced by |
- // either a PriorityQueue or a SchedulerWorker. If it is empty and there are |
- // live references to it, it will be enqueued when a Task is added to it. |
- // Otherwise, it will be destroyed at the end of this scope. |
- if (!sequence_became_empty) |
- delegate_->ReEnqueueSequence(std::move(sequence)); |
+void SchedulerWorker::CreateThread() { |
+ thread_ = Thread::Create(this); |
+} |
- // Calling WakeUp() guarantees that this SchedulerWorker will run Tasks from |
- // Sequences returned by the GetWork() method of |delegate_| until it |
- // returns nullptr. Resetting |wake_up_event_| here doesn't break this |
- // invariant and avoids a useless loop iteration before going to sleep if |
- // WakeUp() is called while this SchedulerWorker is awake. |
- wake_up_event_.Reset(); |
- } |
+void SchedulerWorker::CreateThreadAssertSynchronized() { |
+ thread_lock_.AssertAcquired(); |
+ CreateThread(); |
} |
bool SchedulerWorker::ShouldExitForTesting() const { |