OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "base/task_scheduler/scheduler_worker.h" | 5 #include "base/task_scheduler/scheduler_worker.h" |
6 | 6 |
7 #include <stddef.h> | 7 #include <stddef.h> |
8 | 8 |
9 #include <utility> | 9 #include <utility> |
10 | 10 |
11 #include "base/logging.h" | 11 #include "base/logging.h" |
12 #include "base/task_scheduler/task_tracker.h" | 12 #include "base/task_scheduler/task_tracker.h" |
13 | 13 |
14 namespace base { | 14 namespace base { |
15 namespace internal { | 15 namespace internal { |
16 | 16 |
17 class SchedulerWorker::Thread : public PlatformThread::Delegate { | |
18 public: | |
19 ~Thread() override = default; | |
20 | |
21 static std::unique_ptr<Thread> Create(SchedulerWorker* outer) { | |
22 std::unique_ptr<Thread> thread(new Thread(outer)); | |
23 thread->Initialize(); | |
24 if (thread->thread_handle_.is_null()) | |
25 return nullptr; | |
26 return thread; | |
27 } | |
28 | |
29 // PlatformThread::Delegate. | |
30 void ThreadMain() override { | |
31 // Set if this thread was detached. | |
32 std::unique_ptr<Thread> detached_thread; | |
33 | |
34 outer_->delegate_->OnMainEntry(outer_); | |
35 | |
36 // A SchedulerWorker starts out waiting for work. | |
37 WaitForWork(); | |
38 | |
39 while (!outer_->task_tracker_->shutdown_completed() && | |
40 !outer_->ShouldExitForTesting()) { | |
41 DCHECK(outer_); | |
42 // Get the sequence containing the next task to execute. | |
43 scoped_refptr<Sequence> sequence = outer_->delegate_->GetWork(outer_); | |
44 if (!sequence) { | |
45 if (outer_->delegate_->CanDetach(outer_)) { | |
46 detached_thread = outer_->Detach(); | |
47 if (detached_thread) { | |
48 DCHECK_EQ(detached_thread.get(), this); | |
49 PlatformThread::Detach(thread_handle_); | |
50 outer_ = nullptr; | |
51 break; | |
52 } | |
53 } | |
54 WaitForWork(); | |
55 continue; | |
56 } | |
57 | |
58 outer_->task_tracker_->RunTask(sequence->PeekTask()); | |
59 | |
60 const bool sequence_became_empty = sequence->PopTask(); | |
61 | |
62 // If |sequence| isn't empty immediately after the pop, re-enqueue it to | |
63 // maintain the invariant that a non-empty Sequence is always referenced | |
64 // by either a PriorityQueue or a SchedulerWorker. If it is empty | |
65 // and there are live references to it, it will be enqueued when a Task is | |
66 // added to it. Otherwise, it will be destroyed at the end of this scope. | |
67 if (!sequence_became_empty) | |
68 outer_->delegate_->ReEnqueueSequence(std::move(sequence)); | |
69 | |
70 // Calling WakeUp() guarantees that this SchedulerWorker will run | |
71 // Tasks from Sequences returned by the GetWork() method of |delegate_| | |
72 // until it returns nullptr. Resetting |wake_up_event_| here doesn't break | |
73 // this invariant and avoids a useless loop iteration before going to | |
74 // sleep if WakeUp() is called while this SchedulerWorker is awake. | |
75 wake_up_event_.Reset(); | |
76 } | |
77 | |
78 // If a wake up is pending and we successfully detached, somehow |outer_| | |
79 // was able to signal us which means it probably thinks we're still alive. | |
80 // This is bad as it will cause the WakeUp to no-op and |outer_| will be | |
81 // stuck forever. | |
82 DCHECK(!detached_thread || !IsWakeUpPending()) << | |
83 "This thread was detached and woken up at the same time."; | |
84 } | |
85 | |
86 void Join() { PlatformThread::Join(thread_handle_); } | |
87 | |
88 void WakeUp() { wake_up_event_.Signal(); } | |
89 | |
90 bool IsWakeUpPending() { return wake_up_event_.IsSignaled(); } | |
91 | |
92 private: | |
93 Thread(SchedulerWorker* outer) | |
94 : outer_(outer), | |
95 wake_up_event_(WaitableEvent::ResetPolicy::MANUAL, | |
96 WaitableEvent::InitialState::NOT_SIGNALED) { | |
97 DCHECK(outer_); | |
98 } | |
99 | |
100 void Initialize() { | |
101 constexpr size_t kDefaultStackSize = 0; | |
102 PlatformThread::CreateWithPriority(kDefaultStackSize, this, | |
103 &thread_handle_, | |
104 outer_->thread_priority_); | |
105 } | |
106 | |
107 void WaitForWork() { | |
108 DCHECK(outer_); | |
109 const TimeDelta sleep_time = outer_->delegate_->GetSleepTimeout(); | |
110 if (sleep_time.is_max()) { | |
111 // Calling TimedWait with TimeDelta::Max is not recommended per | |
112 // http://crbug.com/465948. | |
113 wake_up_event_.Wait(); | |
114 } else { | |
115 wake_up_event_.TimedWait(sleep_time); | |
116 } | |
117 wake_up_event_.Reset(); | |
118 } | |
119 | |
120 PlatformThreadHandle thread_handle_; | |
121 | |
122 SchedulerWorker* outer_; | |
123 | |
124 // Event signaled to wake up this thread. | |
125 WaitableEvent wake_up_event_; | |
126 | |
127 DISALLOW_COPY_AND_ASSIGN(Thread); | |
128 }; | |
129 | |
17 std::unique_ptr<SchedulerWorker> SchedulerWorker::Create( | 130 std::unique_ptr<SchedulerWorker> SchedulerWorker::Create( |
18 ThreadPriority thread_priority, | 131 ThreadPriority thread_priority, |
19 std::unique_ptr<Delegate> delegate, | 132 std::unique_ptr<Delegate> delegate, |
20 TaskTracker* task_tracker) { | 133 TaskTracker* task_tracker, |
134 InitialState initial_state) { | |
21 std::unique_ptr<SchedulerWorker> worker( | 135 std::unique_ptr<SchedulerWorker> worker( |
22 new SchedulerWorker(thread_priority, std::move(delegate), | 136 new SchedulerWorker(thread_priority, std::move(delegate), task_tracker)); |
23 task_tracker)); | 137 // Creation happens before any other thread can reference this one, so no |
138 // synchronization is necessary. | |
139 if (initial_state == SchedulerWorker::InitialState::ALIVE) { | |
140 worker->CreateThread(); | |
141 if (!worker->thread_) { | |
142 return nullptr; | |
143 } | |
144 } | |
24 | 145 |
25 if (worker->thread_handle_.is_null()) | |
26 return nullptr; | |
27 return worker; | 146 return worker; |
28 } | 147 } |
29 | 148 |
30 SchedulerWorker::~SchedulerWorker() { | 149 SchedulerWorker::~SchedulerWorker() { |
31 DCHECK(ShouldExitForTesting()); | 150 // It is unexpected for |thread_| to be alive and for SchedulerWorker to |
151 // destroy since SchedulerWorker owns the delegate needed by |thread_|. | |
152 // For testing, this generally means JoinForTesting was not called. | |
153 DCHECK(!thread_); | |
32 } | 154 } |
33 | 155 |
34 void SchedulerWorker::WakeUp() { | 156 void SchedulerWorker::WakeUp() { |
35 wake_up_event_.Signal(); | 157 AutoSchedulerLock auto_lock(thread_lock_); |
158 if (!thread_) | |
159 CreateThreadAssertSynchronized(); | |
160 | |
161 if (thread_) | |
162 thread_->WakeUp(); | |
36 } | 163 } |
37 | 164 |
38 void SchedulerWorker::JoinForTesting() { | 165 void SchedulerWorker::JoinForTesting() { |
39 { | 166 { |
40 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); | 167 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); |
41 should_exit_for_testing_ = true; | 168 should_exit_for_testing_ = true; |
42 } | 169 } |
43 WakeUp(); | 170 WakeUp(); |
44 PlatformThread::Join(thread_handle_); | 171 |
172 // Normally holding a lock and joining is dangerous. However, since this is | |
173 // only for testing, we're okay since the only scenario that could impact this | |
174 // is a call to Detach, which is disallowed by having the delegate always | |
175 // return false for the CanDetach call. | |
176 AutoSchedulerLock auto_lock(thread_lock_); | |
177 if (thread_) | |
178 thread_->Join(); | |
179 | |
180 thread_.reset(); | |
181 } | |
182 | |
183 bool SchedulerWorker::ThreadAliveForTesting() const { | |
184 AutoSchedulerLock auto_lock(thread_lock_); | |
185 return !!thread_; | |
45 } | 186 } |
46 | 187 |
47 SchedulerWorker::SchedulerWorker(ThreadPriority thread_priority, | 188 SchedulerWorker::SchedulerWorker(ThreadPriority thread_priority, |
48 std::unique_ptr<Delegate> delegate, | 189 std::unique_ptr<Delegate> delegate, |
49 TaskTracker* task_tracker) | 190 TaskTracker* task_tracker) |
50 : wake_up_event_(WaitableEvent::ResetPolicy::AUTOMATIC, | 191 : thread_priority_(thread_priority), |
51 WaitableEvent::InitialState::NOT_SIGNALED), | |
52 delegate_(std::move(delegate)), | 192 delegate_(std::move(delegate)), |
53 task_tracker_(task_tracker) { | 193 task_tracker_(task_tracker) { |
54 DCHECK(delegate_); | 194 DCHECK(delegate_); |
55 DCHECK(task_tracker_); | 195 DCHECK(task_tracker_); |
56 | |
57 const size_t kDefaultStackSize = 0; | |
58 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, | |
59 thread_priority); | |
60 } | 196 } |
61 | 197 |
62 void SchedulerWorker::ThreadMain() { | 198 std::unique_ptr<SchedulerWorker::Thread> SchedulerWorker::Detach() { |
63 delegate_->OnMainEntry(this); | 199 DCHECK(!ShouldExitForTesting()) << "Worker was already joined"; |
200 AutoSchedulerLock auto_lock(thread_lock_); | |
201 // If a wakeup is pending, then a WakeUp came in while we were deciding to | |
fdoray
2016/06/22 18:22:43
WakeUp()
robliao
2016/06/22 19:24:33
Done.
| |
202 // detach. This means we can't go away anymore since we would break the | |
203 // guarantee that we call GetWork after a successful wakeup. | |
fdoray
2016/06/22 18:22:43
GetWork()
robliao
2016/06/22 19:24:33
Done.
| |
204 return thread_->IsWakeUpPending() ? nullptr : std::move(thread_); | |
205 } | |
64 | 206 |
65 // A SchedulerWorker starts out sleeping. | 207 void SchedulerWorker::CreateThread() { |
66 wake_up_event_.Wait(); | 208 thread_ = Thread::Create(this); |
209 } | |
67 | 210 |
68 while (!task_tracker_->shutdown_completed() && !ShouldExitForTesting()) { | 211 void SchedulerWorker::CreateThreadAssertSynchronized() { |
69 // Get the sequence containing the next task to execute. | 212 thread_lock_.AssertAcquired(); |
70 scoped_refptr<Sequence> sequence = delegate_->GetWork(this); | 213 CreateThread(); |
71 | |
72 if (!sequence) { | |
73 TimeDelta sleep_time = delegate_->GetSleepTimeout(); | |
74 if (sleep_time.is_max()) { | |
75 // Calling TimedWait with TimeDelta::Max is not recommended per | |
76 // http://crbug.com/465948. | |
77 wake_up_event_.Wait(); | |
78 } else { | |
79 wake_up_event_.TimedWait(sleep_time); | |
80 } | |
81 continue; | |
82 } | |
83 | |
84 task_tracker_->RunTask(sequence->PeekTask()); | |
85 | |
86 const bool sequence_became_empty = sequence->PopTask(); | |
87 | |
88 // If |sequence| isn't empty immediately after the pop, re-enqueue it to | |
89 // maintain the invariant that a non-empty Sequence is always referenced by | |
90 // either a PriorityQueue or a SchedulerWorker. If it is empty and there are | |
91 // live references to it, it will be enqueued when a Task is added to it. | |
92 // Otherwise, it will be destroyed at the end of this scope. | |
93 if (!sequence_became_empty) | |
94 delegate_->ReEnqueueSequence(std::move(sequence)); | |
95 | |
96 // Calling WakeUp() guarantees that this SchedulerWorker will run Tasks from | |
97 // Sequences returned by the GetWork() method of |delegate_| until it | |
98 // returns nullptr. Resetting |wake_up_event_| here doesn't break this | |
99 // invariant and avoids a useless loop iteration before going to sleep if | |
100 // WakeUp() is called while this SchedulerWorker is awake. | |
101 wake_up_event_.Reset(); | |
102 } | |
103 } | 214 } |
104 | 215 |
105 bool SchedulerWorker::ShouldExitForTesting() const { | 216 bool SchedulerWorker::ShouldExitForTesting() const { |
106 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); | 217 AutoSchedulerLock auto_lock(should_exit_for_testing_lock_); |
107 return should_exit_for_testing_; | 218 return should_exit_for_testing_; |
108 } | 219 } |
109 | 220 |
110 } // namespace internal | 221 } // namespace internal |
111 } // namespace base | 222 } // namespace base |
OLD | NEW |