OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "base/task_scheduler/worker_thread.h" | |
6 | |
7 #include <utility> | |
8 | |
9 #include "base/bind.h" | |
10 #include "base/logging.h" | |
11 #include "base/task_scheduler/task_tracker.h" | |
12 #include "base/task_scheduler/utils.h" | |
13 #include "base/time/time.h" | |
14 | |
15 namespace base { | |
16 namespace internal { | |
17 | |
18 namespace { | |
19 | |
20 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner { | |
21 public: | |
22 SchedulerSingleThreadTaskRunner(const TaskTraits& traits, | |
23 PriorityQueue* single_thread_priority_queue, | |
24 TaskTracker* task_tracker); | |
25 | |
26 // SingleThreadTaskRunner: | |
27 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here, | |
28 const Closure& task, | |
29 TimeDelta delay) override; | |
30 bool PostDelayedTask(const tracked_objects::Location& from_here, | |
31 const Closure& closure, | |
32 TimeDelta delay) override; | |
33 bool RunsTasksOnCurrentThread() const override; | |
34 | |
35 private: | |
36 ~SchedulerSingleThreadTaskRunner() override; | |
37 | |
38 const TaskTraits traits_; | |
39 const scoped_refptr<Sequence> sequence_; | |
40 PriorityQueue* const priority_queue_; | |
41 TaskTracker* const task_tracker_; | |
42 | |
43 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner); | |
44 }; | |
45 | |
46 SchedulerSingleThreadTaskRunner::SchedulerSingleThreadTaskRunner( | |
47 const TaskTraits& traits, | |
48 PriorityQueue* single_thread_priority_queue, | |
49 TaskTracker* task_tracker) | |
50 : traits_(traits), | |
51 sequence_(new Sequence), | |
52 priority_queue_(single_thread_priority_queue), | |
53 task_tracker_(task_tracker) {} | |
54 | |
55 bool SchedulerSingleThreadTaskRunner::PostDelayedTask( | |
56 const tracked_objects::Location& from_here, | |
57 const Closure& closure, | |
58 TimeDelta delay) { | |
59 // TODO(fdoray): Support delayed tasks. | |
60 DCHECK(delay.is_zero()); | |
61 PostTaskHelper(make_scoped_ptr(new Task(from_here, closure, traits_)), | |
62 sequence_, priority_queue_, task_tracker_); | |
63 return true; | |
64 } | |
65 | |
66 bool SchedulerSingleThreadTaskRunner::RunsTasksOnCurrentThread() const { | |
67 // TODO(fdoray): Return true only if tasks posted may actually run on the | |
68 // current thread. It is valid, but not ideal, to always return true. | |
69 return true; | |
gab
2016/03/21 19:11:54
I think this can be implemented relatively easily
fdoray
2016/03/24 19:21:09
As discussed earlier, this will be implemented wit
| |
70 } | |
71 | |
72 bool SchedulerSingleThreadTaskRunner::PostNonNestableDelayedTask( | |
73 const tracked_objects::Location& from_here, | |
74 const Closure& task, | |
75 TimeDelta delay) { | |
76 return PostDelayedTask(from_here, task, delay); | |
gab
2016/03/21 19:11:54
Add:
// Tasks are never nested on WorkerThread.
fdoray
2016/03/24 19:21:10
Done.
| |
77 } | |
78 | |
79 SchedulerSingleThreadTaskRunner::~SchedulerSingleThreadTaskRunner() = default; | |
80 | |
81 void PushSequenceInPriorityQueue(scoped_refptr<Sequence> sequence, | |
82 PriorityQueue* priority_queue) { | |
83 const SequenceSortKey sort_key = sequence->GetSortKey(); | |
84 priority_queue->BeginTransaction()->Push(make_scoped_ptr( | |
85 new PriorityQueue::SequenceAndSortKey(std::move(sequence), sort_key))); | |
86 } | |
87 | |
88 } // namespace | |
89 | |
90 scoped_ptr<WorkerThread> WorkerThread::CreateWorkerThread( | |
91 ThreadPriority thread_priority, | |
92 PriorityQueue* shared_priority_queue, | |
93 const ReinsertSequenceCallback& reinsert_sequence_callback, | |
94 const BecomesIdleCallback& becomes_idle_callback, | |
95 TaskTracker* task_tracker) { | |
96 scoped_ptr<WorkerThread> worker_thread(new WorkerThread( | |
97 thread_priority, shared_priority_queue, reinsert_sequence_callback, | |
98 becomes_idle_callback, task_tracker)); | |
99 | |
100 if (worker_thread->thread_handle_.is_null()) | |
101 return scoped_ptr<WorkerThread>(); | |
102 return worker_thread; | |
103 } | |
104 | |
105 WorkerThread::~WorkerThread() { | |
106 AutoSchedulerLock auto_lock(lock_); | |
107 DCHECK(should_exit_for_testing_); | |
108 } | |
109 | |
110 bool WorkerThread::WakeUp() { | |
111 AutoSchedulerLock auto_lock(lock_); | |
112 if (is_awake_) | |
113 return false; | |
114 is_awake_ = true; | |
115 wake_up_cv_->Signal(); | |
gab
2016/03/21 19:11:54
tl;dr; no explicit request here, mostly thoughts a
fdoray
2016/03/24 19:21:10
I now use a WaitableEvent instead of a ConditionVa
| |
116 return true; | |
117 } | |
118 | |
119 scoped_refptr<SingleThreadTaskRunner> WorkerThread::CreateTaskRunnerWithTraits( | |
120 const TaskTraits& traits) { | |
121 return scoped_refptr<SingleThreadTaskRunner>( | |
122 new SchedulerSingleThreadTaskRunner( | |
123 traits, &single_thread_priority_queue_, task_tracker_)); | |
gab
2016/03/21 19:11:54
This is only fine because we never delete WorkerTh
fdoray
2016/03/24 19:21:10
Done.
| |
124 } | |
125 | |
126 void WorkerThread::JoinForTesting() { | |
127 { | |
128 AutoSchedulerLock auto_lock(lock_); | |
129 DCHECK(!should_exit_for_testing_); | |
130 should_exit_for_testing_ = true; | |
131 } | |
132 WakeUp(); | |
133 PlatformThread::Join(thread_handle_); | |
134 } | |
135 | |
136 WorkerThread::WorkerThread( | |
137 ThreadPriority thread_priority, | |
138 PriorityQueue* shared_priority_queue, | |
139 const ReinsertSequenceCallback& reinsert_sequence_callback, | |
140 const BecomesIdleCallback& becomes_idle_callback, | |
141 TaskTracker* task_tracker) | |
142 : wake_up_cv_(lock_.CreateConditionVariable()), | |
143 is_awake_(true), | |
144 should_exit_for_testing_(false), | |
145 single_thread_priority_queue_( | |
146 Bind(IgnoreResult(&WorkerThread::WakeUp), Unretained(this)), | |
147 shared_priority_queue), | |
148 shared_priority_queue_(shared_priority_queue), | |
149 reinsert_sequence_callback_(reinsert_sequence_callback), | |
150 becomes_idle_callback_(becomes_idle_callback), | |
151 task_tracker_(task_tracker) { | |
152 DCHECK(shared_priority_queue_); | |
153 DCHECK(!reinsert_sequence_callback_.is_null()); | |
154 DCHECK(!becomes_idle_callback_.is_null()); | |
155 DCHECK(task_tracker_); | |
156 | |
157 #if defined(OS_MACOSX) | |
158 // Mac only supports 2 priorities. crbug.com/554651 | |
gab
2016/03/21 19:11:54
Looks like this is fixed, I think we can remove th
fdoray
2016/03/24 19:21:10
Done. I pinged erikchen@ to have this landed.
| |
159 if (thread_priority != ThreadPriority::NORMAL && | |
160 thread_priority != ThreadPriority::REALTIME_AUDIO) { | |
161 thread_priority = ThreadPriority::NORMAL; | |
162 } | |
163 #endif // defined(OS_MACOSX) | |
164 | |
165 const size_t kDefaultStackSize = 0; | |
166 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_, | |
gab
2016/03/21 19:11:54
In theory we'd need to use CreateNonJoinable() --
fdoray
2016/03/24 19:21:09
We don't want to use CreateNonJoinable() because:
| |
167 thread_priority); | |
168 } | |
169 | |
170 scoped_refptr<Sequence> WorkerThread::GetWork(bool* single_thread) { | |
171 DCHECK(single_thread); | |
172 *single_thread = false; | |
173 | |
174 scoped_ptr<PriorityQueue::Transaction> shared_transaction( | |
175 shared_priority_queue_->BeginTransaction()); | |
176 const PriorityQueue::SequenceAndSortKey shared_sequence = | |
177 shared_transaction->Peek(); | |
178 | |
179 scoped_ptr<PriorityQueue::Transaction> single_thread_transaction( | |
180 single_thread_priority_queue_.BeginTransaction()); | |
181 const PriorityQueue::SequenceAndSortKey single_thread_sequence = | |
182 single_thread_transaction->Peek(); | |
183 | |
184 if (single_thread_sequence.is_null() && shared_sequence.is_null()) { | |
185 return scoped_refptr<Sequence>(); | |
186 } | |
187 | |
188 if (single_thread_sequence.is_null() || | |
189 (!shared_sequence.is_null() && | |
gab
2016/03/21 19:11:54
This check is redundant. If single_thread_sequence
fdoray
2016/03/24 19:21:10
No. If we remove !shared_sequence.is_null(), |shar
gab
2016/04/05 23:35:20
True, my bad, this is fine as-is nvm.
| |
190 single_thread_sequence.sort_key < shared_sequence.sort_key)) { | |
191 shared_transaction->Pop(); | |
192 return shared_sequence.sequence; | |
193 } | |
194 | |
195 DCHECK(!single_thread_sequence.is_null()); | |
196 single_thread_transaction->Pop(); | |
197 *single_thread = true; | |
198 return single_thread_sequence.sequence; | |
199 } | |
200 | |
201 void WorkerThread::WaitUntilWakeUp() { | |
202 AutoSchedulerLock auto_lock(lock_); | |
203 while (!is_awake_ && !should_exit_for_testing_) | |
204 wake_up_cv_->Wait(); | |
gab
2016/03/21 19:11:54
So long as WaitUntilWakeUp() is only called from o
fdoray
2016/03/24 19:21:10
I now use a WaitableEvent.
| |
205 } | |
206 | |
207 void WorkerThread::ThreadMain() { | |
208 while (!should_exit_for_testing_) { | |
gab
2016/03/21 19:11:54
Add a comment explaining what happens in product (
fdoray
2016/03/24 19:21:10
I added !task_tracker_->shutdown_completed() so th
| |
209 // Get the sequence containing the next task to execute. | |
210 bool sequence_is_single_threaded = false; | |
211 scoped_refptr<Sequence> sequence = GetWork(&sequence_is_single_threaded); | |
212 if (sequence.get() == nullptr) { | |
gab
2016/03/21 19:11:54
if (!sequence)
(here and below)
fdoray
2016/03/24 19:21:10
Done.
| |
213 // Mark the WorkerThread as idle. | |
214 { | |
215 AutoSchedulerLock auto_lock(lock_); | |
216 is_awake_ = false; | |
217 } | |
218 becomes_idle_callback_.Run(this); | |
219 | |
220 // Check one more time if there is work available. If work is added to | |
221 // |shared_priority_queue_| after the first call to GetWork() but before | |
gab
2016/03/21 19:11:54
Could also have been added to |single_thread_prior
fdoray
2016/03/24 19:21:10
Done. This extra check is really just for |shared_
| |
222 // the |becomes_idle_callback_| invocation, this WorkerThread should run | |
223 // this work. | |
224 sequence = GetWork(&sequence_is_single_threaded); | |
gab
2016/03/21 19:11:54
If we do get work here don't we need to self-wake,
fdoray
2016/03/24 19:21:10
Done.
| |
225 | |
226 // Check |should_exit_for_testing_| one more time. If JoinForTesting() is | |
227 // called after the loop condition is checked but before |is_awake_| is | |
228 // updated, this WorkerThread could block on WaitUntilWakeUp() forever | |
229 // without this extra check. | |
230 if (should_exit_for_testing_) | |
gab
2016/03/21 19:11:54
Need to lock to check this member, right?
Can we
fdoray
2016/03/24 19:21:10
We no longer need this with a WaitableEvent.
| |
231 break; | |
232 | |
233 if (sequence.get() == nullptr) { | |
234 WaitUntilWakeUp(); | |
235 sequence = GetWork(&sequence_is_single_threaded); | |
gab
2016/03/21 19:11:54
Here I'd say "continue;" is easier to read, result
fdoray
2016/03/24 19:21:10
Done.
| |
236 } | |
237 } | |
238 | |
239 if (sequence.get() != nullptr) { | |
240 // Peek the next task in the sequence and run it. | |
241 task_tracker_->RunTask(sequence->PeekTask()); | |
242 | |
243 // Pop the task from its sequence. If the sequence isn't empty, reinsert | |
244 // it in the appropriate PriorityQueue. | |
245 if (!sequence->PopTask()) { | |
246 if (sequence_is_single_threaded) { | |
247 PushSequenceInPriorityQueue(std::move(sequence), | |
gab
2016/03/21 19:11:54
Inline this method here? It's a 2 lines helper and
fdoray
2016/03/24 19:21:09
Done.
| |
248 &single_thread_priority_queue_); | |
249 } else { | |
250 reinsert_sequence_callback_.Run(std::move(sequence), this); | |
251 } | |
252 } | |
253 } | |
254 } | |
255 } | |
256 | |
257 } // namespace internal | |
258 } // namespace base | |
OLD | NEW |