Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(142)

Side by Side Diff: base/task_scheduler/worker_thread.cc

Issue 1704113002: TaskScheduler [6] SchedulerWorkerThread (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@s_4_shutdown
Patch Set: self review Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « base/task_scheduler/worker_thread.h ('k') | base/task_scheduler/worker_thread_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/worker_thread.h"
6
7 #include <ostream>
8 #include <utility>
9
10 #include "base/bind.h"
11 #include "base/logging.h"
12 #include "base/task_scheduler/task_tracker.h"
13 #include "base/task_scheduler/utils.h"
14 #include "base/time/time.h"
15
16 namespace base {
17 namespace internal {
18
19 namespace {
20
21 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
22 public:
23 SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
24 PriorityQueue* single_thread_priority_queue,
25 TaskTracker* task_tracker);
26
27 // SingleThreadTaskRunner:
28 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
29 const Closure& task,
30 TimeDelta delay) override;
31 bool PostDelayedTask(const tracked_objects::Location& from_here,
32 const Closure& closure,
33 TimeDelta delay) override;
34 bool RunsTasksOnCurrentThread() const override;
35
36 private:
37 ~SchedulerSingleThreadTaskRunner() override;
38
39 const TaskTraits traits_;
40 const scoped_refptr<Sequence> sequence_;
41 PriorityQueue* const priority_queue_;
42 TaskTracker* const task_tracker_;
43
44 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
45 };
46
47 SchedulerSingleThreadTaskRunner::SchedulerSingleThreadTaskRunner(
48 const TaskTraits& traits,
49 PriorityQueue* single_thread_priority_queue,
50 TaskTracker* task_tracker)
51 : traits_(traits),
52 sequence_(new Sequence),
53 priority_queue_(single_thread_priority_queue),
54 task_tracker_(task_tracker) {}
55
56 bool SchedulerSingleThreadTaskRunner::PostDelayedTask(
57 const tracked_objects::Location& from_here,
58 const Closure& closure,
59 TimeDelta delay) {
60 // TODO(fdoray): Support delayed tasks.
61 DCHECK(delay.is_zero());
62 PostTaskHelper(make_scoped_ptr(new Task(from_here, closure, traits_)),
63 sequence_, priority_queue_, task_tracker_);
64 return true;
65 }
66
67 bool SchedulerSingleThreadTaskRunner::RunsTasksOnCurrentThread() const {
68 // TODO(fdoray): Return true only if tasks posted may actually run on the
69 // current thread. It is valid, but not ideal, to always return true.
70 return true;
71 }
72
73 bool SchedulerSingleThreadTaskRunner::PostNonNestableDelayedTask(
74 const tracked_objects::Location& from_here,
75 const Closure& task,
76 TimeDelta delay) {
77 // Tasks are never nested on WorkerThread.
78 return PostDelayedTask(from_here, task, delay);
79 }
80
81 SchedulerSingleThreadTaskRunner::~SchedulerSingleThreadTaskRunner() = default;
82
83 // Extracts the Sequence with the highest priority from |shared_transaction| or
84 // |single_thread_transaction|. |is_single_threaded| is set to true if the
85 // returned Sequence comes from |single_thread_transaction|.
86 scoped_refptr<Sequence> GetWork(
87 PriorityQueue::Transaction* shared_transaction,
88 PriorityQueue::Transaction* single_thread_transaction,
89 bool* is_single_threaded) {
90 DCHECK(shared_transaction);
91 DCHECK(single_thread_transaction);
92 DCHECK(is_single_threaded);
robliao 2016/03/30 00:42:42 Done as in DCHECK necessary or DCHECK unnecessary?
fdoray 2016/03/30 18:44:49 I removed it in patch set 11... and I re-added it
93
94 *is_single_threaded = false;
95
96 const PriorityQueue::SequenceAndSortKey shared_sequence =
97 shared_transaction->Peek();
98 const PriorityQueue::SequenceAndSortKey single_thread_sequence =
99 single_thread_transaction->Peek();
100
101 if (single_thread_sequence.is_null() && shared_sequence.is_null())
102 return scoped_refptr<Sequence>();
103
104 if (single_thread_sequence.is_null() ||
105 (!shared_sequence.is_null() &&
106 single_thread_sequence.sort_key < shared_sequence.sort_key)) {
107 shared_transaction->Pop();
108 return shared_sequence.sequence;
109 }
110
111 DCHECK(!single_thread_sequence.is_null());
112 single_thread_transaction->Pop();
113 *is_single_threaded = true;
114 return single_thread_sequence.sequence;
115 }
116
117 } // namespace
118
119 scoped_ptr<WorkerThread> WorkerThread::CreateWorkerThread(
120 ThreadPriority thread_priority,
121 PriorityQueue* shared_priority_queue,
122 const SharedSequenceStillHasTasksCallback&
123 shared_sequence_still_has_tasks_callback,
124 const StateChangedCallback& state_changed_callback,
125 TaskTracker* task_tracker) {
126 scoped_ptr<WorkerThread> worker_thread(
127 new WorkerThread(thread_priority, shared_priority_queue,
128 shared_sequence_still_has_tasks_callback,
129 state_changed_callback, task_tracker));
130
131 if (worker_thread->thread_handle_.is_null())
132 return scoped_ptr<WorkerThread>();
133 return worker_thread;
134 }
135
136 WorkerThread::~WorkerThread() {
137 DCHECK(should_exit_for_testing());
138 }
139
140 void WorkerThread::WakeUp() {
141 wake_up_event_.Signal();
142 }
143
144 scoped_refptr<SingleThreadTaskRunner> WorkerThread::CreateTaskRunnerWithTraits(
145 const TaskTraits& traits) {
146 // A WorkerThread is never destroyed, except in tests in which we don't use
147 // task runners after their WorkerThread has been destroyed. Because of that,
148 // it is correct to keep pointers to WorkerThread members in the constructed
149 // task runner.
150 return scoped_refptr<SingleThreadTaskRunner>(
151 new SchedulerSingleThreadTaskRunner(
152 traits, &single_thread_priority_queue_, task_tracker_));
153 }
154
155 void WorkerThread::JoinForTesting() {
156 should_exit_for_testing_ = true;
157 base::subtle::MemoryBarrier();
158 WakeUp();
159 PlatformThread::Join(thread_handle_);
160 }
161
162 WorkerThread::WorkerThread(ThreadPriority thread_priority,
163 PriorityQueue* shared_priority_queue,
164 const SharedSequenceStillHasTasksCallback&
165 shared_sequence_still_has_tasks_callback,
166 const StateChangedCallback& state_changed_callback,
167 TaskTracker* task_tracker)
168 : wake_up_event_(false, false),
169 single_thread_priority_queue_(
170 Bind(&WorkerThread::WakeUp, Unretained(this)),
171 shared_priority_queue),
172 shared_priority_queue_(shared_priority_queue),
173 shared_sequence_still_has_tasks_callback_(
174 shared_sequence_still_has_tasks_callback),
175 state_changed_callback_(state_changed_callback),
176 task_tracker_(task_tracker) {
177 DCHECK(shared_priority_queue_);
178 DCHECK(!shared_sequence_still_has_tasks_callback_.is_null());
179 DCHECK(!state_changed_callback_.is_null());
180 DCHECK(task_tracker_);
181
182 static const size_t kDefaultStackSize = 0;
183 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_,
184 thread_priority);
185 }
186
187 void WorkerThread::SetState(State state) {
188 DCHECK_NE(state_, state);
189 shared_priority_queue_->container_lock()->AssertAcquired();
190 state_ = state;
191 state_changed_callback_.Run(this, state);
robliao 2016/03/30 00:42:42 I'm not sure I like this callback as we're holding
robliao 2016/03/30 00:47:28 This also means that the threadpool will have to m
fdoray 2016/03/30 18:44:49 I like the idea of delegating GetWork() to the Thr
192 }
193
194 void WorkerThread::ThreadMain() {
195 while (!task_tracker_->shutdown_completed() && !should_exit_for_testing()) {
196 // Get the sequence containing the next task to execute.
197 bool sequence_is_single_threaded = false;
198 scoped_refptr<Sequence> sequence;
199 {
200 scoped_ptr<PriorityQueue::Transaction> shared_transaction(
201 shared_priority_queue_->BeginTransaction());
202 scoped_ptr<PriorityQueue::Transaction> single_thread_transaction(
203 single_thread_priority_queue_.BeginTransaction());
204 sequence =
205 GetWork(shared_transaction.get(), single_thread_transaction.get(),
206 &sequence_is_single_threaded);
207 single_thread_transaction.reset();
208
209 if (!sequence) {
210 // Mark this WorkerThread as IDLE. This must be done within the scope of
211 // a |shared_priority_queue_| Transaction and |shared_priority_queue_|
212 // must be empty.
213 SetState(State::IDLE);
214 shared_transaction.reset();
215
216 // Wait for a wake-up.
217 wake_up_event_.Wait();
robliao 2016/03/30 00:42:42 Between shared_transaction.reset and wake_up_event
fdoray 2016/03/30 18:44:49 Why is it problematic? What shouldn't happen betwe
robliao 2016/03/30 19:38:26 Previously, we had a CV do the work here so we cou
fdoray 2016/03/30 19:56:37 ok, I agree.
218
219 // Mark this WorkerThread as BUSY. This must be done within the scope of
220 // a |shared_priority_queue_| Transaction.
221 shared_transaction = shared_priority_queue_->BeginTransaction();
222 SetState(State::BUSY);
223
224 // Try to get work again.
225 continue;
226 }
227 }
228
229 DCHECK_EQ(state_, State::BUSY);
230
231 // Peek the next task in |sequence| and run it.
232 task_tracker_->RunTask(sequence->PeekTask());
233
234 // Pop a task from |sequence|. Reinsert it in the appropriate PriorityQueue
235 // if it's not empty.
236 if (!sequence->PopTask()) {
237 if (sequence_is_single_threaded) {
238 const SequenceSortKey sort_key = sequence->GetSortKey();
239 single_thread_priority_queue_.BeginTransaction()->Push(
240 make_scoped_ptr(new PriorityQueue::SequenceAndSortKey(
241 std::move(sequence), sort_key)));
242 } else {
243 shared_sequence_still_has_tasks_callback_.Run(this,
244 std::move(sequence));
245 }
246 }
247
248 // Reset |wake_up_event_| to avoid an extra loop iteration before going to
249 // sleep if the PriorityQueues are empty. This WorkerThread will check its
250 // PriorityQueues and exit conditions at least once before going to sleep
251 // despite the fact that the event is reset here.
252 wake_up_event_.Reset();
253 }
254 }
255
256 std::ostream& operator<<(std::ostream& os, WorkerThread::State state) {
257 switch (state) {
258 case WorkerThread::State::BUSY:
259 os << "BUSY";
260 break;
261 case WorkerThread::State::IDLE:
262 os << "IDLE";
263 break;
264 }
265 return os;
266 }
267
268 } // namespace internal
269 } // namespace base
OLDNEW
« no previous file with comments | « base/task_scheduler/worker_thread.h ('k') | base/task_scheduler/worker_thread_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698