Chromium Code Reviews| Index: base/task_scheduler/scheduler_worker.cc |
| diff --git a/base/task_scheduler/scheduler_worker.cc b/base/task_scheduler/scheduler_worker.cc |
| index 2b6be08bafc8c63a9982e46339e2ae5ee52c5ea4..1635b073109282faa6b089ad2c9568fbc7e35eb7 100644 |
| --- a/base/task_scheduler/scheduler_worker.cc |
| +++ b/base/task_scheduler/scheduler_worker.cc |
| @@ -14,25 +14,152 @@ |
| namespace base { |
| namespace internal { |
| +class SchedulerWorker::Thread : public PlatformThread::Delegate { |
| + public: |
| + ~Thread() override = default; |
| + |
| + static std::unique_ptr<Thread> Create(SchedulerWorker* outer) { |
| + std::unique_ptr<Thread> thread(new Thread(outer)); |
| + thread->Initialize(); |
| + if (thread->thread_handle_.is_null()) |
| + return nullptr; |
| + return thread; |
| + } |
| + |
| + // PlatformThread::Delegate. |
| + void ThreadMain() override { |
| + // Set if this thread was detached. |
| + std::unique_ptr<Thread> detached_thread; |
| + |
| + outer_->delegate_->OnMainEntry(outer_); |
| + |
| + // A SchedulerWorker starts out waiting for work. |
| + WaitForWork(); |
| + |
| + while (!outer_->task_tracker_->shutdown_completed() && |
| + !outer_->ShouldExitForTesting()) { |
| + DCHECK(outer_); |
| + // Get the sequence containing the next task to execute. |
| + scoped_refptr<Sequence> sequence = outer_->delegate_->GetWork(outer_); |
| + if (!sequence) { |
| + if (outer_->delegate_->CanDetach(outer_)) { |
| + detached_thread = outer_->Detach(); |
| + if (detached_thread) { |
| + DCHECK_EQ(detached_thread.get(), this); |
| + PlatformThread::Detach(thread_handle_); |
| + outer_ = nullptr; |
| + break; |
| + } |
| + } |
| + WaitForWork(); |
| + continue; |
| + } |
| + |
| + outer_->task_tracker_->RunTask(sequence->PeekTask()); |
| + |
| + const bool sequence_became_empty = sequence->PopTask(); |
| + |
| + // If |sequence| isn't empty immediately after the pop, re-enqueue it to |
| + // maintain the invariant that a non-empty Sequence is always referenced |
| + // by either a PriorityQueue or a SchedulerWorker. If it is empty |
| + // and there are live references to it, it will be enqueued when a Task is |
| + // added to it. Otherwise, it will be destroyed at the end of this scope. |
| + if (!sequence_became_empty) |
| + outer_->delegate_->ReEnqueueSequence(std::move(sequence)); |
| + |
| + // Calling WakeUp() guarantees that this SchedulerWorker will run |
| + // Tasks from Sequences returned by the GetWork() method of |delegate_| |
| + // until it returns nullptr. Resetting |wake_up_event_| here doesn't break |
| + // this invariant and avoids a useless loop iteration before going to |
| + // sleep if WakeUp() is called while this SchedulerWorker is awake. |
| + wake_up_event_.Reset(); |
| + } |
| + |
| + // If a wake up is pending and we successfully detached, somehow |outer_| |
| + // was able to signal us which means it probably thinks we're still alive. |
| + // This is bad as it will cause the WakeUp to no-op and |outer_| will be |
| + // stuck forever. |
| + DCHECK(!detached_thread || !IsWakeUpPending()) << |
| + "This thread was detached and woken up at the same time."; |
| + } |
| + |
| + void Join() { PlatformThread::Join(thread_handle_); } |
| + |
| + void WakeUp() { wake_up_event_.Signal(); } |
| + |
| + bool IsWakeUpPending() { return wake_up_event_.IsSignaled(); } |
| + |
| + private: |
| + Thread(SchedulerWorker* outer) |
| + : outer_(outer), |
| + wake_up_event_(WaitableEvent::ResetPolicy::MANUAL, |
| + WaitableEvent::InitialState::NOT_SIGNALED) { |
| + DCHECK(outer_); |
| + } |
| + |
| + void Initialize() { |
| + constexpr size_t kDefaultStackSize = 0; |
| + PlatformThread::CreateWithPriority(kDefaultStackSize, this, |
| + &thread_handle_, |
| + outer_->thread_priority_); |
| + } |
| + |
| + void WaitForWork() { |
| + DCHECK(outer_); |
| + const TimeDelta sleep_time = outer_->delegate_->GetSleepTimeout(); |
| + if (sleep_time.is_max()) { |
| + // Calling TimedWait with TimeDelta::Max is not recommended per |
| + // http://crbug.com/465948. |
| + wake_up_event_.Wait(); |
| + } else { |
| + wake_up_event_.TimedWait(sleep_time); |
| + } |
| + wake_up_event_.Reset(); |
| + } |
| + |
| + PlatformThreadHandle thread_handle_; |
| + |
| + SchedulerWorker* outer_; |
| + |
| + // Event signaled to wake up this thread. |
| + WaitableEvent wake_up_event_; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(Thread); |
| +}; |
| + |
| std::unique_ptr<SchedulerWorker> SchedulerWorker::Create( |
| ThreadPriority thread_priority, |
| std::unique_ptr<Delegate> delegate, |
| - TaskTracker* task_tracker) { |
| + TaskTracker* task_tracker, |
| + InitialState initial_state) { |
| std::unique_ptr<SchedulerWorker> worker( |
| - new SchedulerWorker(thread_priority, std::move(delegate), |
| - task_tracker)); |
| + new SchedulerWorker(thread_priority, std::move(delegate), task_tracker)); |
| + // Creation happens before any other thread can reference this one, so no |
| + // synchronization is necessary. |
| + if (initial_state == SchedulerWorker::InitialState::ALIVE) { |
| + worker->CreateThread(); |
| + if (!worker->thread_) { |
| + return nullptr; |
| + } |
| + } |
| - if (worker->thread_handle_.is_null()) |
| - return nullptr; |
| return worker; |
| } |
| SchedulerWorker::~SchedulerWorker() { |
| - DCHECK(ShouldExitForTesting()); |
| + // It is unexpected for |thread_| to be alive and for SchedulerWorker to |
| + // destroy since SchedulerWorker owns the delegate needed by |thread_|. |
| + // For testing, this generally means JoinForTesting was not called. |
| + DCHECK(!thread_); |
| } |
| void SchedulerWorker::WakeUp() { |
| - wake_up_event_.Signal(); |
| + AutoSchedulerLock auto_lock(thread_lock_); |
| + if (!thread_) |
| + CreateThreadAssertSynchronized(); |
| + |
| + if (thread_) |
| + thread_->WakeUp(); |
| } |
| void SchedulerWorker::JoinForTesting() { |
| @@ -41,65 +168,49 @@ void SchedulerWorker::JoinForTesting() { |
| should_exit_for_testing_ = true; |
| } |
| WakeUp(); |
| - PlatformThread::Join(thread_handle_); |
| + |
| + // Normally holding a lock and joining is dangerous. However, since this is |
| + // only for testing, we're okay since the only scenario that could impact this |
| + // is a call to Detach, which is disallowed by having the delegate always |
| + // return false for the CanDetach call. |
| + AutoSchedulerLock auto_lock(thread_lock_); |
| + if (thread_) |
| + thread_->Join(); |
| + |
| + thread_.reset(); |
| +} |
| + |
| +bool SchedulerWorker::ThreadAliveForTesting() const { |
| + AutoSchedulerLock auto_lock(thread_lock_); |
| + return !!thread_; |
| } |
| SchedulerWorker::SchedulerWorker(ThreadPriority thread_priority, |
| std::unique_ptr<Delegate> delegate, |
| TaskTracker* task_tracker) |
| - : wake_up_event_(WaitableEvent::ResetPolicy::AUTOMATIC, |
| - WaitableEvent::InitialState::NOT_SIGNALED), |
| + : thread_priority_(thread_priority), |
| delegate_(std::move(delegate)), |
| task_tracker_(task_tracker) { |
| DCHECK(delegate_); |
| DCHECK(task_tracker_); |
| - |
| - const size_t kDefaultStackSize = 0; |
| - PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, |
| - thread_priority); |
| } |
| -void SchedulerWorker::ThreadMain() { |
| - delegate_->OnMainEntry(this); |
| - |
| - // A SchedulerWorker starts out sleeping. |
| - wake_up_event_.Wait(); |
| - |
| - while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) { |
| - // Get the sequence containing the next task to execute. |
| - scoped_refptr<Sequence> sequence = delegate_->GetWork(this); |
| - |
| - if (!sequence) { |
| - TimeDelta sleep_time = delegate_->GetSleepTimeout(); |
| - if (sleep_time.is_max()) { |
| - // Calling TimedWait with TimeDelta::Max is not recommended per |
| - // http://crbug.com/465948. |
| - wake_up_event_.Wait(); |
| - } else { |
| - wake_up_event_.TimedWait(sleep_time); |
| - } |
| - continue; |
| - } |
| - |
| - task_tracker_->RunTask(sequence->PeekTask()); |
| - |
| - const bool sequence_became_empty = sequence->PopTask(); |
| +std::unique_ptr<SchedulerWorker::Thread> SchedulerWorker::Detach() { |
| + DCHECK(!ShouldExitForTesting()) << "Worker was already joined"; |
| + AutoSchedulerLock auto_lock(thread_lock_); |
| + // If a wakeup is pending, then a WakeUp came in while we were deciding to |
|
fdoray
2016/06/22 18:22:43
WakeUp()
robliao
2016/06/22 19:24:33
Done.
|
| + // detach. This means we can't go away anymore since we would break the |
| + // guarantee that we call GetWork after a successful wakeup. |
|
fdoray
2016/06/22 18:22:43
GetWork()
robliao
2016/06/22 19:24:33
Done.
|
| + return thread_->IsWakeUpPending() ? nullptr : std::move(thread_); |
| +} |
| - // If |sequence| isn't empty immediately after the pop, re-enqueue it to |
| - // maintain the invariant that a non-empty Sequence is always referenced by |
| - // either a PriorityQueue or a SchedulerWorker. If it is empty and there are |
| - // live references to it, it will be enqueued when a Task is added to it. |
| - // Otherwise, it will be destroyed at the end of this scope. |
| - if (!sequence_became_empty) |
| - delegate_->ReEnqueueSequence(std::move(sequence)); |
| +void SchedulerWorker::CreateThread() { |
| + thread_ = Thread::Create(this); |
| +} |
| - // Calling WakeUp() guarantees that this SchedulerWorker will run Tasks from |
| - // Sequences returned by the GetWork() method of |delegate_| until it |
| - // returns nullptr. Resetting |wake_up_event_| here doesn't break this |
| - // invariant and avoids a useless loop iteration before going to sleep if |
| - // WakeUp() is called while this SchedulerWorker is awake. |
| - wake_up_event_.Reset(); |
| - } |
| +void SchedulerWorker::CreateThreadAssertSynchronized() { |
| + thread_lock_.AssertAcquired(); |
| + CreateThread(); |
| } |
| bool SchedulerWorker::ShouldExitForTesting() const { |