Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "base/task_scheduler/scheduler_worker_thread.h" | 5 #include "base/task_scheduler/scheduler_worker_thread.h" |
| 6 | 6 |
| 7 #include <stddef.h> | 7 #include <stddef.h> |
| 8 | 8 |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| 11 #include "base/logging.h" | 11 #include "base/logging.h" |
| 12 #include "base/task_scheduler/task_tracker.h" | 12 #include "base/task_scheduler/task_tracker.h" |
| 13 | 13 |
| 14 namespace base { | 14 namespace base { |
| 15 namespace internal { | 15 namespace internal { |
| 16 | 16 |
| 17 class SchedulerWorkerThread::Worker : public PlatformThread::Delegate { | |
|
fdoray
2016/06/08 17:51:48
s/::Worker/::Thread/ ?
robliao
2016/06/08 19:00:08
I wanted to avoid SchedulerWorkerThread::Thread. I
gab
2016/06/10 16:15:20
Or we could have SchedulerWorker and SchedulerWork
robliao
2016/06/10 18:03:41
Happy to perform the rename. I'll do that as the v
| |
| 18 public: | |
| 19 ~Worker() override = default; | |
| 20 | |
| 21 static std::unique_ptr<Worker> Create(SchedulerWorkerThread* outer, | |
| 22 ThreadPriority thread_priority) { | |
| 23 std::unique_ptr<Worker> worker(new Worker(outer)); | |
| 24 worker->Initialize(thread_priority); | |
|
fdoray
2016/06/08 17:51:48
Could read priority from |outer|.
robliao
2016/06/08 19:00:08
Indeed! Done.
| |
| 25 if (worker->thread_handle_.is_null()) | |
| 26 return nullptr; | |
| 27 return worker; | |
| 28 } | |
| 29 | |
| 30 // PlatformThread::Delegate. | |
| 31 void ThreadMain() override { | |
| 32 // Set if this thread was detached. | |
| 33 std::unique_ptr<Worker> detached_worker; | |
| 34 | |
| 35 outer_->delegate_->OnMainEntry(outer_); | |
| 36 | |
| 37 // A SchedulerWorkerThread starts out sleeping. | |
| 38 wake_up_event_.Wait(); | |
|
fdoray
2016/06/08 17:51:48
GetSleepTimeout()? Otherwise, a thread that is nev
robliao
2016/06/08 19:00:08
Done.
Huh, interesting. This actually means tha
| |
| 39 | |
| 40 while (outer_ && !outer_->task_tracker_->shutdown_completed() && | |
|
fdoray
2016/06/08 17:51:48
How can |outer_| be nullptr here? You "break;" rig
robliao
2016/06/08 19:00:08
Indeed. I left this over from an earlier revision
| |
| 41 !outer_->ShouldExitForTesting()) { | |
| 42 // Get the sequence containing the next task to execute. | |
| 43 scoped_refptr<Sequence> sequence = outer_->delegate_->GetWork(outer_); | |
| 44 if (!sequence) { | |
| 45 if (outer_->delegate_->CanDetach(outer_)) { | |
| 46 detached_worker = outer_->Detach(); | |
| 47 if (detached_worker) { | |
| 48 DCHECK_EQ(detached_worker.get(), this); | |
| 49 PlatformThread::Detach(thread_handle_); | |
| 50 outer_ = nullptr; | |
|
fdoray
2016/06/08 17:51:48
Is there a reason why this has to be set to nullpt
robliao
2016/06/08 19:00:08
This ensures that two workers aren't talking to a
| |
| 51 break; | |
| 52 } | |
| 53 } | |
| 54 TimeDelta sleep_time = outer_->delegate_->GetSleepTimeout(); | |
|
fdoray
2016/06/08 17:51:48
const TimeDelta sleep_time =
robliao
2016/06/08 19:00:08
Done.
| |
| 55 if (sleep_time.is_max()) { | |
| 56 // Calling TimedWait with TimeDelta::Max is not recommended per | |
| 57 // http://crbug.com/465948. | |
| 58 wake_up_event_.Wait(); | |
| 59 } else { | |
| 60 wake_up_event_.TimedWait(sleep_time); | |
| 61 } | |
| 62 continue; | |
| 63 } | |
| 64 | |
| 65 outer_->task_tracker_->RunTask(sequence->PeekTask()); | |
| 66 | |
| 67 const bool sequence_became_empty = sequence->PopTask(); | |
| 68 | |
| 69 // If |sequence| isn't empty immediately after the pop, re-enqueue it to | |
| 70 // maintain the invariant that a non-empty Sequence is always referenced | |
| 71 // by either a PriorityQueue or a SchedulerWorkerThread. If it is empty | |
| 72 // andthere are live references to it, it will be enqueued when a Task is | |
| 73 // added to it. Otherwise, it will be destroyed at the end of this scope. | |
| 74 if (!sequence_became_empty) | |
| 75 outer_->delegate_->ReEnqueueSequence(std::move(sequence)); | |
| 76 | |
| 77 // Calling WakeUp() guarantees that this SchedulerWorkerThread will run | |
| 78 // Tasks from Sequences returned by the GetWork() method of |delegate_| | |
| 79 // until it returns nullptr. Resetting |wake_up_event_| here doesn't break | |
| 80 // this invariant and avoids a useless loop iteration before going to | |
| 81 // sleep if WakeUp() is called while this SchedulerWorkerThread is awake. | |
| 82 wake_up_event_.Reset(); | |
| 83 } | |
| 84 | |
| 85 DCHECK(!detached_worker || !wake_up_event_.IsSignaled()) << | |
|
fdoray
2016/06/08 17:51:48
!IsWakeUpPending()
robliao
2016/06/08 19:00:08
Done.
| |
| 86 "This thread was detached and woken up at the same time."; | |
| 87 } | |
| 88 | |
| 89 void Join() { PlatformThread::Join(thread_handle_); } | |
| 90 | |
| 91 void WakeUp() { wake_up_event_.Signal(); } | |
| 92 | |
| 93 bool IsWakeUpPending() { return wake_up_event_.IsSignaled(); } | |
|
fdoray
2016/06/08 17:51:48
IsSignaled() resets the event...
https://cs.chromi
robliao
2016/06/08 19:00:08
Nice catch. That certainly seems unexpected. Chang
| |
| 94 | |
| 95 private: | |
| 96 Worker(SchedulerWorkerThread* outer) | |
| 97 : outer_(outer), | |
| 98 wake_up_event_(WaitableEvent::ResetPolicy::AUTOMATIC, | |
| 99 WaitableEvent::InitialState::NOT_SIGNALED) { | |
| 100 DCHECK(outer_); | |
| 101 } | |
| 102 | |
| 103 void Initialize(ThreadPriority thread_priority) { | |
| 104 const size_t kDefaultStackSize = 0; | |
| 105 PlatformThread::CreateWithPriority(kDefaultStackSize, this, | |
| 106 &thread_handle_, thread_priority); | |
| 107 } | |
| 108 | |
| 109 PlatformThreadHandle thread_handle_; | |
| 110 | |
| 111 SchedulerWorkerThread* outer_; | |
| 112 | |
| 113 // Event signaled to wake up this SchedulerWorkerThread. | |
| 114 WaitableEvent wake_up_event_; | |
| 115 | |
| 116 DISALLOW_COPY_AND_ASSIGN(Worker); | |
| 117 }; | |
| 118 | |
| 17 std::unique_ptr<SchedulerWorkerThread> SchedulerWorkerThread::Create( | 119 std::unique_ptr<SchedulerWorkerThread> SchedulerWorkerThread::Create( |
| 18 ThreadPriority thread_priority, | 120 ThreadPriority thread_priority, |
| 19 std::unique_ptr<Delegate> delegate, | 121 std::unique_ptr<Delegate> delegate, |
| 20 TaskTracker* task_tracker) { | 122 TaskTracker* task_tracker, |
| 123 InitialWorkerState worker_state) { | |
| 21 std::unique_ptr<SchedulerWorkerThread> worker_thread( | 124 std::unique_ptr<SchedulerWorkerThread> worker_thread( |
| 22 new SchedulerWorkerThread(thread_priority, std::move(delegate), | 125 new SchedulerWorkerThread(thread_priority, std::move(delegate), |
| 23 task_tracker)); | 126 task_tracker)); |
| 127 // Creation is single-threaded, so no synchronization is necessary. | |
| 128 if (worker_state == SchedulerWorkerThread::InitialWorkerState::ALIVE) { | |
| 129 worker_thread->CreateWorker(); | |
| 130 if (!worker_thread->worker_) { | |
| 131 return nullptr; | |
| 132 } | |
| 133 } | |
| 24 | 134 |
| 25 if (worker_thread->thread_handle_.is_null()) | |
| 26 return nullptr; | |
| 27 return worker_thread; | 135 return worker_thread; |
| 28 } | 136 } |
| 29 | 137 |
| 30 SchedulerWorkerThread::~SchedulerWorkerThread() { | 138 SchedulerWorkerThread::~SchedulerWorkerThread() { |
| 31 DCHECK(ShouldExitForTesting()); | 139 DCHECK(ShouldExitForTesting() || !worker_); |
| 32 } | 140 } |
| 33 | 141 |
| 34 void SchedulerWorkerThread::WakeUp() { | 142 void SchedulerWorkerThread::WakeUp() { |
| 35 wake_up_event_.Signal(); | 143 AutoSchedulerLock auto_lock(worker_lock_); |
| 144 if (!worker_) { | |
| 145 CreateWorkerAssertSynchronized(); | |
| 146 } | |
| 147 worker_->WakeUp(); | |
| 36 } | 148 } |
| 37 | 149 |
| 38 void SchedulerWorkerThread::JoinForTesting() { | 150 void SchedulerWorkerThread::JoinForTesting() { |
| 39 { | 151 { |
| 40 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); | 152 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); |
| 41 should_exit_for_testing_ = true; | 153 should_exit_for_testing_ = true; |
| 42 } | 154 } |
| 43 WakeUp(); | 155 WakeUp(); |
| 44 PlatformThread::Join(thread_handle_); | 156 |
| 157 // Normally holding a lock and joining is dangerous. However, since this is | |
| 158 // only for testing, we're okay since the only scenario that could impact this | |
| 159 // is a call to Detach, which is disallowed by having the delegate always | |
| 160 // return false for the CanDetach call. | |
| 161 AutoSchedulerLock auto_lock(worker_lock_); | |
| 162 if (worker_) | |
| 163 worker_->Join(); | |
| 164 } | |
| 165 | |
| 166 bool SchedulerWorkerThread::WorkerAliveForTesting() { | |
| 167 return !!worker_; | |
|
fdoray
2016/06/08 17:51:48
AutoSchedulerLock auto_lock(worker_lock_);
robliao
2016/06/08 19:00:08
Done.
| |
| 45 } | 168 } |
| 46 | 169 |
| 47 SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority, | 170 SchedulerWorkerThread::SchedulerWorkerThread(ThreadPriority thread_priority, |
| 48 std::unique_ptr<Delegate> delegate, | 171 std::unique_ptr<Delegate> delegate, |
| 49 TaskTracker* task_tracker) | 172 TaskTracker* task_tracker) |
| 50 : wake_up_event_(WaitableEvent::ResetPolicy::AUTOMATIC, | 173 : thread_priority_(thread_priority), |
| 51 WaitableEvent::InitialState::NOT_SIGNALED), | |
| 52 delegate_(std::move(delegate)), | 174 delegate_(std::move(delegate)), |
| 53 task_tracker_(task_tracker) { | 175 task_tracker_(task_tracker) { |
| 54 DCHECK(delegate_); | 176 DCHECK(delegate_); |
| 55 DCHECK(task_tracker_); | 177 DCHECK(task_tracker_); |
| 56 | |
| 57 const size_t kDefaultStackSize = 0; | |
| 58 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, | |
| 59 thread_priority); | |
| 60 } | 178 } |
| 61 | 179 |
| 62 void SchedulerWorkerThread::ThreadMain() { | 180 std::unique_ptr<SchedulerWorkerThread::Worker> SchedulerWorkerThread::Detach() { |
|
fdoray
2016/06/08 17:51:48
DCHECK(!ShouldExitForTesting()); before acquiring
robliao
2016/06/08 19:00:08
Done.
| |
| 63 delegate_->OnMainEntry(this); | 181 AutoSchedulerLock auto_lock(worker_lock_); |
| 182 // If a wakeup is pending, then a WakeUp came in while we were deciding to | |
| 183 // detach. This means we can't go away anymore since a single threaded task | |
| 184 // could have woken us up. | |
|
fdoray
2016/06/08 17:51:48
It should be the responsibility of the delegate to
robliao
2016/06/08 19:00:08
Added a note about single-threaded work to CanDeta
| |
| 185 return worker_->IsWakeUpPending() ? nullptr : std::move(worker_); | |
| 186 } | |
| 64 | 187 |
| 65 // A SchedulerWorkerThread starts out sleeping. | 188 void SchedulerWorkerThread::CreateWorker() { |
| 66 wake_up_event_.Wait(); | 189 worker_ = Worker::Create(this, thread_priority_); |
| 190 } | |
| 67 | 191 |
| 68 while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) { | 192 void SchedulerWorkerThread::CreateWorkerAssertSynchronized() { |
| 69 // Get the sequence containing the next task to execute. | 193 worker_lock_.AssertAcquired(); |
| 70 scoped_refptr<Sequence> sequence = delegate_->GetWork(this); | 194 CreateWorker(); |
| 71 | |
| 72 if (!sequence) { | |
| 73 TimeDelta sleep_time = delegate_->GetSleepTimeout(); | |
| 74 if (sleep_time.is_max()) { | |
| 75 // Calling TimedWait with TimeDelta::Max is not recommended per | |
| 76 // http://crbug.com/465948. | |
| 77 wake_up_event_.Wait(); | |
| 78 } else { | |
| 79 wake_up_event_.TimedWait(sleep_time); | |
| 80 } | |
| 81 continue; | |
| 82 } | |
| 83 | |
| 84 task_tracker_->RunTask(sequence->PeekTask()); | |
| 85 | |
| 86 const bool sequence_became_empty = sequence->PopTask(); | |
| 87 | |
| 88 // If |sequence| isn't empty immediately after the pop, re-enqueue it to | |
| 89 // maintain the invariant that a non-empty Sequence is always referenced by | |
| 90 // either a PriorityQueue or a SchedulerWorkerThread. If it is empty and | |
| 91 // there are live references to it, it will be enqueued when a Task is added | |
| 92 // to it. Otherwise, it will be destroyed at the end of this scope. | |
| 93 if (!sequence_became_empty) | |
| 94 delegate_->ReEnqueueSequence(std::move(sequence)); | |
| 95 | |
| 96 // Calling WakeUp() guarantees that this SchedulerWorkerThread will run | |
| 97 // Tasks from Sequences returned by the GetWork() method of |delegate_| | |
| 98 // until it returns nullptr. Resetting |wake_up_event_| here doesn't break | |
| 99 // this invariant and avoids a useless loop iteration before going to sleep | |
| 100 // if WakeUp() is called while this SchedulerWorkerThread is awake. | |
| 101 wake_up_event_.Reset(); | |
| 102 } | |
| 103 } | 195 } |
| 104 | 196 |
| 105 bool SchedulerWorkerThread::ShouldExitForTesting() const { | 197 bool SchedulerWorkerThread::ShouldExitForTesting() const { |
| 106 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); | 198 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); |
| 107 return should_exit_for_testing_; | 199 return should_exit_for_testing_; |
| 108 } | 200 } |
| 109 | 201 |
| 110 } // namespace internal | 202 } // namespace internal |
| 111 } // namespace base | 203 } // namespace base |
| OLD | NEW |