Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(120)

Side by Side Diff: base/task_scheduler/worker_thread_unittest.cc

Issue 1704113002: TaskScheduler [6] SchedulerWorkerThread (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@s_4_shutdown
Patch Set: self review Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/worker_thread.h"
6
7 #include <utility>
8 #include <vector>
9
10 #include "base/bind.h"
11 #include "base/bind_helpers.h"
12 #include "base/callback_forward.h"
13 #include "base/logging.h"
14 #include "base/macros.h"
15 #include "base/memory/scoped_ptr.h"
16 #include "base/synchronization/condition_variable.h"
17 #include "base/task_scheduler/priority_queue.h"
18 #include "base/task_scheduler/scheduler_lock.h"
19 #include "base/task_scheduler/task_tracker.h"
20 #include "base/task_scheduler/utils.h"
21 #include "base/threading/simple_thread.h"
22 #include "testing/gtest/include/gtest/gtest.h"
23
24 namespace base {
25 namespace internal {
26
27 namespace {
28
29 const size_t kNumTasksPerTest = 500;
30
31 class TaskClosureFactory {
32 public:
33 enum class ExpectedRunOrder {
34 SEQUENCED,
35 NO_EXPECTATION,
36 };
37
38 explicit TaskClosureFactory(
39 TaskClosureFactory::ExpectedRunOrder expected_run_order)
40 : expected_run_order_(expected_run_order),
41 run_task_cv_(lock_.CreateConditionVariable()) {}
42
43 ~TaskClosureFactory() {
44 AutoSchedulerLock auto_lock(lock_);
45 EXPECT_EQ(num_created_tasks_, num_run_tasks_);
46 }
47
48 Closure CreateTaskClosure() {
49 AutoSchedulerLock auto_lock(lock_);
50 return Bind(&TaskClosureFactory::RunTask, Unretained(this),
51 num_created_tasks_++);
52 }
53
54 void WaitForAllTasksToRun() {
55 AutoSchedulerLock auto_lock(lock_);
56 while (num_run_tasks_ < num_created_tasks_)
57 run_task_cv_->Wait();
58 }
59
60 private:
61 void RunTask(size_t task_index) {
62 AutoSchedulerLock auto_lock(lock_);
63
64 if (expected_run_order_ == ExpectedRunOrder::SEQUENCED &&
65 task_index != num_run_tasks_) {
66 ADD_FAILURE() << "Unexpected task execution order.";
67 }
68
69 ++num_run_tasks_;
70 run_task_cv_->Signal();
71 }
72
73 // Synchronizes access to all members.
74 SchedulerLock lock_;
75
76 // Expectation for the order in which tasks run.
77 const ExpectedRunOrder expected_run_order_;
78
79 // Signaled when a task runs.
80 scoped_ptr<ConditionVariable> run_task_cv_;
81
82 // Number of times that CreateTaskClosure() has been called.
83 size_t num_created_tasks_ = 0;
84
85 // Number of times that RunTask() has been called.
86 size_t num_run_tasks_ = 0;
87
88 DISALLOW_COPY_AND_ASSIGN(TaskClosureFactory);
89 };
90
91 class ThreadPostingTasks : public SimpleThread {
92 public:
93 explicit ThreadPostingTasks(WorkerThread* worker_thread)
94 : SimpleThread("ThreadPostingTasks"),
95 worker_thread_(worker_thread),
96 factory_(TaskClosureFactory::ExpectedRunOrder::SEQUENCED) {}
97
98 void WaitForAllTasksToRun() { factory_.WaitForAllTasksToRun(); }
99
100 private:
101 void Run() override {
102 auto task_runner = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
103
104 for (size_t i = 0; i < kNumTasksPerTest; ++i)
105 task_runner->PostTask(FROM_HERE, factory_.CreateTaskClosure());
106 }
107
108 WorkerThread* const worker_thread_;
109 TaskClosureFactory factory_;
110
111 DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasks);
112 };
113
114 } // namespace
115
116 class TaskSchedulerWorkerThreadTest : public testing::Test {
117 protected:
118 TaskSchedulerWorkerThreadTest()
119 : shared_priority_queue_(Bind(&DoNothing)),
120 lock_(shared_priority_queue_.container_lock()),
121 state_changed_callback_cv_(lock_.CreateConditionVariable()) {}
122
123 void SetUp() override {
124 worker_thread_ = WorkerThread::CreateWorkerThread(
125 ThreadPriority::NORMAL, &shared_priority_queue_,
126 Bind(&TaskSchedulerWorkerThreadTest::
127 PoppedTaskFromSharedSequenceCallback,
128 Unretained(this)),
129 Bind(&TaskSchedulerWorkerThreadTest::StateChangedCallback,
130 Unretained(this)),
131 &task_tracker_);
132 ASSERT_TRUE(worker_thread_);
133 WaitUntilIdle();
134
135 AutoSchedulerLock auto_lock(lock_);
136 num_state_changes_ = 0;
137 }
138
139 void TearDown() override { worker_thread_->JoinForTesting(); }
140
141 void WaitUntilIdle() {
142 AutoSchedulerLock auto_lock(lock_);
143 while (last_state_ != WorkerThread::State::IDLE)
144 state_changed_callback_cv_->Wait();
145 }
146
147 void WaitUntilNumStateChanges(size_t expected_num_state_changes) {
148 AutoSchedulerLock auto_lock(lock_);
149 while (num_state_changes_ < expected_num_state_changes)
150 state_changed_callback_cv_->Wait();
151 }
152
153 // Returns the number of state changes that occurred after SetUp() completed
154 // its execution.
155 size_t num_state_changes() const {
156 AutoSchedulerLock auto_lock(lock_);
157 return num_state_changes_;
158 }
159
160 size_t num_popped_task_from_shared_sequence() const {
161 AutoSchedulerLock auto_lock(lock_);
162 return num_popped_task_from_shared_sequence_;
163 }
164
165 PriorityQueue shared_priority_queue_;
166 TaskTracker task_tracker_;
167 scoped_ptr<WorkerThread> worker_thread_;
168
169 private:
170 void PoppedTaskFromSharedSequenceCallback(const WorkerThread* worker_thread,
171 scoped_refptr<Sequence> sequence) {
172 {
173 AutoSchedulerLock auto_lock(lock_);
174 ++num_popped_task_from_shared_sequence_;
175 }
176
177 // Reinsert sequence in |shared_priority_queue_|.
178 const SequenceSortKey sort_key = sequence->GetSortKey();
179 shared_priority_queue_.BeginTransaction()->Push(make_scoped_ptr(
180 new PriorityQueue::SequenceAndSortKey(std::move(sequence), sort_key)));
181 }
182
183 void StateChangedCallback(WorkerThread* worker_thread,
184 WorkerThread::State state) {
185 AutoSchedulerLock auto_lock(lock_);
186 EXPECT_EQ(worker_thread_.get(), worker_thread);
187 EXPECT_NE(last_state_, state);
188 last_state_ = state;
189 ++num_state_changes_;
190 state_changed_callback_cv_->Signal();
191 }
192
193 // Synchronizes access to all members below.
194 mutable SchedulerLock lock_;
195
196 // Condition variable signaled when StateChangedCallback() is invoked.
197 scoped_ptr<ConditionVariable> state_changed_callback_cv_;
198
199 // Last state reported to StateChangedCallback().
200 WorkerThread::State last_state_ = WorkerThread::State::BUSY;
201
202 // Number of times that StateChangedCallback() has been invoked.
203 size_t num_state_changes_ = 0;
204
205 // Number of times that PoppedTaskFromSharedSequenceCallback() has been
206 // invoked.
207 size_t num_popped_task_from_shared_sequence_ = 0;
208
209 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerThreadTest);
210 };
211
212 // Verify that each call to WakeUp() on an IDLE WorkerThread causes a state
213 // change to BUSY followed by a state change to IDLE.
214 TEST_F(TaskSchedulerWorkerThreadTest, WakeUp) {
215 for (size_t i = 0; i < kNumTasksPerTest; ++i) {
216 worker_thread_->WakeUp();
217
218 // StateChangedCallback() verifies that state alternates between BUSY and
219 // IDLE.
220
221 WaitUntilNumStateChanges(2 * (i + 1));
222 EXPECT_EQ(2 * (i + 1), num_state_changes());
223 }
224
225 EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
226 }
227
228 // Verify that |kNumTasksPerTest| tasks run successfully when they are posted
229 // through a single-threaded task runner. Don't wait between posts.
230 TEST_F(TaskSchedulerWorkerThreadTest,
231 PostSingleThreadedTasksNoWaitBetweenPosts) {
232 TaskClosureFactory factory(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
233 auto task_runner = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
234
235 for (size_t i = 0; i < kNumTasksPerTest; ++i)
236 task_runner->PostTask(FROM_HERE, factory.CreateTaskClosure());
237
238 factory.WaitForAllTasksToRun();
239 WaitUntilIdle();
240 EXPECT_GE(num_state_changes(), 2U);
241 EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
242 }
243
244 // Verify that |kNumTasksPerTest| tasks run successfully when they are posted
245 // through a single-threaded task runner. Wait until the previous task has
246 // completed its execution before posting a new task.
247 TEST_F(TaskSchedulerWorkerThreadTest, PostSingleThreadedTasksWaitBetweenPosts) {
248 TaskClosureFactory factory(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
249 auto task_runner = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
250
251 for (size_t i = 0; i < kNumTasksPerTest; ++i) {
252 task_runner->PostTask(FROM_HERE, factory.CreateTaskClosure());
253 factory.WaitForAllTasksToRun();
254 WaitUntilIdle();
255 EXPECT_EQ(2 * (i + 1), num_state_changes());
256 }
257
258 EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
259 }
260
261 // Verify that 2 * |kNumTasksPerTest| tasks run successfully when they are
262 // posted through 2 single-threaded task runners. Don't wait between posts.
263 TEST_F(TaskSchedulerWorkerThreadTest, PostTasksTwoSingleThreadedTaskRunners) {
264 TaskClosureFactory factory_a(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
265 TaskClosureFactory factory_b(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
266 auto task_runner_a = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
267 auto task_runner_b = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
268
269 for (size_t i = 0; i < kNumTasksPerTest; ++i) {
270 task_runner_a->PostTask(FROM_HERE, factory_a.CreateTaskClosure());
271 task_runner_b->PostTask(FROM_HERE, factory_b.CreateTaskClosure());
272 }
273
274 factory_a.WaitForAllTasksToRun();
275 factory_b.WaitForAllTasksToRun();
276 WaitUntilIdle();
277 EXPECT_GE(num_state_changes(), 2U);
278 EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
279 }
280
281 // Verify that |kNumTasksPerTest| tasks run successfully when they are added to
282 // the shared priority queue of a WorkerThread in different Sequences. The test
283 // wakes up the WorkerThread once all tasks have been added to the shared
284 // priority queue. This is necessary because shared tasks don't automatically
285 // wake up the WorkerThread.
286 TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksNoWaitBetweenPosts) {
287 TaskClosureFactory factory(
288 TaskClosureFactory::ExpectedRunOrder::NO_EXPECTATION);
289 for (size_t i = 0; i < kNumTasksPerTest; ++i) {
290 PostTaskHelper(make_scoped_ptr(new Task(
291 FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
292 make_scoped_refptr(new Sequence), &shared_priority_queue_,
293 &task_tracker_);
294 }
295
296 worker_thread_->WakeUp();
297 factory.WaitForAllTasksToRun();
298 WaitUntilIdle();
299 EXPECT_EQ(2U, num_state_changes());
300 EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
301 }
302
303 // Verify that |kNumTasksPerTest| tasks run successfully when they are added to
304 // the shared priority queue of a WorkerThread in different Sequences. The test
305 // wakes up the WorkerThread and waits until the task completes its execution
306 // each time it adds a task to the priority queue. The wake-ups are necessary
307 // because shared tasks don't automatically wake up the WorkerThread.
308 TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksWaitBetweenPosts) {
309 TaskClosureFactory factory(
310 TaskClosureFactory::ExpectedRunOrder::NO_EXPECTATION);
311 for (size_t i = 0; i < kNumTasksPerTest; ++i) {
312 PostTaskHelper(make_scoped_ptr(new Task(
313 FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
314 make_scoped_refptr(new Sequence), &shared_priority_queue_,
315 &task_tracker_);
316 worker_thread_->WakeUp();
317
318 factory.WaitForAllTasksToRun();
319 WaitUntilIdle();
320 EXPECT_EQ(2 * (i + 1), num_state_changes());
321 }
322
323 EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
324 }
325
326 // Verify that |kNumTasksPerTest| tasks run successfully when they are added to
327 // the shared priority queue of a WorkerThread (all tasks in the same Sequence).
328 // The test wakes up the WorkerThread once all tasks have been added to the
329 // priority queue. This is necessary because shared tasks don't automatically
330 // wake up the WorkerThread.
331 TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksInSameSequence) {
332 TaskClosureFactory factory(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
333 scoped_refptr<Sequence> sequence(new Sequence);
334
335 for (size_t i = 0; i < kNumTasksPerTest; ++i) {
336 PostTaskHelper(make_scoped_ptr(new Task(
337 FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
338 sequence, &shared_priority_queue_, &task_tracker_);
339 }
340
341 worker_thread_->WakeUp();
342 factory.WaitForAllTasksToRun();
343 WaitUntilIdle();
344 EXPECT_EQ(2U, num_state_changes());
345 EXPECT_EQ(kNumTasksPerTest - 1, num_popped_task_from_shared_sequence());
346 }
347
348 // Verify that 2 * |kNumTasksPerTest| tasks run successfully when they are added
349 // to the shared priority queue of a WorkerThread. Tasks are split into 2
350 // sequences. The test wakes up the WorkerThread once all tasks have been added
351 // to the priority queue. This is necessary because shared tasks don't
352 // automatically wake up the WorkerThread.
353 TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksInTwoSequences) {
354 TaskClosureFactory factory_a(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
355 TaskClosureFactory factory_b(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
356 scoped_refptr<Sequence> sequence_a(new Sequence);
357 scoped_refptr<Sequence> sequence_b(new Sequence);
358
359 for (size_t i = 0; i < kNumTasksPerTest; ++i) {
360 PostTaskHelper(make_scoped_ptr(new Task(
361 FROM_HERE, factory_a.CreateTaskClosure(), TaskTraits())),
362 sequence_a, &shared_priority_queue_, &task_tracker_);
363 PostTaskHelper(make_scoped_ptr(new Task(
364 FROM_HERE, factory_b.CreateTaskClosure(), TaskTraits())),
365 sequence_b, &shared_priority_queue_, &task_tracker_);
366 }
367
368 worker_thread_->WakeUp();
369 factory_a.WaitForAllTasksToRun();
370 factory_b.WaitForAllTasksToRun();
371 WaitUntilIdle();
372 EXPECT_EQ(2U, num_state_changes());
373 EXPECT_EQ(2 * (kNumTasksPerTest - 1), num_popped_task_from_shared_sequence());
374 }
375
376 // Verify that |kNumTasksPerTest| shared tasks and |kNumTasksPerTest| single-
377 // threaded tasks run successfully when they are posted to a WorkerThread. The
378 // test doesn't wake up the WorkerThread after posting a shared task. Wake-ups
379 // are done by the single-threaded TaskRunner when tasks are posted through it.
380 TEST_F(TaskSchedulerWorkerThreadTest, PostSharedAndSingleThreadedTasks) {
381 TaskClosureFactory factory(
382 TaskClosureFactory::ExpectedRunOrder::NO_EXPECTATION);
383
384 for (size_t i = 0; i < kNumTasksPerTest; ++i) {
385 // Post a task in the shared priority queue. Don't wake up the WorkerThread.
386 PostTaskHelper(make_scoped_ptr(new Task(
387 FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
388 make_scoped_refptr(new Sequence), &shared_priority_queue_,
389 &task_tracker_);
390
391 // Post a task in the single-threaded priority queue. The TaskRunner will
392 // wake up the WorkerThread.
393 worker_thread_->CreateTaskRunnerWithTraits(TaskTraits())
394 ->PostTask(FROM_HERE, factory.CreateTaskClosure());
395
396 factory.WaitForAllTasksToRun();
397 WaitUntilIdle();
398 EXPECT_EQ(2 * (i + 1), num_state_changes());
399 }
400
401 EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
402 }
403
404 // Verify that 20 * |kNumTasksPerTest| single-threaded tasks posted to a single
405 // WorkerThread from 20 threads run successfully.
406 TEST_F(TaskSchedulerWorkerThreadTest,
407 PostSingleThreadedTasksFromMultipleThreads) {
408 static const size_t kNumThreads = 20;
409 std::vector<scoped_ptr<ThreadPostingTasks>> threads;
410 for (size_t i = 0; i < kNumThreads; ++i) {
411 threads.push_back(
412 make_scoped_ptr(new ThreadPostingTasks(worker_thread_.get())));
413 threads.back()->Start();
414 }
415
416 for (const auto& thread : threads) {
417 thread->Join();
418 thread->WaitForAllTasksToRun();
419 }
420
421 WaitUntilIdle();
422 EXPECT_GE(num_state_changes(), 2U);
423 EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
424 }
425
426 } // namespace internal
427 } // namespace base
OLDNEW
« base/task_scheduler/worker_thread.cc ('K') | « base/task_scheduler/worker_thread.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698