Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(113)

Side by Side Diff: base/task_scheduler/worker_thread.cc

Issue 1704113002: TaskScheduler [6] SchedulerWorkerThread (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@s_4_shutdown
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « base/task_scheduler/worker_thread.h ('k') | base/task_scheduler/worker_thread_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/worker_thread.h"
6
7 #include <utility>
8
9 #include "base/atomicops.h"
10 #include "base/bind.h"
11 #include "base/debug/task_annotator.h"
12 #include "base/logging.h"
13 #include "base/task_scheduler/delayed_task_manager.h"
14 #include "base/task_scheduler/priority_queue.h"
15 #include "base/task_scheduler/shutdown_manager.h"
16 #include "base/task_scheduler/utils.h"
17 #include "base/time/time.h"
18 #include "build/build_config.h"
19
20 namespace base {
21 namespace internal {
22
23 namespace {
24
25 // A task runner that runs tasks on a single WorkerThread.
26 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
robliao 2016/03/07 19:28:08 Working through the ChildTaskRunner, do these Task
gab 2016/03/07 19:31:37 They do, we will eventually need the underlying ty
27 public:
28 // Tasks posted through this task runner have |traits| and are inserted in
29 // |single_thread_priority_queue|. |delayed_task_manager| is used to post
30 // delayed tasks. |shutdown_manager| is notified when a task is posted.
31 SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
32 PriorityQueue* single_thread_priority_queue,
33 DelayedTaskManager* delayed_task_manager,
34 ShutdownManager* shutdown_manager);
35
36 // SingleThreadTaskRunner:
37 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
38 const Closure& task,
39 TimeDelta delay) override;
40 bool PostDelayedTask(const tracked_objects::Location& from_here,
41 const Closure& closure,
42 TimeDelta delay) override;
43 bool RunsTasksOnCurrentThread() const override;
44
45 private:
46 ~SchedulerSingleThreadTaskRunner() override;
47
48 TaskTraits traits_;
49 scoped_refptr<Sequence> sequence_;
50 PriorityQueue* priority_queue_;
51 DelayedTaskManager* delayed_task_manager_;
52 ShutdownManager* shutdown_manager_;
53
54 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
55 };
56
57 SchedulerSingleThreadTaskRunner::SchedulerSingleThreadTaskRunner(
58 const TaskTraits& traits,
59 PriorityQueue* priority_queue,
60 DelayedTaskManager* delayed_task_manager,
61 ShutdownManager* shutdown_manager)
62 : traits_(traits),
63 sequence_(new Sequence),
64 priority_queue_(priority_queue),
65 delayed_task_manager_(delayed_task_manager),
66 shutdown_manager_(shutdown_manager) {}
67
68 bool SchedulerSingleThreadTaskRunner::PostDelayedTask(
69 const tracked_objects::Location& from_here,
70 const Closure& closure,
71 TimeDelta delay) {
72 Task task(from_here, closure, traits_, TimeTicks::Now());
73 if (!delay.is_zero())
74 task.delayed_run_time = task.post_time + delay;
75 PostTaskHelper(task, sequence_, priority_queue_, shutdown_manager_,
76 delayed_task_manager_);
77 return true;
78 }
79
80 bool SchedulerSingleThreadTaskRunner::RunsTasksOnCurrentThread() const {
81 // TODO(fdoray): Return true only if tasks posted may actually run on the
82 // current thread. It is valid, but not ideal, to always return true.
83 return true;
84 }
85
86 bool SchedulerSingleThreadTaskRunner::PostNonNestableDelayedTask(
87 const tracked_objects::Location& from_here,
88 const Closure& task,
89 TimeDelta delay) {
90 return PostDelayedTask(from_here, task, delay);
91 }
92
93 SchedulerSingleThreadTaskRunner::~SchedulerSingleThreadTaskRunner() = default;
94
95 } // namespace
96
97 scoped_ptr<WorkerThread> WorkerThread::CreateWorkerThread(
98 ThreadPriority thread_priority,
99 PriorityQueue* shared_priority_queue,
100 const MainEntryCallback& main_entry_callback,
101 const ReinsertSequenceCallback& reinsert_sequence_callback,
102 const BecomesIdleCallback& becomes_idle_callback,
103 DelayedTaskManager* delayed_task_manager,
104 ShutdownManager* shutdown_manager) {
105 scoped_ptr<WorkerThread> worker_thread(new WorkerThread(
106 thread_priority, shared_priority_queue, main_entry_callback,
107 reinsert_sequence_callback, becomes_idle_callback, delayed_task_manager,
108 shutdown_manager));
109 return worker_thread->IsValid() ? std::move(worker_thread)
110 : scoped_ptr<WorkerThread>();
111 }
112
113 WorkerThread::~WorkerThread() {
114 DCHECK(thread_handle_.is_null());
115 }
116
117 void WorkerThread::WakeUp() {
118 wakeup_event_.Signal();
119 }
120
121 scoped_refptr<SingleThreadTaskRunner> WorkerThread::CreateTaskRunnerWithTraits(
122 const TaskTraits& traits,
123 ExecutionMode execution_mode) {
124 #if defined(OS_WIN)
125 DCHECK(execution_mode == ExecutionMode::SINGLE_THREADED);
126 #else
127 DCHECK(execution_mode == ExecutionMode::SINGLE_THREADED);
128 #endif // defined(OS_WIN)
129
130 return scoped_refptr<SingleThreadTaskRunner>(
131 new SchedulerSingleThreadTaskRunner(
132 traits, &single_thread_priority_queue_, delayed_task_manager_,
133 shutdown_manager_));
134 }
135
136 bool WorkerThread::HasSingleThreadedTasks() const {
137 subtle::MemoryBarrier();
138 return !single_thread_priority_queue_.UnsynchronizedEmpty() ||
139 is_running_single_threaded_task_;
140 }
141
142 void WorkerThread::ShutdownAndJoinForTesting() {
143 DCHECK(!thread_handle_.is_null());
144 shutdown_manager_->Shutdown();
145 WakeUp();
146 PlatformThread::Join(thread_handle_);
147 thread_handle_ = PlatformThreadHandle();
148 }
149
150 WorkerThread::WorkerThread(
151 ThreadPriority thread_priority,
152 PriorityQueue* shared_priority_queue,
153 const MainEntryCallback& main_entry_callback,
154 const ReinsertSequenceCallback& reinsert_sequence_callback,
155 const BecomesIdleCallback& becomes_idle_callback,
156 DelayedTaskManager* delayed_task_manager,
157 ShutdownManager* shutdown_manager)
158 : wakeup_event_(false, false),
159 is_running_single_threaded_task_(false),
160 single_thread_priority_queue_(
161 Bind(&WorkerThread::WakeUp, Unretained(this)),
162 shared_priority_queue),
163 shared_priority_queue_(shared_priority_queue),
164 main_entry_callback_(main_entry_callback),
165 reinsert_sequence_callback_(reinsert_sequence_callback),
166 becomes_idle_callback_(becomes_idle_callback),
167 delayed_task_manager_(delayed_task_manager),
168 shutdown_manager_(shutdown_manager) {
169 DCHECK(shared_priority_queue_);
170 DCHECK(!reinsert_sequence_callback.is_null());
171 DCHECK(!becomes_idle_callback.is_null());
172 DCHECK(delayed_task_manager_);
173 DCHECK(shutdown_manager_);
174
175 #if defined(OS_MACOSX)
176 // Mac only supports 2 priorities. crbug.com/554651
177 if (thread_priority != ThreadPriority::NORMAL &&
178 thread_priority != ThreadPriority::REALTIME_AUDIO) {
179 thread_priority = ThreadPriority::NORMAL;
180 }
181 #endif // defined(OS_MACOSX)
182
183 const size_t kDefaultStackSize = 0;
184 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_,
185 thread_priority);
186 }
187
188 bool WorkerThread::IsValid() const {
189 return !thread_handle_.is_null();
190 }
191
192 scoped_refptr<Sequence> WorkerThread::GetWork() {
193 scoped_ptr<PriorityQueue::Transaction> shared_transaction(
194 shared_priority_queue_->BeginTransaction());
195 SequenceSortKey shared_sort_key;
196 scoped_refptr<Sequence> shared_sequence =
197 shared_transaction->PeekSequence(&shared_sort_key);
198
199 scoped_ptr<PriorityQueue::Transaction> single_thread_transaction(
200 single_thread_priority_queue_.BeginTransaction());
201 SequenceSortKey single_thread_sort_key;
202 scoped_refptr<Sequence> single_thread_sequence =
203 single_thread_transaction->PeekSequence(&single_thread_sort_key);
204
205 if (single_thread_sequence.get() == nullptr &&
206 shared_sequence.get() == nullptr) {
207 return scoped_refptr<Sequence>();
208 }
209
210 if (single_thread_sequence.get() == nullptr ||
211 (shared_sequence.get() != nullptr &&
212 single_thread_sort_key < shared_sort_key)) {
213 shared_transaction->PopSequence();
214 return shared_sequence;
215 }
216
217 DCHECK(single_thread_sequence.get());
218
219 is_running_single_threaded_task_ = true;
220 single_thread_transaction->PopSequence();
221 return single_thread_sequence;
222 }
223
224 void WorkerThread::ReinsertSequenceInSingleThreadPriorityQueue(
225 scoped_refptr<Sequence> sequence) {
226 // Get the sort key of |sequence| before creating a priority queue
227 // transaction, to avoid holding 2 locks at the same time.
228 SequenceSortKey sort_key = sequence->GetSortKey();
229
230 // Insert the sequence in the single-thread priority queue.
231 single_thread_priority_queue_.BeginTransaction()->PushSequence(sequence,
232 sort_key);
233 }
234
235 void WorkerThread::WaitUntilWorkIsAvailable() {
236 const TimeTicks next_delayed_task_ready_time =
237 delayed_task_manager_->GetNextDelayedRunTime();
238
239 if (next_delayed_task_ready_time.is_null()) {
240 // There is no delayed tasks. Wait until |wakeup_event_| is signaled.
241 wakeup_event_.Wait();
242 } else {
243 // There is delayed tasks. Wait until either a delayed task becomes ready
244 // for execution or |wakeup_event_| is signaled. Note: Multiple threads
245 // sharing the same DelayedTaskManager may wake up at the same time when a
246 // delayed task becomes ready for execution. This isn't optimal. However,
247 // since most delayed tasks should be posted to BACKGROUND thread pools
248 // (which have a single thread), this behavior shouldn't occur frequently.
249 const TimeDelta wait_time = next_delayed_task_ready_time - TimeTicks::Now();
250 if (wait_time.InMilliseconds() > 0)
251 wakeup_event_.TimedWait(wait_time);
252 }
253 }
254
255 void WorkerThread::ThreadMain() {
256 main_entry_callback_.Run();
257 while (!shutdown_manager_->shutdown_completed()) {
258 // Get the sequence containing the next task to execute.
259 scoped_refptr<Sequence> sequence = GetWork();
260 if (sequence.get() == nullptr) {
261 // Add the thread to the stack of idle threads of the parent thread pool.
262 becomes_idle_callback_.Run(this);
263
264 // Check one more time whether there is pending work. Without this, it
265 // could be that work has been added to |shared_priority_queue_| after the
266 // first call to GetWork() and before this thread was added to the stack
267 // of idle threads. In such a case, |wake_up_event_| hasn't been signaled
268 // because this thread wasn't in the stack of idle threads. However, this
269 // thread is needed to execute the newly added work.
270 sequence = GetWork();
271
272 if (sequence.get() == nullptr) {
273 WaitUntilWorkIsAvailable();
274 sequence = GetWork();
275 }
276 }
277
278 if (sequence.get() != nullptr) {
279 // Peek the next task in the sequence.
280 const Task* task = sequence->PeekTask();
281 const TaskShutdownBehavior shutdown_behavior =
282 task->traits.shutdown_behavior();
283
284 // Run the task.
285 if (shutdown_manager_->ShouldScheduleTask(shutdown_behavior)) {
286 debug::TaskAnnotator task_annotator;
287 task_annotator.RunTask(kQueueFunctionName, *task);
288 shutdown_manager_->DidExecuteTask(shutdown_behavior);
289 }
290
291 // Pop the task from the sequence.
292 size_t new_num_tasks_in_sequence;
293 sequence->PopTask(&new_num_tasks_in_sequence);
294
295 // Put the sequence back in the appropriate priority queue.
296 if (new_num_tasks_in_sequence > 0) {
297 if (is_running_single_threaded_task_)
298 ReinsertSequenceInSingleThreadPriorityQueue(sequence);
299 else
300 reinsert_sequence_callback_.Run(sequence, this);
301 }
302
303 // Note that the thread is no longer running a single-threaded task.
304 is_running_single_threaded_task_ = false;
305 }
306
307 // Post delayed tasks that are ready for execution.
308 delayed_task_manager_->PostReadyTasks();
309 }
310 }
311
312 } // namespace internal
313 } // namespace base
OLDNEW
« no previous file with comments | « base/task_scheduler/worker_thread.h ('k') | base/task_scheduler/worker_thread_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698