Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(88)

Side by Side Diff: base/task_scheduler/worker_thread.cc

Issue 1698183005: Reference CL for the new task scheduler. (Closed) Base URL: https://luckyluke-private.googlesource.com/src@bigmaster2
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « base/task_scheduler/worker_thread.h ('k') | base/task_scheduler/worker_thread_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/worker_thread.h"
6
7 #include <utility>
8
9 #include "base/atomicops.h"
10 #include "base/bind.h"
11 #include "base/debug/task_annotator.h"
12 #include "base/logging.h"
13 #include "base/task_scheduler/delayed_task_manager.h"
14 #include "base/task_scheduler/priority_queue.h"
15 #include "base/task_scheduler/shutdown_manager.h"
16 #include "base/task_scheduler/utils.h"
17 #include "base/time/time.h"
18 #include "build/build_config.h"
19
20 namespace base {
21 namespace internal {
22
23 namespace {
24
25 // A task runner that runs tasks on a single WorkerThread.
26 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
27 public:
28 // Tasks posted through this task runner have |traits| and are inserted in
29 // |single_thread_priority_queue|. |delayed_task_manager| is used to post
30 // delayed tasks. |shutdown_manager| is notified when a task is posted.
31 SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
32 PriorityQueue* single_thread_priority_queue,
33 DelayedTaskManager* delayed_task_manager,
34 ShutdownManager* shutdown_manager);
35
36 // SingleThreadTaskRunner:
37 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
38 const Closure& task,
39 TimeDelta delay) override;
40 bool PostDelayedTask(const tracked_objects::Location& from_here,
41 const Closure& closure,
42 TimeDelta delay) override;
43 bool RunsTasksOnCurrentThread() const override;
44
45 private:
46 ~SchedulerSingleThreadTaskRunner() override;
47
48 TaskTraits traits_;
49 scoped_refptr<Sequence> sequence_;
50 PriorityQueue* priority_queue_;
51 DelayedTaskManager* delayed_task_manager_;
52 ShutdownManager* shutdown_manager_;
53
54 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
55 };
56
57 SchedulerSingleThreadTaskRunner::SchedulerSingleThreadTaskRunner(
58 const TaskTraits& traits,
59 PriorityQueue* priority_queue,
60 DelayedTaskManager* delayed_task_manager,
61 ShutdownManager* shutdown_manager)
62 : traits_(traits),
63 sequence_(new Sequence),
64 priority_queue_(priority_queue),
65 delayed_task_manager_(delayed_task_manager),
66 shutdown_manager_(shutdown_manager) {}
67
68 bool SchedulerSingleThreadTaskRunner::PostDelayedTask(
69 const tracked_objects::Location& from_here,
70 const Closure& closure,
71 TimeDelta delay) {
72 Task task(from_here, closure, traits_, TimeTicks::Now());
73 if (!delay.is_zero())
74 task.delayed_run_time = task.post_time + delay;
75 PostTaskHelper(task, sequence_, priority_queue_, shutdown_manager_,
76 delayed_task_manager_);
77 return true;
78 }
79
80 bool SchedulerSingleThreadTaskRunner::RunsTasksOnCurrentThread() const {
81 // TODO(fdoray): Return true only if tasks posted may actually run on the
82 // current thread. It is valid, but not ideal, to always return true.
83 return true;
84 }
85
86 bool SchedulerSingleThreadTaskRunner::PostNonNestableDelayedTask(
87 const tracked_objects::Location& from_here,
88 const Closure& task,
89 TimeDelta delay) {
90 return PostDelayedTask(from_here, task, delay);
91 }
92
93 SchedulerSingleThreadTaskRunner::~SchedulerSingleThreadTaskRunner() = default;
94
95 } // namespace
96
97 scoped_ptr<WorkerThread> WorkerThread::CreateWorkerThread(
98 ThreadPriority thread_priority,
99 PriorityQueue* shared_priority_queue,
100 const MainEntryCallback& main_entry_callback,
101 const ReinsertSequenceCallback& reinsert_sequence_callback,
102 const BecomesIdleCallback& becomes_idle_callback,
103 DelayedTaskManager* delayed_task_manager,
104 ShutdownManager* shutdown_manager) {
105 scoped_ptr<WorkerThread> worker_thread(new WorkerThread(
106 thread_priority, shared_priority_queue, main_entry_callback,
107 reinsert_sequence_callback, becomes_idle_callback, delayed_task_manager,
108 shutdown_manager));
109 return worker_thread->IsValid() ? std::move(worker_thread)
110 : scoped_ptr<WorkerThread>();
111 }
112
113 WorkerThread::~WorkerThread() {
114 DCHECK(thread_handle_.is_null());
115 }
116
117 void WorkerThread::WakeUp() {
118 wakeup_event_.Signal();
119 }
120
121 scoped_refptr<SingleThreadTaskRunner> WorkerThread::CreateTaskRunnerWithTraits(
122 const TaskTraits& traits,
123 ExecutionMode execution_mode) {
124 #if defined(OS_WIN)
125 DCHECK(execution_mode == ExecutionMode::SINGLE_THREADED ||
126 execution_mode == ExecutionMode::SINGLE_THREADED_COM_STA);
127 #else
128 DCHECK(execution_mode == ExecutionMode::SINGLE_THREADED);
129 #endif // defined(OS_WIN)
130
131 return scoped_refptr<SingleThreadTaskRunner>(
132 new SchedulerSingleThreadTaskRunner(
133 traits, &single_thread_priority_queue_, delayed_task_manager_,
134 shutdown_manager_));
135 }
136
137 bool WorkerThread::HasSingleThreadedTasks() const {
138 subtle::MemoryBarrier();
139 return !single_thread_priority_queue_.UnsynchronizedEmpty() ||
140 is_running_single_threaded_task_;
141 }
142
143 void WorkerThread::ShutdownAndJoinForTesting() {
144 DCHECK(!thread_handle_.is_null());
145 shutdown_manager_->Shutdown();
146 WakeUp();
147 PlatformThread::Join(thread_handle_);
148 thread_handle_ = PlatformThreadHandle();
149 }
150
151 WorkerThread::WorkerThread(
152 ThreadPriority thread_priority,
153 PriorityQueue* shared_priority_queue,
154 const MainEntryCallback& main_entry_callback,
155 const ReinsertSequenceCallback& reinsert_sequence_callback,
156 const BecomesIdleCallback& becomes_idle_callback,
157 DelayedTaskManager* delayed_task_manager,
158 ShutdownManager* shutdown_manager)
159 : wakeup_event_(false, false),
160 is_running_single_threaded_task_(false),
161 single_thread_priority_queue_(
162 Bind(&WorkerThread::WakeUp, Unretained(this)),
163 shared_priority_queue),
164 shared_priority_queue_(shared_priority_queue),
165 main_entry_callback_(main_entry_callback),
166 reinsert_sequence_callback_(reinsert_sequence_callback),
167 becomes_idle_callback_(becomes_idle_callback),
168 delayed_task_manager_(delayed_task_manager),
169 shutdown_manager_(shutdown_manager) {
170 DCHECK(shared_priority_queue_);
171 DCHECK(!reinsert_sequence_callback.is_null());
172 DCHECK(!becomes_idle_callback.is_null());
173 DCHECK(delayed_task_manager_);
174 DCHECK(shutdown_manager_);
175
176 #if defined(OS_MACOSX)
177 // Mac only supports 2 priorities. crbug.com/554651
178 if (thread_priority != ThreadPriority::NORMAL &&
179 thread_priority != ThreadPriority::REALTIME_AUDIO) {
180 thread_priority = ThreadPriority::NORMAL;
181 }
182 #endif // defined(OS_MACOSX)
183
184 const size_t kDefaultStackSize = 0;
185 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_,
186 thread_priority);
187 }
188
189 bool WorkerThread::IsValid() const {
190 return !thread_handle_.is_null();
191 }
192
193 scoped_refptr<Sequence> WorkerThread::GetWork() {
194 scoped_ptr<PriorityQueue::Transaction> shared_transaction(
195 shared_priority_queue_->BeginTransaction());
196 SequenceSortKey shared_sort_key;
197 scoped_refptr<Sequence> shared_sequence =
198 shared_transaction->PeekSequence(&shared_sort_key);
199
200 scoped_ptr<PriorityQueue::Transaction> single_thread_transaction(
201 single_thread_priority_queue_.BeginTransaction());
202 SequenceSortKey single_thread_sort_key;
203 scoped_refptr<Sequence> single_thread_sequence =
204 single_thread_transaction->PeekSequence(&single_thread_sort_key);
205
206 if (single_thread_sequence.get() == nullptr &&
207 shared_sequence.get() == nullptr) {
208 return scoped_refptr<Sequence>();
209 }
210
211 if (single_thread_sequence.get() == nullptr ||
212 (shared_sequence.get() != nullptr &&
213 single_thread_sort_key < shared_sort_key)) {
214 shared_transaction->PopSequence();
215 return shared_sequence;
216 }
217
218 DCHECK(single_thread_sequence.get());
219
220 is_running_single_threaded_task_ = true;
221 single_thread_transaction->PopSequence();
222 return single_thread_sequence;
223 }
224
225 void WorkerThread::ReinsertSequenceInSingleThreadPriorityQueue(
226 scoped_refptr<Sequence> sequence) {
227 // Get the sort key of |sequence| before creating a priority queue
228 // transaction, to avoid holding 2 locks at the same time.
229 SequenceSortKey sort_key = sequence->GetSortKey();
230
231 // Insert the sequence in the single-thread priority queue.
232 single_thread_priority_queue_.BeginTransaction()->PushSequence(sequence,
233 sort_key);
234 }
235
236 void WorkerThread::WaitUntilWorkIsAvailable() {
237 const TimeTicks next_delayed_task_ready_time =
238 delayed_task_manager_->GetNextDelayedRunTime();
239
240 if (next_delayed_task_ready_time.is_null()) {
241 // There is no delayed tasks. Wait until |wakeup_event_| is signaled.
242 wakeup_event_.Wait();
243 } else {
244 // There is delayed tasks. Wait until either a delayed task becomes ready
245 // for execution or |wakeup_event_| is signaled. Note: Multiple threads
246 // sharing the same DelayedTaskManager may wake up at the same time when a
247 // delayed task becomes ready for execution. This isn't optimal. However,
248 // since most delayed tasks should be posted to BACKGROUND thread pools
249 // (which have a single thread), this behavior shouldn't occur frequently.
250 const TimeDelta wait_time = next_delayed_task_ready_time - TimeTicks::Now();
251 if (wait_time.InMilliseconds() > 0)
252 wakeup_event_.TimedWait(wait_time);
253 }
254 }
255
256 void WorkerThread::ThreadMain() {
257 main_entry_callback_.Run();
258 while (!shutdown_manager_->shutdown_completed()) {
259 // Get the sequence containing the next task to execute.
260 scoped_refptr<Sequence> sequence = GetWork();
261 if (sequence.get() == nullptr) {
262 // Add the thread to the stack of idle threads of the parent thread pool.
263 becomes_idle_callback_.Run(this);
264
265 // Check one more time whether there is pending work. Without this, it
266 // could be that work has been added to |shared_priority_queue_| after the
267 // first call to GetWork() and before this thread was added to the stack
268 // of idle threads. In such a case, |wake_up_event_| hasn't been signaled
269 // because this thread wasn't in the stack of idle threads. However, this
270 // thread is needed to execute the newly added work.
271 sequence = GetWork();
272
273 if (sequence.get() == nullptr) {
274 WaitUntilWorkIsAvailable();
275 sequence = GetWork();
276 }
277 }
278
279 if (sequence.get() != nullptr) {
280 // Peek the next task in the sequence.
281 const Task* task = sequence->PeekTask();
282 const TaskShutdownBehavior shutdown_behavior =
283 task->traits.shutdown_behavior();
284
285 // Run the task.
286 if (shutdown_manager_->ShouldScheduleTask(shutdown_behavior)) {
287 debug::TaskAnnotator task_annotator;
288 task_annotator.RunTask(kQueueFunctionName, *task);
289 shutdown_manager_->DidExecuteTask(shutdown_behavior);
290 }
291
292 // Pop the task from the sequence.
293 size_t new_num_tasks_in_sequence;
294 sequence->PopTask(&new_num_tasks_in_sequence);
295
296 // Put the sequence back in the appropriate priority queue.
297 if (new_num_tasks_in_sequence > 0) {
298 if (is_running_single_threaded_task_)
299 ReinsertSequenceInSingleThreadPriorityQueue(sequence);
300 else
301 reinsert_sequence_callback_.Run(sequence, this);
302 }
303
304 // Note that the thread is no longer running a single-threaded task.
305 is_running_single_threaded_task_ = false;
306 }
307
308 // Post delayed tasks that are ready for execution.
309 delayed_task_manager_->PostReadyTasks();
310 }
311 }
312
313 } // namespace internal
314 } // namespace base
OLDNEW
« no previous file with comments | « base/task_scheduler/worker_thread.h ('k') | base/task_scheduler/worker_thread_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698