OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "base/task_scheduler/worker_thread.h" | |
6 | |
7 #include <utility> | |
8 | |
9 #include "base/atomicops.h" | |
10 #include "base/bind.h" | |
11 #include "base/debug/task_annotator.h" | |
12 #include "base/logging.h" | |
13 #include "base/task_scheduler/delayed_task_manager.h" | |
14 #include "base/task_scheduler/priority_queue.h" | |
15 #include "base/task_scheduler/shutdown_manager.h" | |
16 #include "base/task_scheduler/utils.h" | |
17 #include "base/time/time.h" | |
18 #include "build/build_config.h" | |
19 | |
20 namespace base { | |
21 namespace internal { | |
22 | |
23 namespace { | |
24 | |
25 // A task runner that runs tasks on a single WorkerThread. | |
26 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { | |
robliao
2016/03/07 19:28:08
Working through the ChildTaskRunner, do these Task
gab
2016/03/07 19:31:37
They do, we will eventually need the underlying ty
| |
27 public: | |
28 // Tasks posted through this task runner have |traits| and are inserted in | |
29 // |single_thread_priority_queue|. |delayed_task_manager| is used to post | |
30 // delayed tasks. |shutdown_manager| is notified when a task is posted. | |
31 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, | |
32 PriorityQueue* single_thread_priority_queue, | |
33 DelayedTaskManager* delayed_task_manager, | |
34 ShutdownManager* shutdown_manager); | |
35 | |
36 // SingleThreadTaskRunner: | |
37 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | |
38 const Closure& task, | |
39 TimeDelta delay) override; | |
40 bool PostDelayedTask(const tracked_objects::Location& from_here, | |
41 const Closure& closure, | |
42 TimeDelta delay) override; | |
43 bool RunsTasksOnCurrentThread() const override; | |
44 | |
45 private: | |
46 ~SchedulerSingleThreadTaskRunner() override; | |
47 | |
48 TaskTraits traits_; | |
49 scoped_refptr<Sequence> sequence_; | |
50 PriorityQueue* priority_queue_; | |
51 DelayedTaskManager* delayed_task_manager_; | |
52 ShutdownManager* shutdown_manager_; | |
53 | |
54 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); | |
55 }; | |
56 | |
57 SchedulerSingleThreadTaskRunner::SchedulerSingleThreadTaskRunner( | |
58 const TaskTraits& traits, | |
59 PriorityQueue* priority_queue, | |
60 DelayedTaskManager* delayed_task_manager, | |
61 ShutdownManager* shutdown_manager) | |
62 : traits_(traits), | |
63 sequence_(new Sequence), | |
64 priority_queue_(priority_queue), | |
65 delayed_task_manager_(delayed_task_manager), | |
66 shutdown_manager_(shutdown_manager) {} | |
67 | |
68 bool SchedulerSingleThreadTaskRunner::PostDelayedTask( | |
69 const tracked_objects::Location& from_here, | |
70 const Closure& closure, | |
71 TimeDelta delay) { | |
72 Task task(from_here, closure, traits_, TimeTicks::Now()); | |
73 if (!delay.is_zero()) | |
74 task.delayed_run_time = task.post_time + delay; | |
75 PostTaskHelper(task, sequence_, priority_queue_, shutdown_manager_, | |
76 delayed_task_manager_); | |
77 return true; | |
78 } | |
79 | |
80 bool SchedulerSingleThreadTaskRunner::RunsTasksOnCurrentThread() const { | |
81 // TODO(fdoray): Return true only if tasks posted may actually run on the | |
82 // current thread. It is valid, but not ideal, to always return true. | |
83 return true; | |
84 } | |
85 | |
86 bool SchedulerSingleThreadTaskRunner::PostNonNestableDelayedTask( | |
87 const tracked_objects::Location& from_here, | |
88 const Closure& task, | |
89 TimeDelta delay) { | |
90 return PostDelayedTask(from_here, task, delay); | |
91 } | |
92 | |
93 SchedulerSingleThreadTaskRunner::~SchedulerSingleThreadTaskRunner() = default; | |
94 | |
95 } // namespace | |
96 | |
97 scoped_ptr<WorkerThread> WorkerThread::CreateWorkerThread( | |
98 ThreadPriority thread_priority, | |
99 PriorityQueue* shared_priority_queue, | |
100 const MainEntryCallback& main_entry_callback, | |
101 const ReinsertSequenceCallback& reinsert_sequence_callback, | |
102 const BecomesIdleCallback& becomes_idle_callback, | |
103 DelayedTaskManager* delayed_task_manager, | |
104 ShutdownManager* shutdown_manager) { | |
105 scoped_ptr<WorkerThread> worker_thread(new WorkerThread( | |
106 thread_priority, shared_priority_queue, main_entry_callback, | |
107 reinsert_sequence_callback, becomes_idle_callback, delayed_task_manager, | |
108 shutdown_manager)); | |
109 return worker_thread->IsValid() ? std::move(worker_thread) | |
110 : scoped_ptr<WorkerThread>(); | |
111 } | |
112 | |
113 WorkerThread::~WorkerThread() { | |
114 DCHECK(thread_handle_.is_null()); | |
115 } | |
116 | |
117 void WorkerThread::WakeUp() { | |
118 wakeup_event_.Signal(); | |
119 } | |
120 | |
121 scoped_refptr<SingleThreadTaskRunner> WorkerThread::CreateTaskRunnerWithTraits( | |
122 const TaskTraits& traits, | |
123 ExecutionMode execution_mode) { | |
124 #if defined(OS_WIN) | |
125 DCHECK(execution_mode == ExecutionMode::SINGLE_THREADED); | |
126 #else | |
127 DCHECK(execution_mode == ExecutionMode::SINGLE_THREADED); | |
128 #endif // defined(OS_WIN) | |
129 | |
130 return scoped_refptr<SingleThreadTaskRunner>( | |
131 new SchedulerSingleThreadTaskRunner( | |
132 traits, &single_thread_priority_queue_, delayed_task_manager_, | |
133 shutdown_manager_)); | |
134 } | |
135 | |
136 bool WorkerThread::HasSingleThreadedTasks() const { | |
137 subtle::MemoryBarrier(); | |
138 return !single_thread_priority_queue_.UnsynchronizedEmpty() || | |
139 is_running_single_threaded_task_; | |
140 } | |
141 | |
142 void WorkerThread::ShutdownAndJoinForTesting() { | |
143 DCHECK(!thread_handle_.is_null()); | |
144 shutdown_manager_->Shutdown(); | |
145 WakeUp(); | |
146 PlatformThread::Join(thread_handle_); | |
147 thread_handle_ = PlatformThreadHandle(); | |
148 } | |
149 | |
150 WorkerThread::WorkerThread( | |
151 ThreadPriority thread_priority, | |
152 PriorityQueue* shared_priority_queue, | |
153 const MainEntryCallback& main_entry_callback, | |
154 const ReinsertSequenceCallback& reinsert_sequence_callback, | |
155 const BecomesIdleCallback& becomes_idle_callback, | |
156 DelayedTaskManager* delayed_task_manager, | |
157 ShutdownManager* shutdown_manager) | |
158 : wakeup_event_(false, false), | |
159 is_running_single_threaded_task_(false), | |
160 single_thread_priority_queue_( | |
161 Bind(&WorkerThread::WakeUp, Unretained(this)), | |
162 shared_priority_queue), | |
163 shared_priority_queue_(shared_priority_queue), | |
164 main_entry_callback_(main_entry_callback), | |
165 reinsert_sequence_callback_(reinsert_sequence_callback), | |
166 becomes_idle_callback_(becomes_idle_callback), | |
167 delayed_task_manager_(delayed_task_manager), | |
168 shutdown_manager_(shutdown_manager) { | |
169 DCHECK(shared_priority_queue_); | |
170 DCHECK(!reinsert_sequence_callback.is_null()); | |
171 DCHECK(!becomes_idle_callback.is_null()); | |
172 DCHECK(delayed_task_manager_); | |
173 DCHECK(shutdown_manager_); | |
174 | |
175 #if defined(OS_MACOSX) | |
176 // Mac only supports 2 priorities. crbug.com/554651 | |
177 if (thread_priority != ThreadPriority::NORMAL && | |
178 thread_priority != ThreadPriority::REALTIME_AUDIO) { | |
179 thread_priority = ThreadPriority::NORMAL; | |
180 } | |
181 #endif // defined(OS_MACOSX) | |
182 | |
183 const size_t kDefaultStackSize = 0; | |
184 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, | |
185 thread_priority); | |
186 } | |
187 | |
188 bool WorkerThread::IsValid() const { | |
189 return !thread_handle_.is_null(); | |
190 } | |
191 | |
192 scoped_refptr<Sequence> WorkerThread::GetWork() { | |
193 scoped_ptr<PriorityQueue::Transaction> shared_transaction( | |
194 shared_priority_queue_->BeginTransaction()); | |
195 SequenceSortKey shared_sort_key; | |
196 scoped_refptr<Sequence> shared_sequence = | |
197 shared_transaction->PeekSequence(&shared_sort_key); | |
198 | |
199 scoped_ptr<PriorityQueue::Transaction> single_thread_transaction( | |
200 single_thread_priority_queue_.BeginTransaction()); | |
201 SequenceSortKey single_thread_sort_key; | |
202 scoped_refptr<Sequence> single_thread_sequence = | |
203 single_thread_transaction->PeekSequence(&single_thread_sort_key); | |
204 | |
205 if (single_thread_sequence.get() == nullptr && | |
206 shared_sequence.get() == nullptr) { | |
207 return scoped_refptr<Sequence>(); | |
208 } | |
209 | |
210 if (single_thread_sequence.get() == nullptr || | |
211 (shared_sequence.get() != nullptr && | |
212 single_thread_sort_key < shared_sort_key)) { | |
213 shared_transaction->PopSequence(); | |
214 return shared_sequence; | |
215 } | |
216 | |
217 DCHECK(single_thread_sequence.get()); | |
218 | |
219 is_running_single_threaded_task_ = true; | |
220 single_thread_transaction->PopSequence(); | |
221 return single_thread_sequence; | |
222 } | |
223 | |
224 void WorkerThread::ReinsertSequenceInSingleThreadPriorityQueue( | |
225 scoped_refptr<Sequence> sequence) { | |
226 // Get the sort key of |sequence| before creating a priority queue | |
227 // transaction, to avoid holding 2 locks at the same time. | |
228 SequenceSortKey sort_key = sequence->GetSortKey(); | |
229 | |
230 // Insert the sequence in the single-thread priority queue. | |
231 single_thread_priority_queue_.BeginTransaction()->PushSequence(sequence, | |
232 sort_key); | |
233 } | |
234 | |
235 void WorkerThread::WaitUntilWorkIsAvailable() { | |
236 const TimeTicks next_delayed_task_ready_time = | |
237 delayed_task_manager_->GetNextDelayedRunTime(); | |
238 | |
239 if (next_delayed_task_ready_time.is_null()) { | |
240 // There is no delayed tasks. Wait until |wakeup_event_| is signaled. | |
241 wakeup_event_.Wait(); | |
242 } else { | |
243 // There is delayed tasks. Wait until either a delayed task becomes ready | |
244 // for execution or |wakeup_event_| is signaled. Note: Multiple threads | |
245 // sharing the same DelayedTaskManager may wake up at the same time when a | |
246 // delayed task becomes ready for execution. This isn't optimal. However, | |
247 // since most delayed tasks should be posted to BACKGROUND thread pools | |
248 // (which have a single thread), this behavior shouldn't occur frequently. | |
249 const TimeDelta wait_time = next_delayed_task_ready_time - TimeTicks::Now(); | |
250 if (wait_time.InMilliseconds() > 0) | |
251 wakeup_event_.TimedWait(wait_time); | |
252 } | |
253 } | |
254 | |
255 void WorkerThread::ThreadMain() { | |
256 main_entry_callback_.Run(); | |
257 while (!shutdown_manager_->shutdown_completed()) { | |
258 // Get the sequence containing the next task to execute. | |
259 scoped_refptr<Sequence> sequence = GetWork(); | |
260 if (sequence.get() == nullptr) { | |
261 // Add the thread to the stack of idle threads of the parent thread pool. | |
262 becomes_idle_callback_.Run(this); | |
263 | |
264 // Check one more time whether there is pending work. Without this, it | |
265 // could be that work has been added to |shared_priority_queue_| after the | |
266 // first call to GetWork() and before this thread was added to the stack | |
267 // of idle threads. In such a case, |wake_up_event_| hasn't been signaled | |
268 // because this thread wasn't in the stack of idle threads. However, this | |
269 // thread is needed to execute the newly added work. | |
270 sequence = GetWork(); | |
271 | |
272 if (sequence.get() == nullptr) { | |
273 WaitUntilWorkIsAvailable(); | |
274 sequence = GetWork(); | |
275 } | |
276 } | |
277 | |
278 if (sequence.get() != nullptr) { | |
279 // Peek the next task in the sequence. | |
280 const Task* task = sequence->PeekTask(); | |
281 const TaskShutdownBehavior shutdown_behavior = | |
282 task->traits.shutdown_behavior(); | |
283 | |
284 // Run the task. | |
285 if (shutdown_manager_->ShouldScheduleTask(shutdown_behavior)) { | |
286 debug::TaskAnnotator task_annotator; | |
287 task_annotator.RunTask(kQueueFunctionName, *task); | |
288 shutdown_manager_->DidExecuteTask(shutdown_behavior); | |
289 } | |
290 | |
291 // Pop the task from the sequence. | |
292 size_t new_num_tasks_in_sequence; | |
293 sequence->PopTask(&new_num_tasks_in_sequence); | |
294 | |
295 // Put the sequence back in the appropriate priority queue. | |
296 if (new_num_tasks_in_sequence > 0) { | |
297 if (is_running_single_threaded_task_) | |
298 ReinsertSequenceInSingleThreadPriorityQueue(sequence); | |
299 else | |
300 reinsert_sequence_callback_.Run(sequence, this); | |
301 } | |
302 | |
303 // Note that the thread is no longer running a single-threaded task. | |
304 is_running_single_threaded_task_ = false; | |
305 } | |
306 | |
307 // Post delayed tasks that are ready for execution. | |
308 delayed_task_manager_->PostReadyTasks(); | |
309 } | |
310 } | |
311 | |
312 } // namespace internal | |
313 } // namespace base | |
OLD | NEW |