Index: base/task_scheduler/worker_thread_unittest.cc |
diff --git a/base/task_scheduler/worker_thread_unittest.cc b/base/task_scheduler/worker_thread_unittest.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..27d0649cb526a0e5059b284eb474a10d478046e9 |
--- /dev/null |
+++ b/base/task_scheduler/worker_thread_unittest.cc |
@@ -0,0 +1,427 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "base/task_scheduler/worker_thread.h" |
+ |
+#include <utility> |
+#include <vector> |
+ |
+#include "base/bind.h" |
+#include "base/bind_helpers.h" |
+#include "base/callback_forward.h" |
+#include "base/logging.h" |
+#include "base/macros.h" |
+#include "base/memory/scoped_ptr.h" |
+#include "base/synchronization/condition_variable.h" |
+#include "base/task_scheduler/priority_queue.h" |
+#include "base/task_scheduler/scheduler_lock.h" |
+#include "base/task_scheduler/task_tracker.h" |
+#include "base/task_scheduler/utils.h" |
+#include "base/threading/simple_thread.h" |
+#include "testing/gtest/include/gtest/gtest.h" |
+ |
+namespace base { |
+namespace internal { |
+ |
+namespace { |
+ |
+const size_t kNumTasksPerTest = 500; |
+ |
+class TaskClosureFactory { |
+ public: |
+ enum class ExpectedRunOrder { |
+ SEQUENCED, |
+ NO_EXPECTATION, |
+ }; |
+ |
+ explicit TaskClosureFactory( |
+ TaskClosureFactory::ExpectedRunOrder expected_run_order) |
+ : expected_run_order_(expected_run_order), |
+ run_task_cv_(lock_.CreateConditionVariable()) {} |
+ |
+ ~TaskClosureFactory() { |
+ AutoSchedulerLock auto_lock(lock_); |
+ EXPECT_EQ(num_created_tasks_, num_run_tasks_); |
+ } |
+ |
+ Closure CreateTaskClosure() { |
+ AutoSchedulerLock auto_lock(lock_); |
+ return Bind(&TaskClosureFactory::RunTask, Unretained(this), |
+ num_created_tasks_++); |
+ } |
+ |
+ void WaitForAllTasksToRun() { |
+ AutoSchedulerLock auto_lock(lock_); |
+ while (num_run_tasks_ < num_created_tasks_) |
+ run_task_cv_->Wait(); |
+ } |
+ |
+ private: |
+ void RunTask(size_t task_index) { |
+ AutoSchedulerLock auto_lock(lock_); |
+ |
+ if (expected_run_order_ == ExpectedRunOrder::SEQUENCED && |
+ task_index != num_run_tasks_) { |
+ ADD_FAILURE() << "Unexpected task execution order."; |
+ } |
+ |
+ ++num_run_tasks_; |
+ run_task_cv_->Signal(); |
+ } |
+ |
+ // Synchronizes access to all members. |
+ SchedulerLock lock_; |
+ |
+ // Expectation for the order in which tasks run. |
+ const ExpectedRunOrder expected_run_order_; |
+ |
+ // Signaled when a task runs. |
+ scoped_ptr<ConditionVariable> run_task_cv_; |
+ |
+ // Number of times that CreateTaskClosure() has been called. |
+ size_t num_created_tasks_ = 0; |
+ |
+ // Number of times that RunTask() has been called. |
+ size_t num_run_tasks_ = 0; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(TaskClosureFactory); |
+}; |
+ |
+class ThreadPostingTasks : public SimpleThread { |
+ public: |
+ explicit ThreadPostingTasks(WorkerThread* worker_thread) |
+ : SimpleThread("ThreadPostingTasks"), |
+ worker_thread_(worker_thread), |
+ factory_(TaskClosureFactory::ExpectedRunOrder::SEQUENCED) {} |
+ |
+ void WaitForAllTasksToRun() { factory_.WaitForAllTasksToRun(); } |
+ |
+ private: |
+ void Run() override { |
+ auto task_runner = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits()); |
+ |
+ for (size_t i = 0; i < kNumTasksPerTest; ++i) |
+ task_runner->PostTask(FROM_HERE, factory_.CreateTaskClosure()); |
+ } |
+ |
+ WorkerThread* const worker_thread_; |
+ TaskClosureFactory factory_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasks); |
+}; |
+ |
+} // namespace |
+ |
+class TaskSchedulerWorkerThreadTest : public testing::Test { |
+ protected: |
+ TaskSchedulerWorkerThreadTest() |
+ : shared_priority_queue_(Bind(&DoNothing)), |
+ lock_(shared_priority_queue_.container_lock()), |
+ state_changed_callback_cv_(lock_.CreateConditionVariable()) {} |
+ |
+ void SetUp() override { |
+ worker_thread_ = WorkerThread::CreateWorkerThread( |
+ ThreadPriority::NORMAL, &shared_priority_queue_, |
+ Bind(&TaskSchedulerWorkerThreadTest:: |
+ PoppedTaskFromSharedSequenceCallback, |
+ Unretained(this)), |
+ Bind(&TaskSchedulerWorkerThreadTest::StateChangedCallback, |
+ Unretained(this)), |
+ &task_tracker_); |
+ ASSERT_TRUE(worker_thread_); |
+ WaitUntilIdle(); |
+ |
+ AutoSchedulerLock auto_lock(lock_); |
+ num_state_changes_ = 0; |
+ } |
+ |
+ void TearDown() override { worker_thread_->JoinForTesting(); } |
+ |
+ void WaitUntilIdle() { |
+ AutoSchedulerLock auto_lock(lock_); |
+ while (last_state_ != WorkerThread::State::IDLE) |
+ state_changed_callback_cv_->Wait(); |
+ } |
+ |
+ void WaitUntilNumStateChanges(size_t expected_num_state_changes) { |
+ AutoSchedulerLock auto_lock(lock_); |
+ while (num_state_changes_ < expected_num_state_changes) |
+ state_changed_callback_cv_->Wait(); |
+ } |
+ |
+ // Returns the number of state changes that occurred after SetUp() completed |
+ // its execution. |
+ size_t num_state_changes() const { |
+ AutoSchedulerLock auto_lock(lock_); |
+ return num_state_changes_; |
+ } |
+ |
+ size_t num_popped_task_from_shared_sequence() const { |
+ AutoSchedulerLock auto_lock(lock_); |
+ return num_popped_task_from_shared_sequence_; |
+ } |
+ |
+ PriorityQueue shared_priority_queue_; |
+ TaskTracker task_tracker_; |
+ scoped_ptr<WorkerThread> worker_thread_; |
+ |
+ private: |
+ void PoppedTaskFromSharedSequenceCallback(const WorkerThread* worker_thread, |
+ scoped_refptr<Sequence> sequence) { |
+ { |
+ AutoSchedulerLock auto_lock(lock_); |
+ ++num_popped_task_from_shared_sequence_; |
+ } |
+ |
+ // Reinsert sequence in |shared_priority_queue_|. |
+ const SequenceSortKey sort_key = sequence->GetSortKey(); |
+ shared_priority_queue_.BeginTransaction()->Push(make_scoped_ptr( |
+ new PriorityQueue::SequenceAndSortKey(std::move(sequence), sort_key))); |
+ } |
+ |
+ void StateChangedCallback(WorkerThread* worker_thread, |
+ WorkerThread::State state) { |
+ AutoSchedulerLock auto_lock(lock_); |
+ EXPECT_EQ(worker_thread_.get(), worker_thread); |
+ EXPECT_NE(last_state_, state); |
+ last_state_ = state; |
+ ++num_state_changes_; |
+ state_changed_callback_cv_->Signal(); |
+ } |
+ |
+ // Synchronizes access to all members below. |
+ mutable SchedulerLock lock_; |
+ |
+ // Condition variable signaled when StateChangedCallback() is invoked. |
+ scoped_ptr<ConditionVariable> state_changed_callback_cv_; |
+ |
+ // Last state reported to StateChangedCallback(). |
+ WorkerThread::State last_state_ = WorkerThread::State::BUSY; |
+ |
+ // Number of times that StateChangedCallback() has been invoked. |
+ size_t num_state_changes_ = 0; |
+ |
+ // Number of times that PoppedTaskFromSharedSequenceCallback() has been |
+ // invoked. |
+ size_t num_popped_task_from_shared_sequence_ = 0; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerThreadTest); |
+}; |
+ |
+// Verify that each call to WakeUp() on an IDLE WorkerThread causes a state |
+// change to BUSY followed by a state change to IDLE. |
+TEST_F(TaskSchedulerWorkerThreadTest, WakeUp) { |
+ for (size_t i = 0; i < kNumTasksPerTest; ++i) { |
+ worker_thread_->WakeUp(); |
+ |
+ // StateChangedCallback() verifies that state alternates between BUSY and |
+ // IDLE. |
+ |
+ WaitUntilNumStateChanges(2 * (i + 1)); |
+ EXPECT_EQ(2 * (i + 1), num_state_changes()); |
+ } |
+ |
+ EXPECT_EQ(0U, num_popped_task_from_shared_sequence()); |
+} |
+ |
+// Verify that |kNumTasksPerTest| tasks run successfully when they are posted |
+// through a single-threaded task runner. Don't wait between posts. |
+TEST_F(TaskSchedulerWorkerThreadTest, |
+ PostSingleThreadedTasksNoWaitBetweenPosts) { |
+ TaskClosureFactory factory(TaskClosureFactory::ExpectedRunOrder::SEQUENCED); |
+ auto task_runner = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits()); |
+ |
+ for (size_t i = 0; i < kNumTasksPerTest; ++i) |
+ task_runner->PostTask(FROM_HERE, factory.CreateTaskClosure()); |
+ |
+ factory.WaitForAllTasksToRun(); |
+ WaitUntilIdle(); |
+ EXPECT_GE(num_state_changes(), 2U); |
+ EXPECT_EQ(0U, num_popped_task_from_shared_sequence()); |
+} |
+ |
+// Verify that |kNumTasksPerTest| tasks run successfully when they are posted |
+// through a single-threaded task runner. Wait until the previous task has |
+// completed its execution before posting a new task. |
+TEST_F(TaskSchedulerWorkerThreadTest, PostSingleThreadedTasksWaitBetweenPosts) { |
+ TaskClosureFactory factory(TaskClosureFactory::ExpectedRunOrder::SEQUENCED); |
+ auto task_runner = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits()); |
+ |
+ for (size_t i = 0; i < kNumTasksPerTest; ++i) { |
+ task_runner->PostTask(FROM_HERE, factory.CreateTaskClosure()); |
+ factory.WaitForAllTasksToRun(); |
+ WaitUntilIdle(); |
+ EXPECT_EQ(2 * (i + 1), num_state_changes()); |
+ } |
+ |
+ EXPECT_EQ(0U, num_popped_task_from_shared_sequence()); |
+} |
+ |
+// Verify that 2 * |kNumTasksPerTest| tasks run successfully when they are |
+// posted through 2 single-threaded task runners. Don't wait between posts. |
+TEST_F(TaskSchedulerWorkerThreadTest, PostTasksTwoSingleThreadedTaskRunners) { |
+ TaskClosureFactory factory_a(TaskClosureFactory::ExpectedRunOrder::SEQUENCED); |
+ TaskClosureFactory factory_b(TaskClosureFactory::ExpectedRunOrder::SEQUENCED); |
+ auto task_runner_a = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits()); |
+ auto task_runner_b = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits()); |
+ |
+ for (size_t i = 0; i < kNumTasksPerTest; ++i) { |
+ task_runner_a->PostTask(FROM_HERE, factory_a.CreateTaskClosure()); |
+ task_runner_b->PostTask(FROM_HERE, factory_b.CreateTaskClosure()); |
+ } |
+ |
+ factory_a.WaitForAllTasksToRun(); |
+ factory_b.WaitForAllTasksToRun(); |
+ WaitUntilIdle(); |
+ EXPECT_GE(num_state_changes(), 2U); |
+ EXPECT_EQ(0U, num_popped_task_from_shared_sequence()); |
+} |
+ |
+// Verify that |kNumTasksPerTest| tasks run successfully when they are added to |
+// the shared priority queue of a WorkerThread in different Sequences. The test |
+// wakes up the WorkerThread once all tasks have been added to the shared |
+// priority queue. This is necessary because shared tasks don't automatically |
+// wake up the WorkerThread. |
+TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksNoWaitBetweenPosts) { |
+ TaskClosureFactory factory( |
+ TaskClosureFactory::ExpectedRunOrder::NO_EXPECTATION); |
+ for (size_t i = 0; i < kNumTasksPerTest; ++i) { |
+ PostTaskHelper(make_scoped_ptr(new Task( |
+ FROM_HERE, factory.CreateTaskClosure(), TaskTraits())), |
+ make_scoped_refptr(new Sequence), &shared_priority_queue_, |
+ &task_tracker_); |
+ } |
+ |
+ worker_thread_->WakeUp(); |
+ factory.WaitForAllTasksToRun(); |
+ WaitUntilIdle(); |
+ EXPECT_EQ(2U, num_state_changes()); |
+ EXPECT_EQ(0U, num_popped_task_from_shared_sequence()); |
+} |
+ |
+// Verify that |kNumTasksPerTest| tasks run successfully when they are added to |
+// the shared priority queue of a WorkerThread in different Sequences. The test |
+// wakes up the WorkerThread and waits until the task completes its execution |
+// each time it adds a task to the priority queue. The wake-ups are necessary |
+// because shared tasks don't automatically wake up the WorkerThread. |
+TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksWaitBetweenPosts) { |
+ TaskClosureFactory factory( |
+ TaskClosureFactory::ExpectedRunOrder::NO_EXPECTATION); |
+ for (size_t i = 0; i < kNumTasksPerTest; ++i) { |
+ PostTaskHelper(make_scoped_ptr(new Task( |
+ FROM_HERE, factory.CreateTaskClosure(), TaskTraits())), |
+ make_scoped_refptr(new Sequence), &shared_priority_queue_, |
+ &task_tracker_); |
+ worker_thread_->WakeUp(); |
+ |
+ factory.WaitForAllTasksToRun(); |
+ WaitUntilIdle(); |
+ EXPECT_EQ(2 * (i + 1), num_state_changes()); |
+ } |
+ |
+ EXPECT_EQ(0U, num_popped_task_from_shared_sequence()); |
+} |
+ |
+// Verify that |kNumTasksPerTest| tasks run successfully when they are added to |
+// the shared priority queue of a WorkerThread (all tasks in the same Sequence). |
+// The test wakes up the WorkerThread once all tasks have been added to the |
+// priority queue. This is necessary because shared tasks don't automatically |
+// wake up the WorkerThread. |
+TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksInSameSequence) { |
+ TaskClosureFactory factory(TaskClosureFactory::ExpectedRunOrder::SEQUENCED); |
+ scoped_refptr<Sequence> sequence(new Sequence); |
+ |
+ for (size_t i = 0; i < kNumTasksPerTest; ++i) { |
+ PostTaskHelper(make_scoped_ptr(new Task( |
+ FROM_HERE, factory.CreateTaskClosure(), TaskTraits())), |
+ sequence, &shared_priority_queue_, &task_tracker_); |
+ } |
+ |
+ worker_thread_->WakeUp(); |
+ factory.WaitForAllTasksToRun(); |
+ WaitUntilIdle(); |
+ EXPECT_EQ(2U, num_state_changes()); |
+ EXPECT_EQ(kNumTasksPerTest - 1, num_popped_task_from_shared_sequence()); |
+} |
+ |
+// Verify that 2 * |kNumTasksPerTest| tasks run successfully when they are added |
+// to the shared priority queue of a WorkerThread. Tasks are split into 2 |
+// sequences. The test wakes up the WorkerThread once all tasks have been added |
+// to the priority queue. This is necessary because shared tasks don't |
+// automatically wake up the WorkerThread. |
+TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksInTwoSequences) { |
+ TaskClosureFactory factory_a(TaskClosureFactory::ExpectedRunOrder::SEQUENCED); |
+ TaskClosureFactory factory_b(TaskClosureFactory::ExpectedRunOrder::SEQUENCED); |
+ scoped_refptr<Sequence> sequence_a(new Sequence); |
+ scoped_refptr<Sequence> sequence_b(new Sequence); |
+ |
+ for (size_t i = 0; i < kNumTasksPerTest; ++i) { |
+ PostTaskHelper(make_scoped_ptr(new Task( |
+ FROM_HERE, factory_a.CreateTaskClosure(), TaskTraits())), |
+ sequence_a, &shared_priority_queue_, &task_tracker_); |
+ PostTaskHelper(make_scoped_ptr(new Task( |
+ FROM_HERE, factory_b.CreateTaskClosure(), TaskTraits())), |
+ sequence_b, &shared_priority_queue_, &task_tracker_); |
+ } |
+ |
+ worker_thread_->WakeUp(); |
+ factory_a.WaitForAllTasksToRun(); |
+ factory_b.WaitForAllTasksToRun(); |
+ WaitUntilIdle(); |
+ EXPECT_EQ(2U, num_state_changes()); |
+ EXPECT_EQ(2 * (kNumTasksPerTest - 1), num_popped_task_from_shared_sequence()); |
+} |
+ |
+// Verify that |kNumTasksPerTest| shared tasks and |kNumTasksPerTest| single- |
+// threaded tasks run successfully when they are posted to a WorkerThread. The |
+// test doesn't wake up the WorkerThread after posting a shared task. Wake-ups |
+// are done by the single-threaded TaskRunner when tasks are posted through it. |
+TEST_F(TaskSchedulerWorkerThreadTest, PostSharedAndSingleThreadedTasks) { |
+ TaskClosureFactory factory( |
+ TaskClosureFactory::ExpectedRunOrder::NO_EXPECTATION); |
+ |
+ for (size_t i = 0; i < kNumTasksPerTest; ++i) { |
+ // Post a task in the shared priority queue. Don't wake up the WorkerThread. |
+ PostTaskHelper(make_scoped_ptr(new Task( |
+ FROM_HERE, factory.CreateTaskClosure(), TaskTraits())), |
+ make_scoped_refptr(new Sequence), &shared_priority_queue_, |
+ &task_tracker_); |
+ |
+ // Post a task in the single-threaded priority queue. The TaskRunner will |
+ // wake up the WorkerThread. |
+ worker_thread_->CreateTaskRunnerWithTraits(TaskTraits()) |
+ ->PostTask(FROM_HERE, factory.CreateTaskClosure()); |
+ |
+ factory.WaitForAllTasksToRun(); |
+ WaitUntilIdle(); |
+ EXPECT_EQ(2 * (i + 1), num_state_changes()); |
+ } |
+ |
+ EXPECT_EQ(0U, num_popped_task_from_shared_sequence()); |
+} |
+ |
+// Verify that 20 * |kNumTasksPerTest| single-threaded tasks posted to a single |
+// WorkerThread from 20 threads run successfully. |
+TEST_F(TaskSchedulerWorkerThreadTest, |
+ PostSingleThreadedTasksFromMultipleThreads) { |
+ static const size_t kNumThreads = 20; |
+ std::vector<scoped_ptr<ThreadPostingTasks>> threads; |
+ for (size_t i = 0; i < kNumThreads; ++i) { |
+ threads.push_back( |
+ make_scoped_ptr(new ThreadPostingTasks(worker_thread_.get()))); |
+ threads.back()->Start(); |
+ } |
+ |
+ for (const auto& thread : threads) { |
+ thread->Join(); |
+ thread->WaitForAllTasksToRun(); |
+ } |
+ |
+ WaitUntilIdle(); |
+ EXPECT_GE(num_state_changes(), 2U); |
+ EXPECT_EQ(0U, num_popped_task_from_shared_sequence()); |
+} |
+ |
+} // namespace internal |
+} // namespace base |