Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(304)

Side by Side Diff: base/task_scheduler/worker_thread.cc

Issue 1704113002: TaskScheduler [6] SchedulerWorkerThread (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@s_4_shutdown
Patch Set: CR from gab #18 Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/worker_thread.h"
6
7 #include <ostream>
8 #include <utility>
9
10 #include "base/bind.h"
11 #include "base/logging.h"
12 #include "base/task_scheduler/task_tracker.h"
13 #include "base/task_scheduler/utils.h"
14 #include "base/time/time.h"
15
16 namespace base {
17 namespace internal {
18
19 namespace {
20
21 class SchedulerSingleThreadTaskRunner : public SingleThreadTaskRunner {
22 public:
23 SchedulerSingleThreadTaskRunner(const TaskTraits& traits,
24 PriorityQueue* single_thread_priority_queue,
25 TaskTracker* task_tracker);
26
27 // SingleThreadTaskRunner:
28 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
29 const Closure& task,
30 TimeDelta delay) override;
31 bool PostDelayedTask(const tracked_objects::Location& from_here,
32 const Closure& closure,
33 TimeDelta delay) override;
34 bool RunsTasksOnCurrentThread() const override;
35
36 private:
37 ~SchedulerSingleThreadTaskRunner() override;
38
39 const TaskTraits traits_;
40 const scoped_refptr<Sequence> sequence_;
41 PriorityQueue* const priority_queue_;
42 TaskTracker* const task_tracker_;
43
44 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
45 };
46
47 SchedulerSingleThreadTaskRunner::SchedulerSingleThreadTaskRunner(
48 const TaskTraits& traits,
49 PriorityQueue* single_thread_priority_queue,
50 TaskTracker* task_tracker)
51 : traits_(traits),
52 sequence_(new Sequence),
53 priority_queue_(single_thread_priority_queue),
54 task_tracker_(task_tracker) {}
55
56 bool SchedulerSingleThreadTaskRunner::PostDelayedTask(
57 const tracked_objects::Location& from_here,
58 const Closure& closure,
59 TimeDelta delay) {
60 // TODO(fdoray): Support delayed tasks.
61 DCHECK(delay.is_zero());
62 PostTaskHelper(make_scoped_ptr(new Task(from_here, closure, traits_)),
63 sequence_, priority_queue_, task_tracker_);
64 return true;
65 }
66
67 bool SchedulerSingleThreadTaskRunner::RunsTasksOnCurrentThread() const {
68 // TODO(fdoray): Return true only if tasks posted may actually run on the
69 // current thread. It is valid, but not ideal, to always return true.
70 return true;
71 }
72
73 bool SchedulerSingleThreadTaskRunner::PostNonNestableDelayedTask(
74 const tracked_objects::Location& from_here,
75 const Closure& task,
76 TimeDelta delay) {
77 // Tasks are never nested on WorkerThread.
78 return PostDelayedTask(from_here, task, delay);
79 }
80
81 SchedulerSingleThreadTaskRunner::~SchedulerSingleThreadTaskRunner() = default;
82
83 } // namespace
84
85 scoped_ptr<WorkerThread> WorkerThread::CreateWorkerThread(
86 ThreadPriority thread_priority,
87 PriorityQueue* shared_priority_queue,
88 const SharedSequenceStillHasTasksCallback&
89 shared_sequence_still_has_tasks_callback,
90 const StateChangedCallback& state_changed_callback,
91 TaskTracker* task_tracker) {
92 scoped_ptr<WorkerThread> worker_thread(
93 new WorkerThread(thread_priority, shared_priority_queue,
94 shared_sequence_still_has_tasks_callback,
95 state_changed_callback, task_tracker));
96
97 if (worker_thread->thread_handle_.is_null())
98 return scoped_ptr<WorkerThread>();
99 return worker_thread;
100 }
101
102 WorkerThread::~WorkerThread() {
103 DCHECK(should_exit_for_testing());
104 }
105
106 void WorkerThread::WakeUp() {
107 wake_up_event_.Signal();
108 }
109
110 scoped_refptr<SingleThreadTaskRunner> WorkerThread::CreateTaskRunnerWithTraits(
111 const TaskTraits& traits) {
112 // A WorkerThread is never destroyed, except in tests in which we don't use
113 // task runners after their WorkerThread has been destroyed. Because of that,
114 // it is correct to keep pointers to WorkerThread members in the constructed
115 // task runner.
116 return scoped_refptr<SingleThreadTaskRunner>(
117 new SchedulerSingleThreadTaskRunner(
118 traits, &single_thread_priority_queue_, task_tracker_));
119 }
120
121 void WorkerThread::JoinForTesting() {
122 should_exit_for_testing_ = true;
123 base::subtle::MemoryBarrier();
124 WakeUp();
125 PlatformThread::Join(thread_handle_);
126 }
127
128 WorkerThread::WorkerThread(ThreadPriority thread_priority,
129 PriorityQueue* shared_priority_queue,
130 const SharedSequenceStillHasTasksCallback&
131 shared_sequence_still_has_tasks_callback,
132 const StateChangedCallback& state_changed_callback,
133 TaskTracker* task_tracker)
134 : wake_up_event_(false, false),
135 single_thread_priority_queue_(
136 Bind(&WorkerThread::WakeUp, Unretained(this)),
137 shared_priority_queue),
138 shared_priority_queue_(shared_priority_queue),
139 shared_sequence_still_has_tasks_callback_(
140 shared_sequence_still_has_tasks_callback),
141 state_changed_callback_(state_changed_callback),
142 task_tracker_(task_tracker) {
143 DCHECK(shared_priority_queue_);
144 DCHECK(!shared_sequence_still_has_tasks_callback_.is_null());
145 DCHECK(!state_changed_callback_.is_null());
146 DCHECK(task_tracker_);
147
148 const size_t kDefaultStackSize = 0;
robliao 2016/03/24 23:03:26 Might as well make this static const.
fdoray 2016/03/29 18:33:34 Done.
149 PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_,
150 thread_priority);
151 }
152
153 scoped_refptr<Sequence> WorkerThread::GetWork(bool* is_single_threaded) {
154 DCHECK(is_single_threaded);
robliao 2016/03/24 23:03:26 I think this is one of the rare cases where a DCHE
fdoray 2016/03/29 18:33:34 Done.
155 *is_single_threaded = false;
156
157 scoped_ptr<PriorityQueue::Transaction> shared_transaction(
158 shared_priority_queue_->BeginTransaction());
159 const PriorityQueue::SequenceAndSortKey shared_sequence =
160 shared_transaction->Peek();
161
162 scoped_ptr<PriorityQueue::Transaction> single_thread_transaction(
163 single_thread_priority_queue_.BeginTransaction());
164 const PriorityQueue::SequenceAndSortKey single_thread_sequence =
165 single_thread_transaction->Peek();
166
167 if (single_thread_sequence.is_null() && shared_sequence.is_null())
168 return scoped_refptr<Sequence>();
169
170 if (single_thread_sequence.is_null() ||
171 (!shared_sequence.is_null() &&
172 single_thread_sequence.sort_key < shared_sequence.sort_key)) {
173 shared_transaction->Pop();
174 return shared_sequence.sequence;
175 }
176
177 DCHECK(!single_thread_sequence.is_null());
178 single_thread_transaction->Pop();
179 *is_single_threaded = true;
180 return single_thread_sequence.sequence;
181 }
182
183 void WorkerThread::SetState(State state) {
184 DCHECK_NE(state_, state);
185 state_ = state;
186 state_changed_callback_.Run(this, state);
187 }
188
189 void WorkerThread::ThreadMain() {
190 while (!task_tracker_->shutdown_completed() && !should_exit_for_testing()) {
191 // Get the sequence containing the next task to execute.
192 bool sequence_is_single_threaded = false;
193 scoped_refptr<Sequence> sequence = GetWork(&sequence_is_single_threaded);
194 if (!sequence) {
195 SetState(State::IDLE);
196
197 // Check one more time if there is work available. Work could have been
198 // added to |shared_priority_queue_| between the first call to GetWork()
199 // and the call to SetState() above in which case this WorkerThread
200 // shouldn't go to sleep (the recipient of |state_changed_callback_|
201 // hadn't be notified that this WorkerThread was idle and might have
robliao 2016/03/24 23:03:26 Nit: hasn't
fdoray 2016/03/29 18:33:34 I don't think that "hasn't" is correct because at
robliao 2016/03/30 00:42:42 This CL comment is no longer relevant as of patchs
202 // assumed that it was gonna run the newly added work). Note that
203 // |wake_up_event_| is always signaled when work is added to
204 // |single_thread_priority_queue_|.
205 sequence = GetWork(&sequence_is_single_threaded);
206
207 if (!sequence) {
208 wake_up_event_.Wait();
209
210 // Always mark this WorkerThread as BUSY after a wake up, regardless of
211 // whether work is actually found in its PriorityQueues. This ensures
212 // that there won't be 2 consecutive IDLE invocations of
213 // |state_changed_callback_| in case of spurious wake up.
214 SetState(State::BUSY);
215
216 continue;
217 }
218
219 SetState(State::BUSY);
220 wake_up_event_.Reset();
robliao 2016/03/24 23:03:26 Why is this reset necessary since our events are c
fdoray 2016/03/29 18:33:34 I moved the reset at the bottom of this method and
221 }
222
223 DCHECK_EQ(state_, State::BUSY);
224
225 // Peek the next task in |sequence| and run it.
226 task_tracker_->RunTask(sequence->PeekTask());
227
228 // Pop a task from |sequence|. If it is empty after this, reinsert it in
robliao 2016/03/24 23:03:26 I think you mean if the sequence isn't empty. I w
fdoray 2016/03/29 18:33:34 We don't necessary reinsert the sequence in its or
229 // |single_thread_priority_queue_| or notify
230 // |shared_sequence_still_has_tasks_callback_|.
231 if (!sequence->PopTask()) {
232 if (sequence_is_single_threaded) {
233 const SequenceSortKey sort_key = sequence->GetSortKey();
234 single_thread_priority_queue_.BeginTransaction()->Push(
235 make_scoped_ptr(new PriorityQueue::SequenceAndSortKey(
236 std::move(sequence), sort_key)));
237 } else {
238 shared_sequence_still_has_tasks_callback_.Run(this,
239 std::move(sequence));
240 }
241 }
242 }
243 }
244
245 std::ostream& operator<<(std::ostream& os, WorkerThread::State state) {
246 switch (state) {
247 case WorkerThread::State::BUSY:
248 os << "BUSY";
249 break;
250 case WorkerThread::State::IDLE:
251 os << "IDLE";
252 break;
253 }
254 return os;
255 }
256
257 } // namespace internal
258 } // namespace base
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698