OLD | NEW |
(Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "base/task_scheduler/worker_thread.h" |
| 6 |
| 7 #include <utility> |
| 8 #include <vector> |
| 9 |
| 10 #include "base/bind.h" |
| 11 #include "base/bind_helpers.h" |
| 12 #include "base/callback_forward.h" |
| 13 #include "base/logging.h" |
| 14 #include "base/macros.h" |
| 15 #include "base/memory/scoped_ptr.h" |
| 16 #include "base/synchronization/condition_variable.h" |
| 17 #include "base/task_scheduler/priority_queue.h" |
| 18 #include "base/task_scheduler/scheduler_lock.h" |
| 19 #include "base/task_scheduler/task_tracker.h" |
| 20 #include "base/task_scheduler/utils.h" |
| 21 #include "base/threading/simple_thread.h" |
| 22 #include "testing/gtest/include/gtest/gtest.h" |
| 23 |
| 24 namespace base { |
| 25 namespace internal { |
| 26 |
| 27 namespace { |
| 28 |
| 29 const size_t kNumTasksPerTest = 500; |
| 30 |
| 31 class TaskClosureFactory { |
| 32 public: |
| 33 enum class ExpectedRunOrder { |
| 34 SEQUENCED, |
| 35 NO_EXPECTATION, |
| 36 }; |
| 37 |
| 38 explicit TaskClosureFactory( |
| 39 TaskClosureFactory::ExpectedRunOrder expected_run_order) |
| 40 : expected_run_order_(expected_run_order), |
| 41 run_task_cv_(lock_.CreateConditionVariable()) {} |
| 42 |
| 43 ~TaskClosureFactory() { |
| 44 AutoSchedulerLock auto_lock(lock_); |
| 45 EXPECT_EQ(num_created_tasks_, num_run_tasks_); |
| 46 } |
| 47 |
| 48 Closure CreateTaskClosure() { |
| 49 AutoSchedulerLock auto_lock(lock_); |
| 50 return Bind(&TaskClosureFactory::RunTask, Unretained(this), |
| 51 num_created_tasks_++); |
| 52 } |
| 53 |
| 54 void WaitForAllTasksToRun() { |
| 55 AutoSchedulerLock auto_lock(lock_); |
| 56 while (num_run_tasks_ < num_created_tasks_) |
| 57 run_task_cv_->Wait(); |
| 58 } |
| 59 |
| 60 private: |
| 61 void RunTask(size_t task_index) { |
| 62 AutoSchedulerLock auto_lock(lock_); |
| 63 |
| 64 if (expected_run_order_ == ExpectedRunOrder::SEQUENCED && |
| 65 task_index != num_run_tasks_) { |
| 66 ADD_FAILURE() << "Unexpected task execution order."; |
| 67 } |
| 68 |
| 69 ++num_run_tasks_; |
| 70 run_task_cv_->Signal(); |
| 71 } |
| 72 |
| 73 // Synchronizes access to all members. |
| 74 SchedulerLock lock_; |
| 75 |
| 76 // Expectation for the order in which tasks run. |
| 77 const ExpectedRunOrder expected_run_order_; |
| 78 |
| 79 // Signaled when a task runs. |
| 80 scoped_ptr<ConditionVariable> run_task_cv_; |
| 81 |
| 82 // Number of times that CreateTaskClosure() has been called. |
| 83 size_t num_created_tasks_ = 0; |
| 84 |
| 85 // Number of times that RunTask() has been called. |
| 86 size_t num_run_tasks_ = 0; |
| 87 |
| 88 DISALLOW_COPY_AND_ASSIGN(TaskClosureFactory); |
| 89 }; |
| 90 |
| 91 class ThreadPostingTasks : public SimpleThread { |
| 92 public: |
| 93 explicit ThreadPostingTasks(WorkerThread* worker_thread) |
| 94 : SimpleThread("ThreadPostingTasks"), |
| 95 worker_thread_(worker_thread), |
| 96 factory_(TaskClosureFactory::ExpectedRunOrder::SEQUENCED) {} |
| 97 |
| 98 void WaitForAllTasksToRun() { factory_.WaitForAllTasksToRun(); } |
| 99 |
| 100 private: |
| 101 void Run() override { |
| 102 auto task_runner = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits()); |
| 103 |
| 104 for (size_t i = 0; i < kNumTasksPerTest; ++i) |
| 105 task_runner->PostTask(FROM_HERE, factory_.CreateTaskClosure()); |
| 106 } |
| 107 |
| 108 WorkerThread* const worker_thread_; |
| 109 TaskClosureFactory factory_; |
| 110 |
| 111 DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasks); |
| 112 }; |
| 113 |
| 114 } // namespace |
| 115 |
| 116 class TaskSchedulerWorkerThreadTest : public testing::Test { |
| 117 protected: |
| 118 TaskSchedulerWorkerThreadTest() |
| 119 : shared_priority_queue_(Bind(&DoNothing)), |
| 120 lock_(shared_priority_queue_.container_lock()), |
| 121 state_changed_callback_cv_(lock_.CreateConditionVariable()) {} |
| 122 |
| 123 void SetUp() override { |
| 124 worker_thread_ = WorkerThread::CreateWorkerThread( |
| 125 ThreadPriority::NORMAL, &shared_priority_queue_, |
| 126 Bind(&TaskSchedulerWorkerThreadTest:: |
| 127 PoppedTaskFromSharedSequenceCallback, |
| 128 Unretained(this)), |
| 129 Bind(&TaskSchedulerWorkerThreadTest::StateChangedCallback, |
| 130 Unretained(this)), |
| 131 &task_tracker_); |
| 132 ASSERT_TRUE(worker_thread_); |
| 133 WaitUntilIdle(); |
| 134 |
| 135 AutoSchedulerLock auto_lock(lock_); |
| 136 num_state_changes_ = 0; |
| 137 } |
| 138 |
| 139 void TearDown() override { worker_thread_->JoinForTesting(); } |
| 140 |
| 141 void WaitUntilIdle() { |
| 142 AutoSchedulerLock auto_lock(lock_); |
| 143 while (last_state_ != WorkerThread::State::IDLE) |
| 144 state_changed_callback_cv_->Wait(); |
| 145 } |
| 146 |
| 147 void WaitUntilNumStateChanges(size_t expected_num_state_changes) { |
| 148 AutoSchedulerLock auto_lock(lock_); |
| 149 while (num_state_changes_ < expected_num_state_changes) |
| 150 state_changed_callback_cv_->Wait(); |
| 151 } |
| 152 |
| 153 // Returns the number of state changes that occurred after SetUp() completed |
| 154 // its execution. |
| 155 size_t num_state_changes() const { |
| 156 AutoSchedulerLock auto_lock(lock_); |
| 157 return num_state_changes_; |
| 158 } |
| 159 |
| 160 size_t num_popped_task_from_shared_sequence() const { |
| 161 AutoSchedulerLock auto_lock(lock_); |
| 162 return num_popped_task_from_shared_sequence_; |
| 163 } |
| 164 |
| 165 PriorityQueue shared_priority_queue_; |
| 166 TaskTracker task_tracker_; |
| 167 scoped_ptr<WorkerThread> worker_thread_; |
| 168 |
| 169 private: |
| 170 void PoppedTaskFromSharedSequenceCallback(const WorkerThread* worker_thread, |
| 171 scoped_refptr<Sequence> sequence) { |
| 172 { |
| 173 AutoSchedulerLock auto_lock(lock_); |
| 174 ++num_popped_task_from_shared_sequence_; |
| 175 } |
| 176 |
| 177 // Reinsert sequence in |shared_priority_queue_|. |
| 178 const SequenceSortKey sort_key = sequence->GetSortKey(); |
| 179 shared_priority_queue_.BeginTransaction()->Push(make_scoped_ptr( |
| 180 new PriorityQueue::SequenceAndSortKey(std::move(sequence), sort_key))); |
| 181 } |
| 182 |
| 183 void StateChangedCallback(WorkerThread* worker_thread, |
| 184 WorkerThread::State state) { |
| 185 AutoSchedulerLock auto_lock(lock_); |
| 186 EXPECT_EQ(worker_thread_.get(), worker_thread); |
| 187 EXPECT_NE(last_state_, state); |
| 188 last_state_ = state; |
| 189 ++num_state_changes_; |
| 190 state_changed_callback_cv_->Signal(); |
| 191 } |
| 192 |
| 193 // Synchronizes access to all members below. |
| 194 mutable SchedulerLock lock_; |
| 195 |
| 196 // Condition variable signaled when StateChangedCallback() is invoked. |
| 197 scoped_ptr<ConditionVariable> state_changed_callback_cv_; |
| 198 |
| 199 // Last state reported to StateChangedCallback(). |
| 200 WorkerThread::State last_state_ = WorkerThread::State::BUSY; |
| 201 |
| 202 // Number of times that StateChangedCallback() has been invoked. |
| 203 size_t num_state_changes_ = 0; |
| 204 |
| 205 // Number of times that PoppedTaskFromSharedSequenceCallback() has been |
| 206 // invoked. |
| 207 size_t num_popped_task_from_shared_sequence_ = 0; |
| 208 |
| 209 DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerThreadTest); |
| 210 }; |
| 211 |
| 212 // Verify that each call to WakeUp() on an IDLE WorkerThread causes a state |
| 213 // change to BUSY followed by a state change to IDLE. |
| 214 TEST_F(TaskSchedulerWorkerThreadTest, WakeUp) { |
| 215 for (size_t i = 0; i < kNumTasksPerTest; ++i) { |
| 216 worker_thread_->WakeUp(); |
| 217 |
| 218 // StateChangedCallback() verifies that state alternates between BUSY and |
| 219 // IDLE. |
| 220 |
| 221 WaitUntilNumStateChanges(2 * (i + 1)); |
| 222 EXPECT_EQ(2 * (i + 1), num_state_changes()); |
| 223 } |
| 224 |
| 225 EXPECT_EQ(0U, num_popped_task_from_shared_sequence()); |
| 226 } |
| 227 |
| 228 // Verify that |kNumTasksPerTest| tasks run successfully when they are posted |
| 229 // through a single-threaded task runner. Don't wait between posts. |
| 230 TEST_F(TaskSchedulerWorkerThreadTest, |
| 231 PostSingleThreadedTasksNoWaitBetweenPosts) { |
| 232 TaskClosureFactory factory(TaskClosureFactory::ExpectedRunOrder::SEQUENCED); |
| 233 auto task_runner = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits()); |
| 234 |
| 235 for (size_t i = 0; i < kNumTasksPerTest; ++i) |
| 236 task_runner->PostTask(FROM_HERE, factory.CreateTaskClosure()); |
| 237 |
| 238 factory.WaitForAllTasksToRun(); |
| 239 WaitUntilIdle(); |
| 240 EXPECT_GE(num_state_changes(), 2U); |
| 241 EXPECT_EQ(0U, num_popped_task_from_shared_sequence()); |
| 242 } |
| 243 |
| 244 // Verify that |kNumTasksPerTest| tasks run successfully when they are posted |
| 245 // through a single-threaded task runner. Wait until the previous task has |
| 246 // completed its execution before posting a new task. |
| 247 TEST_F(TaskSchedulerWorkerThreadTest, PostSingleThreadedTasksWaitBetweenPosts) { |
| 248 TaskClosureFactory factory(TaskClosureFactory::ExpectedRunOrder::SEQUENCED); |
| 249 auto task_runner = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits()); |
| 250 |
| 251 for (size_t i = 0; i < kNumTasksPerTest; ++i) { |
| 252 task_runner->PostTask(FROM_HERE, factory.CreateTaskClosure()); |
| 253 factory.WaitForAllTasksToRun(); |
| 254 WaitUntilIdle(); |
| 255 EXPECT_EQ(2 * (i + 1), num_state_changes()); |
| 256 } |
| 257 |
| 258 EXPECT_EQ(0U, num_popped_task_from_shared_sequence()); |
| 259 } |
| 260 |
| 261 // Verify that 2 * |kNumTasksPerTest| tasks run successfully when they are |
| 262 // posted through 2 single-threaded task runners. Don't wait between posts. |
| 263 TEST_F(TaskSchedulerWorkerThreadTest, PostTasksTwoSingleThreadedTaskRunners) { |
| 264 TaskClosureFactory factory_a(TaskClosureFactory::ExpectedRunOrder::SEQUENCED); |
| 265 TaskClosureFactory factory_b(TaskClosureFactory::ExpectedRunOrder::SEQUENCED); |
| 266 auto task_runner_a = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits()); |
| 267 auto task_runner_b = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits()); |
| 268 |
| 269 for (size_t i = 0; i < kNumTasksPerTest; ++i) { |
| 270 task_runner_a->PostTask(FROM_HERE, factory_a.CreateTaskClosure()); |
| 271 task_runner_b->PostTask(FROM_HERE, factory_b.CreateTaskClosure()); |
| 272 } |
| 273 |
| 274 factory_a.WaitForAllTasksToRun(); |
| 275 factory_b.WaitForAllTasksToRun(); |
| 276 WaitUntilIdle(); |
| 277 EXPECT_GE(num_state_changes(), 2U); |
| 278 EXPECT_EQ(0U, num_popped_task_from_shared_sequence()); |
| 279 } |
| 280 |
| 281 // Verify that |kNumTasksPerTest| tasks run successfully when they are added to |
| 282 // the shared priority queue of a WorkerThread in different Sequences. The test |
| 283 // wakes up the WorkerThread once all tasks have been added to the shared |
| 284 // priority queue. This is necessary because shared tasks don't automatically |
| 285 // wake up the WorkerThread. |
| 286 TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksNoWaitBetweenPosts) { |
| 287 TaskClosureFactory factory( |
| 288 TaskClosureFactory::ExpectedRunOrder::NO_EXPECTATION); |
| 289 for (size_t i = 0; i < kNumTasksPerTest; ++i) { |
| 290 PostTaskHelper(make_scoped_ptr(new Task( |
| 291 FROM_HERE, factory.CreateTaskClosure(), TaskTraits())), |
| 292 make_scoped_refptr(new Sequence), &shared_priority_queue_, |
| 293 &task_tracker_); |
| 294 } |
| 295 |
| 296 worker_thread_->WakeUp(); |
| 297 factory.WaitForAllTasksToRun(); |
| 298 WaitUntilIdle(); |
| 299 EXPECT_EQ(2U, num_state_changes()); |
| 300 EXPECT_EQ(0U, num_popped_task_from_shared_sequence()); |
| 301 } |
| 302 |
| 303 // Verify that |kNumTasksPerTest| tasks run successfully when they are added to |
| 304 // the shared priority queue of a WorkerThread in different Sequences. The test |
| 305 // wakes up the WorkerThread and waits until the task completes its execution |
| 306 // each time it adds a task to the priority queue. The wake-ups are necessary |
| 307 // because shared tasks don't automatically wake up the WorkerThread. |
| 308 TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksWaitBetweenPosts) { |
| 309 TaskClosureFactory factory( |
| 310 TaskClosureFactory::ExpectedRunOrder::NO_EXPECTATION); |
| 311 for (size_t i = 0; i < kNumTasksPerTest; ++i) { |
| 312 PostTaskHelper(make_scoped_ptr(new Task( |
| 313 FROM_HERE, factory.CreateTaskClosure(), TaskTraits())), |
| 314 make_scoped_refptr(new Sequence), &shared_priority_queue_, |
| 315 &task_tracker_); |
| 316 worker_thread_->WakeUp(); |
| 317 |
| 318 factory.WaitForAllTasksToRun(); |
| 319 WaitUntilIdle(); |
| 320 EXPECT_EQ(2 * (i + 1), num_state_changes()); |
| 321 } |
| 322 |
| 323 EXPECT_EQ(0U, num_popped_task_from_shared_sequence()); |
| 324 } |
| 325 |
| 326 // Verify that |kNumTasksPerTest| tasks run successfully when they are added to |
| 327 // the shared priority queue of a WorkerThread (all tasks in the same Sequence). |
| 328 // The test wakes up the WorkerThread once all tasks have been added to the |
| 329 // priority queue. This is necessary because shared tasks don't automatically |
| 330 // wake up the WorkerThread. |
| 331 TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksInSameSequence) { |
| 332 TaskClosureFactory factory(TaskClosureFactory::ExpectedRunOrder::SEQUENCED); |
| 333 scoped_refptr<Sequence> sequence(new Sequence); |
| 334 |
| 335 for (size_t i = 0; i < kNumTasksPerTest; ++i) { |
| 336 PostTaskHelper(make_scoped_ptr(new Task( |
| 337 FROM_HERE, factory.CreateTaskClosure(), TaskTraits())), |
| 338 sequence, &shared_priority_queue_, &task_tracker_); |
| 339 } |
| 340 |
| 341 worker_thread_->WakeUp(); |
| 342 factory.WaitForAllTasksToRun(); |
| 343 WaitUntilIdle(); |
| 344 EXPECT_EQ(2U, num_state_changes()); |
| 345 EXPECT_EQ(kNumTasksPerTest - 1, num_popped_task_from_shared_sequence()); |
| 346 } |
| 347 |
| 348 // Verify that 2 * |kNumTasksPerTest| tasks run successfully when they are added |
| 349 // to the shared priority queue of a WorkerThread. Tasks are split into 2 |
| 350 // sequences. The test wakes up the WorkerThread once all tasks have been added |
| 351 // to the priority queue. This is necessary because shared tasks don't |
| 352 // automatically wake up the WorkerThread. |
| 353 TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksInTwoSequences) { |
| 354 TaskClosureFactory factory_a(TaskClosureFactory::ExpectedRunOrder::SEQUENCED); |
| 355 TaskClosureFactory factory_b(TaskClosureFactory::ExpectedRunOrder::SEQUENCED); |
| 356 scoped_refptr<Sequence> sequence_a(new Sequence); |
| 357 scoped_refptr<Sequence> sequence_b(new Sequence); |
| 358 |
| 359 for (size_t i = 0; i < kNumTasksPerTest; ++i) { |
| 360 PostTaskHelper(make_scoped_ptr(new Task( |
| 361 FROM_HERE, factory_a.CreateTaskClosure(), TaskTraits())), |
| 362 sequence_a, &shared_priority_queue_, &task_tracker_); |
| 363 PostTaskHelper(make_scoped_ptr(new Task( |
| 364 FROM_HERE, factory_b.CreateTaskClosure(), TaskTraits())), |
| 365 sequence_b, &shared_priority_queue_, &task_tracker_); |
| 366 } |
| 367 |
| 368 worker_thread_->WakeUp(); |
| 369 factory_a.WaitForAllTasksToRun(); |
| 370 factory_b.WaitForAllTasksToRun(); |
| 371 WaitUntilIdle(); |
| 372 EXPECT_EQ(2U, num_state_changes()); |
| 373 EXPECT_EQ(2 * (kNumTasksPerTest - 1), num_popped_task_from_shared_sequence()); |
| 374 } |
| 375 |
| 376 // Verify that |kNumTasksPerTest| shared tasks and |kNumTasksPerTest| single- |
| 377 // threaded tasks run successfully when they are posted to a WorkerThread. The |
| 378 // test doesn't wake up the WorkerThread after posting a shared task. Wake-ups |
| 379 // are done by the single-threaded TaskRunner when tasks are posted through it. |
| 380 TEST_F(TaskSchedulerWorkerThreadTest, PostSharedAndSingleThreadedTasks) { |
| 381 TaskClosureFactory factory( |
| 382 TaskClosureFactory::ExpectedRunOrder::NO_EXPECTATION); |
| 383 |
| 384 for (size_t i = 0; i < kNumTasksPerTest; ++i) { |
| 385 // Post a task in the shared priority queue. Don't wake up the WorkerThread. |
| 386 PostTaskHelper(make_scoped_ptr(new Task( |
| 387 FROM_HERE, factory.CreateTaskClosure(), TaskTraits())), |
| 388 make_scoped_refptr(new Sequence), &shared_priority_queue_, |
| 389 &task_tracker_); |
| 390 |
| 391 // Post a task in the single-threaded priority queue. The TaskRunner will |
| 392 // wake up the WorkerThread. |
| 393 worker_thread_->CreateTaskRunnerWithTraits(TaskTraits()) |
| 394 ->PostTask(FROM_HERE, factory.CreateTaskClosure()); |
| 395 |
| 396 factory.WaitForAllTasksToRun(); |
| 397 WaitUntilIdle(); |
| 398 EXPECT_EQ(2 * (i + 1), num_state_changes()); |
| 399 } |
| 400 |
| 401 EXPECT_EQ(0U, num_popped_task_from_shared_sequence()); |
| 402 } |
| 403 |
| 404 // Verify that 20 * |kNumTasksPerTest| single-threaded tasks posted to a single |
| 405 // WorkerThread from 20 threads run successfully. |
| 406 TEST_F(TaskSchedulerWorkerThreadTest, |
| 407 PostSingleThreadedTasksFromMultipleThreads) { |
| 408 static const size_t kNumThreads = 20; |
| 409 std::vector<scoped_ptr<ThreadPostingTasks>> threads; |
| 410 for (size_t i = 0; i < kNumThreads; ++i) { |
| 411 threads.push_back( |
| 412 make_scoped_ptr(new ThreadPostingTasks(worker_thread_.get()))); |
| 413 threads.back()->Start(); |
| 414 } |
| 415 |
| 416 for (const auto& thread : threads) { |
| 417 thread->Join(); |
| 418 thread->WaitForAllTasksToRun(); |
| 419 } |
| 420 |
| 421 WaitUntilIdle(); |
| 422 EXPECT_GE(num_state_changes(), 2U); |
| 423 EXPECT_EQ(0U, num_popped_task_from_shared_sequence()); |
| 424 } |
| 425 |
| 426 } // namespace internal |
| 427 } // namespace base |
OLD | NEW |