Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(134)

Side by Side Diff: base/task_scheduler/worker_thread_unittest.cc

Issue 1704113002: TaskScheduler [6] SchedulerWorkerThread (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@s_4_shutdown
Patch Set: CR from gab #18 Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/worker_thread.h"
6
7 #include <utility>
8 #include <vector>
9
10 #include "base/bind.h"
11 #include "base/bind_helpers.h"
12 #include "base/callback_forward.h"
13 #include "base/logging.h"
14 #include "base/macros.h"
15 #include "base/memory/scoped_ptr.h"
16 #include "base/synchronization/condition_variable.h"
17 #include "base/task_scheduler/priority_queue.h"
18 #include "base/task_scheduler/scheduler_lock.h"
19 #include "base/task_scheduler/task_tracker.h"
20 #include "base/task_scheduler/utils.h"
21 #include "base/threading/simple_thread.h"
22 #include "testing/gtest/include/gtest/gtest.h"
23
24 namespace base {
25 namespace internal {
26
27 namespace {
28
29 const size_t kNumTasksPerTest = 500;
robliao 2016/03/24 23:03:27 Why 500?
fdoray 2016/03/29 18:33:35 A high number of tasks increases the chances to ca
30
31 class TaskClosureFactory {
robliao 2016/03/24 23:03:26 Optional: Maybe SequencedTaskClosureFactory?
fdoray 2016/03/29 18:33:34 Now I use the factory to create non-sequenced task
32 public:
33 TaskClosureFactory() : run_task_cv_(lock_.CreateConditionVariable()) {}
34
35 ~TaskClosureFactory() {
36 AutoSchedulerLock auto_lock(lock_);
37 EXPECT_EQ(num_factored_tasks_, num_run_tasks_);
38 }
39
40 Closure CreateTaskClosure() {
41 AutoSchedulerLock auto_lock(lock_);
42 ++num_factored_tasks_;
43 return Bind(&TaskClosureFactory::RunTask, Unretained(this),
44 num_factored_tasks_);
45 }
46
47 void WaitUntilLastFactoredTaskRan() {
robliao 2016/03/24 23:03:26 WaitForAllTasksToRun
fdoray 2016/03/29 18:33:35 Done.
48 AutoSchedulerLock auto_lock(lock_);
49 while (num_factored_tasks_ != num_run_tasks_)
robliao 2016/03/24 23:03:26 Nit: num_run_tasks_ < num_created_tasks_ Makes it
fdoray 2016/03/29 18:33:35 Done.
50 run_task_cv_->Wait();
51 }
52
53 private:
54 void RunTask(size_t task_index) {
55 AutoSchedulerLock auto_lock(lock_);
56
57 if (task_index != num_run_tasks_ + 1)
robliao 2016/03/24 23:03:27 If you allow task_index to be zero, you can avoid
fdoray 2016/03/29 18:33:34 Done.
58 ADD_FAILURE() << "Unexpected task execution order.";
59
60 ++num_run_tasks_;
61 run_task_cv_->Signal();
62 }
63
64 // Synchronizes access to all members.
65 SchedulerLock lock_;
66
67 // Signaled when a task runs.
68 scoped_ptr<ConditionVariable> run_task_cv_;
69
70 // Number of times that CreateTaskClosure() has been called.
71 size_t num_factored_tasks_ = 0;
robliao 2016/03/24 23:03:26 num_created_tasks_
fdoray 2016/03/29 18:33:34 Done.
72
73 // Number of times that RunTask() has been called.
74 size_t num_run_tasks_ = 0;
75
76 DISALLOW_COPY_AND_ASSIGN(TaskClosureFactory);
77 };
78
79 class ThreadPostingTasks : public SimpleThread {
80 public:
81 ThreadPostingTasks(WorkerThread* worker_thread)
82 : SimpleThread("ThreadPostingTasks"), worker_thread_(worker_thread) {}
83
84 void WaitUntilLastFactoredTaskRan() {
85 factory_.WaitUntilLastFactoredTaskRan();
86 }
87
88 private:
89 void Run() override {
90 auto task_runner = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
91
92 for (size_t i = 0; i < kNumTasksPerTest; ++i)
93 task_runner->PostTask(FROM_HERE, factory_.CreateTaskClosure());
94 }
95
96 WorkerThread* const worker_thread_;
97 TaskClosureFactory factory_;
98
99 DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasks);
100 };
101
102 } // namespace
103
104 class TaskSchedulerWorkerThreadTest : public testing::Test {
105 protected:
106 TaskSchedulerWorkerThreadTest()
107 : shared_priority_queue_(Bind(&DoNothing)),
108 state_changed_callback_cv_(lock_.CreateConditionVariable()) {}
109
110 virtual void SetUp() override {
robliao 2016/03/24 23:03:26 Nit: virtual not needed with override. Here and be
fdoray 2016/03/29 18:33:35 Done.
111 worker_thread_ = WorkerThread::CreateWorkerThread(
112 ThreadPriority::NORMAL, &shared_priority_queue_,
113 Bind(&TaskSchedulerWorkerThreadTest::
114 PoppedTaskFromSharedSequenceCallback,
115 Unretained(this)),
116 Bind(&TaskSchedulerWorkerThreadTest::StateChangedCallback,
117 Unretained(this)),
118 &task_tracker_);
119 ASSERT_TRUE(worker_thread_);
120 WaitUntilIdle();
121
122 AutoSchedulerLock auto_lock(lock_);
123 num_state_changes_ = 0;
124 }
125
126 virtual void TearDown() override { worker_thread_->JoinForTesting(); }
127
128 void WaitUntilIdle() {
129 AutoSchedulerLock auto_lock(lock_);
130 while (last_state_ != WorkerThread::State::IDLE)
131 state_changed_callback_cv_->Wait();
132 }
133
134 void WaitUntilNumStateChanges(size_t expected_num_state_changes) {
135 AutoSchedulerLock auto_lock(lock_);
136 while (num_state_changes_ != expected_num_state_changes)
robliao 2016/03/24 23:03:27 Nit: num_state_changes_ < expected_num_state_chang
fdoray 2016/03/29 18:33:35 Done.
137 state_changed_callback_cv_->Wait();
138 }
139
140 // Returns the number of state changes that occurred after SetUp() completed
141 // its execution.
142 size_t num_state_changes() const {
143 AutoSchedulerLock auto_lock(lock_);
144 return num_state_changes_;
145 }
146
147 size_t num_popped_task_from_shared_sequence() const {
148 AutoSchedulerLock auto_lock(lock_);
149 return num_popped_task_from_shared_sequence_;
150 }
151
152 PriorityQueue shared_priority_queue_;
153 TaskTracker task_tracker_;
154 scoped_ptr<WorkerThread> worker_thread_;
155
156 private:
157 void PoppedTaskFromSharedSequenceCallback(const WorkerThread* worker_thread,
158 scoped_refptr<Sequence> sequence) {
159 {
160 AutoSchedulerLock auto_lock(lock_);
161 ++num_popped_task_from_shared_sequence_;
162 }
163
164 // Reinsert sequence in |shared_priority_queue_|.
165 const SequenceSortKey sort_key = sequence->GetSortKey();
166 shared_priority_queue_.BeginTransaction()->Push(make_scoped_ptr(
167 new PriorityQueue::SequenceAndSortKey(std::move(sequence), sort_key)));
168 }
169
170 void StateChangedCallback(WorkerThread* worker_thread,
171 WorkerThread::State state) {
172 AutoSchedulerLock auto_lock(lock_);
173 EXPECT_EQ(worker_thread_.get(), worker_thread);
174 EXPECT_NE(last_state_, state);
175 last_state_ = state;
176 ++num_state_changes_;
177 state_changed_callback_cv_->Signal();
178 }
179
180 // Synchronizes access to all members below.
181 mutable SchedulerLock lock_;
182
183 // Condition variable signaled when StateChangedCallback() is invoked.
184 scoped_ptr<ConditionVariable> state_changed_callback_cv_;
185
186 // Last state reported to StateChangedCallback().
187 WorkerThread::State last_state_ = WorkerThread::State::BUSY;
188
189 // Number of times that StateChangedCallback() has been invoked.
190 size_t num_state_changes_ = 0;
191
192 // Number of times that PoppedTaskFromSharedSequenceCallback() has been
193 // invoked.
194 size_t num_popped_task_from_shared_sequence_ = 0;
195
196 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerThreadTest);
197 };
198
199 // Verify each call to WakeUp() on an idle WorkerThread causes 2 state changes.
robliao 2016/03/24 23:03:26 This seems like a brittle test, especially if for
fdoray 2016/03/29 18:33:34 StateChangedCallback() verifies that the states go
200 TEST_F(TaskSchedulerWorkerThreadTest, WakeUp) {
201 for (size_t i = 0; i < kNumTasksPerTest; ++i) {
202 worker_thread_->WakeUp();
203 WaitUntilNumStateChanges(2 * (i + 1));
robliao 2016/03/24 23:03:26 Should this instead WaitUntilIdle? If so, you can
fdoray 2016/03/29 18:33:34 Replacing WaitUntilNumStateChanges() with WaitUnti
204 EXPECT_EQ(2 * (i + 1), num_state_changes());
205 }
206
207 EXPECT_EQ(0, num_popped_task_from_shared_sequence());
208 }
209
210 // Verify that a task runs successfully when it is posted through a
211 // task runner returned by WorkerThread::CreateTaskRunnerWithTraits().
212 TEST_F(TaskSchedulerWorkerThreadTest, PostOneSingleThreadedTask) {
213 TaskClosureFactory factory;
214 worker_thread_->CreateTaskRunnerWithTraits(TaskTraits())
215 ->PostTask(FROM_HERE, factory.CreateTaskClosure());
216
217 factory.WaitUntilLastFactoredTaskRan();
218 WaitUntilIdle();
219 EXPECT_EQ(2, num_state_changes());
220 EXPECT_EQ(0, num_popped_task_from_shared_sequence());
221 }
222
223 // Verify that 500 tasks run successfully when they are posted through a task
robliao 2016/03/24 23:03:27 Maybe |kNumTasksPerTest| instead of 500.
fdoray 2016/03/29 18:33:34 Done.
224 // runner returned by WorkerThread::CreateTaskRunnerWithTraits(). Don't wait
225 // between posts.
226 TEST_F(TaskSchedulerWorkerThreadTest,
227 PostMultipleSingleThreadedTasksNoWaitBetweenPosts) {
228 TaskClosureFactory factory;
229 auto task_runner = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
230
231 for (size_t i = 0; i < kNumTasksPerTest; ++i)
232 task_runner->PostTask(FROM_HERE, factory.CreateTaskClosure());
233
234 factory.WaitUntilLastFactoredTaskRan();
235 WaitUntilIdle();
236 EXPECT_GE(num_state_changes(), 2);
237 EXPECT_EQ(0, num_popped_task_from_shared_sequence());
238 }
239
240 // Verify that 500 tasks run successfully when they are posted through a task
241 // runner returned by WorkerThread::CreateTaskRunnerWithTraits(). Wait until the
242 // previous task has completed its execution before posting a new task.
243 TEST_F(TaskSchedulerWorkerThreadTest,
robliao 2016/03/24 23:03:27 Isn't this similar to PostOneSingleThreadedTask, j
fdoray 2016/03/29 18:33:34 Yes, PostOneSingleThreadedTask is equivalent to th
244 PostMultipleSingleThreadedTasksWaitBetweenPosts) {
245 TaskClosureFactory factory;
246 auto task_runner = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
247
248 for (size_t i = 0; i < kNumTasksPerTest; ++i) {
249 task_runner->PostTask(FROM_HERE, factory.CreateTaskClosure());
250 factory.WaitUntilLastFactoredTaskRan();
251 WaitUntilIdle();
252 EXPECT_EQ(2 * (i + 1), num_state_changes());
253 }
254
255 EXPECT_EQ(0, num_popped_task_from_shared_sequence());
256 }
257
258 // Verify that 1000 tasks run successfully when they are posted through 2 task
259 // runners returned by WorkerThread::CreateTaskRunnerWithTraits(). Don't wait
260 // between posts.
261 TEST_F(TaskSchedulerWorkerThreadTest,
262 PostMultipleTasksTwoSingleThreadedTaskRunners) {
263 TaskClosureFactory factory;
264 auto task_runner_a = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
265 auto task_runner_b = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
266
267 for (size_t i = 0; i < kNumTasksPerTest; ++i) {
268 task_runner_a->PostTask(FROM_HERE, factory.CreateTaskClosure());
269 task_runner_b->PostTask(FROM_HERE, factory.CreateTaskClosure());
robliao 2016/03/24 23:03:26 Do we guarantee task execution ordering across tas
fdoray 2016/03/29 18:33:35 The current implementation runs tasks posted to th
270 }
271
272 factory.WaitUntilLastFactoredTaskRan();
273 WaitUntilIdle();
274 EXPECT_GE(num_state_changes(), 2);
275 EXPECT_EQ(0, num_popped_task_from_shared_sequence());
276 }
277
278 // Verify that a task runs successfully when it is added to the shared priority
279 // queue of a WorkerThread. The test wakes up the WorkerThread after adding the
280 // task to the priority queue.
robliao 2016/03/24 23:03:27 Mention that the wakeup is necessary since shared
fdoray 2016/03/29 18:33:35 Done.
281 TEST_F(TaskSchedulerWorkerThreadTest, PostOneSharedTask) {
282 TaskClosureFactory factory;
283 PostTaskHelper(make_scoped_ptr(new Task(
284 FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
285 make_scoped_refptr(new Sequence), &shared_priority_queue_,
286 &task_tracker_);
287
288 worker_thread_->WakeUp();
289 factory.WaitUntilLastFactoredTaskRan();
290 WaitUntilIdle();
291 EXPECT_EQ(2, num_state_changes());
292 EXPECT_EQ(0, num_popped_task_from_shared_sequence());
293 }
294
295 // Verify that 500 tasks run successfully when they are added to the shared
296 // priority queue of a WorkerThread in different Sequences. The test wakes up
297 // the WorkerThread each time it adds a task to the priority queue.
robliao 2016/03/24 23:03:27 The worker thread looks like it's only woken up on
fdoray 2016/03/29 18:33:34 Done.
298 TEST_F(TaskSchedulerWorkerThreadTest,
299 PostMultipleSharedTasksNoWaitBetweenPosts) {
300 TaskClosureFactory factory;
301 for (size_t i = 0; i < kNumTasksPerTest; ++i) {
302 PostTaskHelper(make_scoped_ptr(new Task(
303 FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
304 make_scoped_refptr(new Sequence), &shared_priority_queue_,
305 &task_tracker_);
306 }
307
308 worker_thread_->WakeUp();
309 factory.WaitUntilLastFactoredTaskRan();
310 WaitUntilIdle();
311 EXPECT_EQ(2, num_state_changes());
312 EXPECT_EQ(0, num_popped_task_from_shared_sequence());
313 }
314
315 // Verify that 500 tasks run successfully when they are added to the shared
316 // priority queue of a WorkerThread in different Sequences. The test wakes up
317 // the WorkerThread and waits until the task completes its execution each time
318 // it adds a task to the priority queue.
319 TEST_F(TaskSchedulerWorkerThreadTest, PostMultipleSharedTasksWaitBetweenPosts) {
320 TaskClosureFactory factory;
321 for (size_t i = 0; i < kNumTasksPerTest; ++i) {
322 PostTaskHelper(make_scoped_ptr(new Task(
323 FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
324 make_scoped_refptr(new Sequence), &shared_priority_queue_,
325 &task_tracker_);
326 worker_thread_->WakeUp();
327
328 factory.WaitUntilLastFactoredTaskRan();
329 WaitUntilIdle();
330 EXPECT_EQ(2 * (i + 1), num_state_changes());
331 }
332
333 EXPECT_EQ(0, num_popped_task_from_shared_sequence());
334 }
335
336 // Verify that 500 tasks run successfully when they are added to the shared
337 // priority queue of a WorkerThread (all tasks in the same Sequence). The test
338 // wakes up the WorkerThread once it has added the 500 tasks to the priority
339 // queue.
340 TEST_F(TaskSchedulerWorkerThreadTest, PostMultipleSharedTasksInSameSequence) {
341 TaskClosureFactory factory;
342 scoped_refptr<Sequence> sequence(new Sequence);
343
344 for (size_t i = 0; i < kNumTasksPerTest; ++i) {
345 PostTaskHelper(make_scoped_ptr(new Task(
346 FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
347 sequence, &shared_priority_queue_, &task_tracker_);
348 }
349
350 worker_thread_->WakeUp();
351 factory.WaitUntilLastFactoredTaskRan();
352 WaitUntilIdle();
353 EXPECT_EQ(2, num_state_changes());
354 EXPECT_EQ(kNumTasksPerTest - 1, num_popped_task_from_shared_sequence());
355 }
356
357 // Verify that 1000 tasks run successfully when they are added to the shared
358 // priority queue of a WorkerThread. Tasks are split into 2 sequences. The test
359 // wakes up the WorkerThread once it has added the 1000 tasks to the priority
360 // queue.
361 TEST_F(TaskSchedulerWorkerThreadTest, PostMultipleSharedTasksInTwoSequences) {
362 TaskClosureFactory factory;
363 scoped_refptr<Sequence> sequence_a(new Sequence);
364 scoped_refptr<Sequence> sequence_b(new Sequence);
365
366 for (size_t i = 0; i < kNumTasksPerTest; ++i) {
367 PostTaskHelper(make_scoped_ptr(new Task(
368 FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
369 sequence_a, &shared_priority_queue_, &task_tracker_);
robliao 2016/03/24 23:03:27 We definitely don't guarantee interleaving across
fdoray 2016/03/29 18:33:34 Done.
370 PostTaskHelper(make_scoped_ptr(new Task(
371 FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
372 sequence_b, &shared_priority_queue_, &task_tracker_);
373 }
374
375 worker_thread_->WakeUp();
376 factory.WaitUntilLastFactoredTaskRan();
377 WaitUntilIdle();
378 EXPECT_EQ(2, num_state_changes());
379 EXPECT_EQ(2 * (kNumTasksPerTest - 1), num_popped_task_from_shared_sequence());
380 }
381
382 // Verify that a shared task and a single-threaded task run successfully when
383 // they are posted to a WorkerThread. The WorkerThread is not woken up by the
384 // test after the shared task is posted. The wake-up is done by the task runner
385 // through which the single-thread task is posted.
386 TEST_F(TaskSchedulerWorkerThreadTest, PostSharedAndSingleThreadedTasks) {
387 TaskClosureFactory factory;
388
389 // Post a task in the shared priority queue. Don't wake up the WorkerThread.
390 PostTaskHelper(make_scoped_ptr(new Task(
391 FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
392 make_scoped_refptr(new Sequence), &shared_priority_queue_,
393 &task_tracker_);
394
395 // Post a task in the single-threaded priority queue. The TaskRunner will wake
396 // up the WorkerThread.
397 worker_thread_->CreateTaskRunnerWithTraits(TaskTraits())
398 ->PostTask(FROM_HERE, factory.CreateTaskClosure());
399
400 factory.WaitUntilLastFactoredTaskRan();
401 WaitUntilIdle();
402 EXPECT_EQ(2, num_state_changes());
403 EXPECT_EQ(0, num_popped_task_from_shared_sequence());
404 }
405
406 // Verify that 500 shared tasks and 500 single-threaded tasks run successfully
407 // when they are posted to a WorkerThread. The WorkerThread is not woken up by
408 // the test after a shared task is posted. The wake-ups are done by the task
409 // runner through which the single-thread tasks are posted.
410 TEST_F(TaskSchedulerWorkerThreadTest,
411 PostMultipleSharedAndSingleThreadedTasks) {
412 TaskClosureFactory factory;
413
414 for (size_t i = 0; i < kNumTasksPerTest; ++i) {
415 // Post a task in the shared priority queue. Don't wake up the WorkerThread.
416 PostTaskHelper(make_scoped_ptr(new Task(
417 FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
418 make_scoped_refptr(new Sequence), &shared_priority_queue_,
419 &task_tracker_);
420
421 // Post a task in the single-threaded priority queue. The TaskRunner will
422 // wake up the WorkerThread.
423 worker_thread_->CreateTaskRunnerWithTraits(TaskTraits())
424 ->PostTask(FROM_HERE, factory.CreateTaskClosure());
425
426 factory.WaitUntilLastFactoredTaskRan();
427 WaitUntilIdle();
428 EXPECT_EQ(2 * (i + 1), num_state_changes());
429 }
430
431 EXPECT_EQ(0, num_popped_task_from_shared_sequence());
432 }
433
434 // Verify that 10000 single-threaded tasks posted to a single WorkerThread from
435 // 10 threads run successfully.
436 TEST_F(TaskSchedulerWorkerThreadTest,
437 PostSingleThreadedTasksFromMultipleThreads) {
438 const size_t kNumThreads = 20;
439 std::vector<scoped_ptr<ThreadPostingTasks>> threads;
440 for (size_t i = 0; i < kNumThreads; ++i) {
441 threads.push_back(
442 make_scoped_ptr(new ThreadPostingTasks(worker_thread_.get())));
443 threads.back()->Start();
444 }
445
446 for (const auto& thread : threads) {
447 thread->Join();
448 thread->WaitUntilLastFactoredTaskRan();
449 }
450
451 WaitUntilIdle();
452 EXPECT_GE(num_state_changes(), 2);
453 EXPECT_EQ(0, num_popped_task_from_shared_sequence());
454 }
455
456 } // namespace internal
457 } // namespace base
OLDNEW
« base/task_scheduler/worker_thread.cc ('K') | « base/task_scheduler/worker_thread.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698