Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(272)

Side by Side Diff: base/task_scheduler/worker_thread.cc

Issue 1704113002: TaskScheduler [6] SchedulerWorkerThread (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@s_4_shutdown
Patch Set: rebase Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/worker_thread.h"
6
7 #include <utility>
8
9 #include "base/bind.h"
10 #include "base/logging.h"
11 #include "base/task_scheduler/task_tracker.h"
12 #include "base/task_scheduler/utils.h"
13 #include "base/time/time.h"
14
15 namespace base {
16 namespace internal {
17
18 namespace {
19
20 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
21 public:
22 SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
23 PriorityQueue* single_thread_priority_queue,
24 TaskTracker* task_tracker);
25
26 // SingleThreadTaskRunner:
27 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
28 const Closure& task,
29 TimeDelta delay) override;
30 bool PostDelayedTask(const tracked_objects::Location& from_here,
31 const Closure& closure,
32 TimeDelta delay) override;
33 bool RunsTasksOnCurrentThread() const override;
34
35 private:
36 ~SchedulerSingleThreadTaskRunner() override;
37
38 const TaskTraits traits_;
39 const scoped_refptr<Sequence> sequence_;
40 PriorityQueue* const priority_queue_;
41 TaskTracker* const task_tracker_;
42
43 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
44 };
45
46 SchedulerSingleThreadTaskRunner::SchedulerSingleThreadTaskRunner(
47 const TaskTraits& traits,
48 PriorityQueue* single_thread_priority_queue,
49 TaskTracker* task_tracker)
50 : traits_(traits),
51 sequence_(new Sequence),
52 priority_queue_(single_thread_priority_queue),
53 task_tracker_(task_tracker) {}
54
55 bool SchedulerSingleThreadTaskRunner::PostDelayedTask(
56 const tracked_objects::Location& from_here,
57 const Closure& closure,
58 TimeDelta delay) {
59 // TODO(fdoray): Support delayed tasks.
60 DCHECK(delay.is_zero());
61 PostTaskHelper(make_scoped_ptr(new Task(from_here, closure, traits_)),
62 sequence_, priority_queue_, task_tracker_);
63 return true;
64 }
65
66 bool SchedulerSingleThreadTaskRunner::RunsTasksOnCurrentThread() const {
67 // TODO(fdoray): Return true only if tasks posted may actually run on the
68 // current thread. It is valid, but not ideal, to always return true.
69 return true;
70 }
71
72 bool SchedulerSingleThreadTaskRunner::PostNonNestableDelayedTask(
73 const tracked_objects::Location& from_here,
74 const Closure& task,
75 TimeDelta delay) {
76 return PostDelayedTask(from_here, task, delay);
77 }
78
79 SchedulerSingleThreadTaskRunner::~SchedulerSingleThreadTaskRunner() = default;
80
81 void PushSequenceInPriorityQueue(scoped_refptr<Sequence> sequence,
82 PriorityQueue* priority_queue) {
83 const SequenceSortKey sort_key = sequence->GetSortKey();
84 priority_queue->BeginTransaction()->Push(make_scoped_ptr(
85 new PriorityQueue::SequenceAndSortKey(std::move(sequence), sort_key)));
86 }
87
88 } // namespace
89
90 scoped_ptr<WorkerThread> WorkerThread::CreateWorkerThread(
91 ThreadPriority thread_priority,
92 PriorityQueue* shared_priority_queue,
93 const ReinsertSequenceCallback& reinsert_sequence_callback,
94 const BecomesIdleCallback& becomes_idle_callback,
95 TaskTracker* task_tracker) {
96 scoped_ptr<WorkerThread> worker_thread(new WorkerThread(
97 thread_priority, shared_priority_queue, reinsert_sequence_callback,
98 becomes_idle_callback, task_tracker));
99
100 if (worker_thread->thread_handle_.is_null())
101 return scoped_ptr<WorkerThread>();
102 return worker_thread;
103 }
104
105 WorkerThread::~WorkerThread() {
106 AutoSchedulerLock auto_lock(lock_);
107 DCHECK(should_exit_for_testing_);
108 }
109
110 bool WorkerThread::WakeUp() {
111 AutoSchedulerLock auto_lock(lock_);
112 if (is_awake_)
113 return false;
114 is_awake_ = true;
115 wake_up_cv_->Signal();
116 return true;
117 }
118
119 scoped_refptr<SingleThreadTaskRunner> WorkerThread::CreateTaskRunnerWithTraits(
120 const TaskTraits& traits) {
121 return scoped_refptr<SingleThreadTaskRunner>(
122 new SchedulerSingleThreadTaskRunner(
123 traits, &single_thread_priority_queue_, task_tracker_));
124 }
125
126 void WorkerThread::JoinForTesting() {
127 {
128 AutoSchedulerLock auto_lock(lock_);
129 DCHECK(!should_exit_for_testing_);
130 should_exit_for_testing_ = true;
131 }
132 WakeUp();
133 PlatformThread::Join(thread_handle_);
134 }
135
136 WorkerThread::WorkerThread(
137 ThreadPriority thread_priority,
138 PriorityQueue* shared_priority_queue,
139 const ReinsertSequenceCallback& reinsert_sequence_callback,
140 const BecomesIdleCallback& becomes_idle_callback,
141 TaskTracker* task_tracker)
142 : wake_up_cv_(lock_.CreateConditionVariable()),
143 is_awake_(true),
144 should_exit_for_testing_(false),
145 single_thread_priority_queue_(
146 Bind(IgnoreResult(&WorkerThread::WakeUp), Unretained(this)),
147 shared_priority_queue),
148 shared_priority_queue_(shared_priority_queue),
149 reinsert_sequence_callback_(reinsert_sequence_callback),
150 becomes_idle_callback_(becomes_idle_callback),
151 task_tracker_(task_tracker) {
152 DCHECK(shared_priority_queue_);
153 DCHECK(!reinsert_sequence_callback_.is_null());
154 DCHECK(!becomes_idle_callback_.is_null());
155 DCHECK(task_tracker_);
156
157 #if defined(OS_MACOSX)
158 // Mac only supports 2 priorities. crbug.com/554651
159 if (thread_priority != ThreadPriority::NORMAL &&
160 thread_priority != ThreadPriority::REALTIME_AUDIO) {
161 thread_priority = ThreadPriority::NORMAL;
162 }
163 #endif // defined(OS_MACOSX)
164
165 const size_t kDefaultStackSize = 0;
166 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_,
167 thread_priority);
168 }
169
170 scoped_refptr<Sequence> WorkerThread::GetWork(bool* single_thread) {
171 DCHECK(single_thread);
172 *single_thread = false;
173
174 scoped_ptr<PriorityQueue::Transaction> shared_transaction(
175 shared_priority_queue_->BeginTransaction());
176 const PriorityQueue::SequenceAndSortKey shared_sequence =
177 shared_transaction->Peek();
178
179 scoped_ptr<PriorityQueue::Transaction> single_thread_transaction(
180 single_thread_priority_queue_.BeginTransaction());
181 const PriorityQueue::SequenceAndSortKey single_thread_sequence =
182 single_thread_transaction->Peek();
183
184 if (single_thread_sequence.is_null() && shared_sequence.is_null()) {
185 return scoped_refptr<Sequence>();
186 }
187
188 if (single_thread_sequence.is_null() ||
189 (!shared_sequence.is_null() &&
190 single_thread_sequence.sort_key < shared_sequence.sort_key)) {
191 shared_transaction->Pop();
192 return shared_sequence.sequence;
193 }
194
195 DCHECK(!single_thread_sequence.is_null());
196 single_thread_transaction->Pop();
197 *single_thread = true;
198 return single_thread_sequence.sequence;
199 }
200
201 void WorkerThread::WaitUntilWakeUp() {
202 AutoSchedulerLock auto_lock(lock_);
203 while (!is_awake_)
204 wake_up_cv_->Wait();
205 }
206
207 void WorkerThread::ThreadMain() {
208 while (!task_tracker_->shutdown_completed() && !should_exit_for_testing_) {
209 // Get the sequence containing the next task to execute.
210 bool sequence_is_single_threaded = false;
robliao 2016/03/09 15:41:12 For unit tests: If we're exiting the test, call Wa
fdoray 2016/03/14 20:46:35 You're right. Thanks for the fix. I moved it insid
211 scoped_refptr<Sequence> sequence = GetWork(&sequence_is_single_threaded);
212 if (sequence.get() == nullptr) {
213 // Mark the WorkerThread as idle.
214 {
215 AutoSchedulerLock auto_lock(lock_);
216 is_awake_ = false;
217 }
218 becomes_idle_callback_.Run(this);
219
220 // Check one more time if there is work available. If work has been added
221 // to |shared_priority_queue_| after the first call to GetWork() but
222 // before the |becomes_idle_callback_| invocation, this WorkerThread
223 // should run this work.
224 sequence = GetWork(&sequence_is_single_threaded);
225
226 if (sequence.get() == nullptr) {
227 WaitUntilWakeUp();
228 sequence = GetWork(&sequence_is_single_threaded);
229 }
230 }
231
232 if (sequence.get() != nullptr) {
233 // Peek the next task in the sequence and run it.
234 task_tracker_->RunTask(sequence->PeekTask());
235
236 // Pop the task from its sequence. If the sequence isn't empty, reinsert
237 // it in the appropriate PriorityQueue.
238 if (!sequence->PopTask()) {
239 if (sequence_is_single_threaded) {
240 PushSequenceInPriorityQueue(std::move(sequence),
241 &single_thread_priority_queue_);
242 } else {
243 reinsert_sequence_callback_.Run(std::move(sequence), this);
244 }
245 }
246 }
247 }
248 }
249
250 } // namespace internal
251 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698