| Index: base/task_scheduler/worker_thread_unittest.cc
|
| diff --git a/base/task_scheduler/worker_thread_unittest.cc b/base/task_scheduler/worker_thread_unittest.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..27d0649cb526a0e5059b284eb474a10d478046e9
|
| --- /dev/null
|
| +++ b/base/task_scheduler/worker_thread_unittest.cc
|
| @@ -0,0 +1,427 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "base/task_scheduler/worker_thread.h"
|
| +
|
| +#include <utility>
|
| +#include <vector>
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/bind_helpers.h"
|
| +#include "base/callback_forward.h"
|
| +#include "base/logging.h"
|
| +#include "base/macros.h"
|
| +#include "base/memory/scoped_ptr.h"
|
| +#include "base/synchronization/condition_variable.h"
|
| +#include "base/task_scheduler/priority_queue.h"
|
| +#include "base/task_scheduler/scheduler_lock.h"
|
| +#include "base/task_scheduler/task_tracker.h"
|
| +#include "base/task_scheduler/utils.h"
|
| +#include "base/threading/simple_thread.h"
|
| +#include "testing/gtest/include/gtest/gtest.h"
|
| +
|
| +namespace base {
|
| +namespace internal {
|
| +
|
| +namespace {
|
| +
|
| +const size_t kNumTasksPerTest = 500;
|
| +
|
| +class TaskClosureFactory {
|
| + public:
|
| + enum class ExpectedRunOrder {
|
| + SEQUENCED,
|
| + NO_EXPECTATION,
|
| + };
|
| +
|
| + explicit TaskClosureFactory(
|
| + TaskClosureFactory::ExpectedRunOrder expected_run_order)
|
| + : expected_run_order_(expected_run_order),
|
| + run_task_cv_(lock_.CreateConditionVariable()) {}
|
| +
|
| + ~TaskClosureFactory() {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + EXPECT_EQ(num_created_tasks_, num_run_tasks_);
|
| + }
|
| +
|
| + Closure CreateTaskClosure() {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + return Bind(&TaskClosureFactory::RunTask, Unretained(this),
|
| + num_created_tasks_++);
|
| + }
|
| +
|
| + void WaitForAllTasksToRun() {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + while (num_run_tasks_ < num_created_tasks_)
|
| + run_task_cv_->Wait();
|
| + }
|
| +
|
| + private:
|
| + void RunTask(size_t task_index) {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| +
|
| + if (expected_run_order_ == ExpectedRunOrder::SEQUENCED &&
|
| + task_index != num_run_tasks_) {
|
| + ADD_FAILURE() << "Unexpected task execution order.";
|
| + }
|
| +
|
| + ++num_run_tasks_;
|
| + run_task_cv_->Signal();
|
| + }
|
| +
|
| + // Synchronizes access to all members.
|
| + SchedulerLock lock_;
|
| +
|
| + // Expectation for the order in which tasks run.
|
| + const ExpectedRunOrder expected_run_order_;
|
| +
|
| + // Signaled when a task runs.
|
| + scoped_ptr<ConditionVariable> run_task_cv_;
|
| +
|
| + // Number of times that CreateTaskClosure() has been called.
|
| + size_t num_created_tasks_ = 0;
|
| +
|
| + // Number of times that RunTask() has been called.
|
| + size_t num_run_tasks_ = 0;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(TaskClosureFactory);
|
| +};
|
| +
|
| +class ThreadPostingTasks : public SimpleThread {
|
| + public:
|
| + explicit ThreadPostingTasks(WorkerThread* worker_thread)
|
| + : SimpleThread("ThreadPostingTasks"),
|
| + worker_thread_(worker_thread),
|
| + factory_(TaskClosureFactory::ExpectedRunOrder::SEQUENCED) {}
|
| +
|
| + void WaitForAllTasksToRun() { factory_.WaitForAllTasksToRun(); }
|
| +
|
| + private:
|
| + void Run() override {
|
| + auto task_runner = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
|
| +
|
| + for (size_t i = 0; i < kNumTasksPerTest; ++i)
|
| + task_runner->PostTask(FROM_HERE, factory_.CreateTaskClosure());
|
| + }
|
| +
|
| + WorkerThread* const worker_thread_;
|
| + TaskClosureFactory factory_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasks);
|
| +};
|
| +
|
| +} // namespace
|
| +
|
| +class TaskSchedulerWorkerThreadTest : public testing::Test {
|
| + protected:
|
| + TaskSchedulerWorkerThreadTest()
|
| + : shared_priority_queue_(Bind(&DoNothing)),
|
| + lock_(shared_priority_queue_.container_lock()),
|
| + state_changed_callback_cv_(lock_.CreateConditionVariable()) {}
|
| +
|
| + void SetUp() override {
|
| + worker_thread_ = WorkerThread::CreateWorkerThread(
|
| + ThreadPriority::NORMAL, &shared_priority_queue_,
|
| + Bind(&TaskSchedulerWorkerThreadTest::
|
| + PoppedTaskFromSharedSequenceCallback,
|
| + Unretained(this)),
|
| + Bind(&TaskSchedulerWorkerThreadTest::StateChangedCallback,
|
| + Unretained(this)),
|
| + &task_tracker_);
|
| + ASSERT_TRUE(worker_thread_);
|
| + WaitUntilIdle();
|
| +
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + num_state_changes_ = 0;
|
| + }
|
| +
|
| + void TearDown() override { worker_thread_->JoinForTesting(); }
|
| +
|
| + void WaitUntilIdle() {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + while (last_state_ != WorkerThread::State::IDLE)
|
| + state_changed_callback_cv_->Wait();
|
| + }
|
| +
|
| + void WaitUntilNumStateChanges(size_t expected_num_state_changes) {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + while (num_state_changes_ < expected_num_state_changes)
|
| + state_changed_callback_cv_->Wait();
|
| + }
|
| +
|
| + // Returns the number of state changes that occurred after SetUp() completed
|
| + // its execution.
|
| + size_t num_state_changes() const {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + return num_state_changes_;
|
| + }
|
| +
|
| + size_t num_popped_task_from_shared_sequence() const {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + return num_popped_task_from_shared_sequence_;
|
| + }
|
| +
|
| + PriorityQueue shared_priority_queue_;
|
| + TaskTracker task_tracker_;
|
| + scoped_ptr<WorkerThread> worker_thread_;
|
| +
|
| + private:
|
| + void PoppedTaskFromSharedSequenceCallback(const WorkerThread* worker_thread,
|
| + scoped_refptr<Sequence> sequence) {
|
| + {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + ++num_popped_task_from_shared_sequence_;
|
| + }
|
| +
|
| + // Reinsert sequence in |shared_priority_queue_|.
|
| + const SequenceSortKey sort_key = sequence->GetSortKey();
|
| + shared_priority_queue_.BeginTransaction()->Push(make_scoped_ptr(
|
| + new PriorityQueue::SequenceAndSortKey(std::move(sequence), sort_key)));
|
| + }
|
| +
|
| + void StateChangedCallback(WorkerThread* worker_thread,
|
| + WorkerThread::State state) {
|
| + AutoSchedulerLock auto_lock(lock_);
|
| + EXPECT_EQ(worker_thread_.get(), worker_thread);
|
| + EXPECT_NE(last_state_, state);
|
| + last_state_ = state;
|
| + ++num_state_changes_;
|
| + state_changed_callback_cv_->Signal();
|
| + }
|
| +
|
| + // Synchronizes access to all members below.
|
| + mutable SchedulerLock lock_;
|
| +
|
| + // Condition variable signaled when StateChangedCallback() is invoked.
|
| + scoped_ptr<ConditionVariable> state_changed_callback_cv_;
|
| +
|
| + // Last state reported to StateChangedCallback().
|
| + WorkerThread::State last_state_ = WorkerThread::State::BUSY;
|
| +
|
| + // Number of times that StateChangedCallback() has been invoked.
|
| + size_t num_state_changes_ = 0;
|
| +
|
| + // Number of times that PoppedTaskFromSharedSequenceCallback() has been
|
| + // invoked.
|
| + size_t num_popped_task_from_shared_sequence_ = 0;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerThreadTest);
|
| +};
|
| +
|
| +// Verify that each call to WakeUp() on an IDLE WorkerThread causes a state
|
| +// change to BUSY followed by a state change to IDLE.
|
| +TEST_F(TaskSchedulerWorkerThreadTest, WakeUp) {
|
| + for (size_t i = 0; i < kNumTasksPerTest; ++i) {
|
| + worker_thread_->WakeUp();
|
| +
|
| + // StateChangedCallback() verifies that state alternates between BUSY and
|
| + // IDLE.
|
| +
|
| + WaitUntilNumStateChanges(2 * (i + 1));
|
| + EXPECT_EQ(2 * (i + 1), num_state_changes());
|
| + }
|
| +
|
| + EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
|
| +}
|
| +
|
| +// Verify that |kNumTasksPerTest| tasks run successfully when they are posted
|
| +// through a single-threaded task runner. Don't wait between posts.
|
| +TEST_F(TaskSchedulerWorkerThreadTest,
|
| + PostSingleThreadedTasksNoWaitBetweenPosts) {
|
| + TaskClosureFactory factory(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
|
| + auto task_runner = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
|
| +
|
| + for (size_t i = 0; i < kNumTasksPerTest; ++i)
|
| + task_runner->PostTask(FROM_HERE, factory.CreateTaskClosure());
|
| +
|
| + factory.WaitForAllTasksToRun();
|
| + WaitUntilIdle();
|
| + EXPECT_GE(num_state_changes(), 2U);
|
| + EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
|
| +}
|
| +
|
| +// Verify that |kNumTasksPerTest| tasks run successfully when they are posted
|
| +// through a single-threaded task runner. Wait until the previous task has
|
| +// completed its execution before posting a new task.
|
| +TEST_F(TaskSchedulerWorkerThreadTest, PostSingleThreadedTasksWaitBetweenPosts) {
|
| + TaskClosureFactory factory(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
|
| + auto task_runner = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
|
| +
|
| + for (size_t i = 0; i < kNumTasksPerTest; ++i) {
|
| + task_runner->PostTask(FROM_HERE, factory.CreateTaskClosure());
|
| + factory.WaitForAllTasksToRun();
|
| + WaitUntilIdle();
|
| + EXPECT_EQ(2 * (i + 1), num_state_changes());
|
| + }
|
| +
|
| + EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
|
| +}
|
| +
|
| +// Verify that 2 * |kNumTasksPerTest| tasks run successfully when they are
|
| +// posted through 2 single-threaded task runners. Don't wait between posts.
|
| +TEST_F(TaskSchedulerWorkerThreadTest, PostTasksTwoSingleThreadedTaskRunners) {
|
| + TaskClosureFactory factory_a(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
|
| + TaskClosureFactory factory_b(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
|
| + auto task_runner_a = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
|
| + auto task_runner_b = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
|
| +
|
| + for (size_t i = 0; i < kNumTasksPerTest; ++i) {
|
| + task_runner_a->PostTask(FROM_HERE, factory_a.CreateTaskClosure());
|
| + task_runner_b->PostTask(FROM_HERE, factory_b.CreateTaskClosure());
|
| + }
|
| +
|
| + factory_a.WaitForAllTasksToRun();
|
| + factory_b.WaitForAllTasksToRun();
|
| + WaitUntilIdle();
|
| + EXPECT_GE(num_state_changes(), 2U);
|
| + EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
|
| +}
|
| +
|
| +// Verify that |kNumTasksPerTest| tasks run successfully when they are added to
|
| +// the shared priority queue of a WorkerThread in different Sequences. The test
|
| +// wakes up the WorkerThread once all tasks have been added to the shared
|
| +// priority queue. This is necessary because shared tasks don't automatically
|
| +// wake up the WorkerThread.
|
| +TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksNoWaitBetweenPosts) {
|
| + TaskClosureFactory factory(
|
| + TaskClosureFactory::ExpectedRunOrder::NO_EXPECTATION);
|
| + for (size_t i = 0; i < kNumTasksPerTest; ++i) {
|
| + PostTaskHelper(make_scoped_ptr(new Task(
|
| + FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
|
| + make_scoped_refptr(new Sequence), &shared_priority_queue_,
|
| + &task_tracker_);
|
| + }
|
| +
|
| + worker_thread_->WakeUp();
|
| + factory.WaitForAllTasksToRun();
|
| + WaitUntilIdle();
|
| + EXPECT_EQ(2U, num_state_changes());
|
| + EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
|
| +}
|
| +
|
| +// Verify that |kNumTasksPerTest| tasks run successfully when they are added to
|
| +// the shared priority queue of a WorkerThread in different Sequences. The test
|
| +// wakes up the WorkerThread and waits until the task completes its execution
|
| +// each time it adds a task to the priority queue. The wake-ups are necessary
|
| +// because shared tasks don't automatically wake up the WorkerThread.
|
| +TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksWaitBetweenPosts) {
|
| + TaskClosureFactory factory(
|
| + TaskClosureFactory::ExpectedRunOrder::NO_EXPECTATION);
|
| + for (size_t i = 0; i < kNumTasksPerTest; ++i) {
|
| + PostTaskHelper(make_scoped_ptr(new Task(
|
| + FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
|
| + make_scoped_refptr(new Sequence), &shared_priority_queue_,
|
| + &task_tracker_);
|
| + worker_thread_->WakeUp();
|
| +
|
| + factory.WaitForAllTasksToRun();
|
| + WaitUntilIdle();
|
| + EXPECT_EQ(2 * (i + 1), num_state_changes());
|
| + }
|
| +
|
| + EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
|
| +}
|
| +
|
| +// Verify that |kNumTasksPerTest| tasks run successfully when they are added to
|
| +// the shared priority queue of a WorkerThread (all tasks in the same Sequence).
|
| +// The test wakes up the WorkerThread once all tasks have been added to the
|
| +// priority queue. This is necessary because shared tasks don't automatically
|
| +// wake up the WorkerThread.
|
| +TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksInSameSequence) {
|
| + TaskClosureFactory factory(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
|
| + scoped_refptr<Sequence> sequence(new Sequence);
|
| +
|
| + for (size_t i = 0; i < kNumTasksPerTest; ++i) {
|
| + PostTaskHelper(make_scoped_ptr(new Task(
|
| + FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
|
| + sequence, &shared_priority_queue_, &task_tracker_);
|
| + }
|
| +
|
| + worker_thread_->WakeUp();
|
| + factory.WaitForAllTasksToRun();
|
| + WaitUntilIdle();
|
| + EXPECT_EQ(2U, num_state_changes());
|
| + EXPECT_EQ(kNumTasksPerTest - 1, num_popped_task_from_shared_sequence());
|
| +}
|
| +
|
| +// Verify that 2 * |kNumTasksPerTest| tasks run successfully when they are added
|
| +// to the shared priority queue of a WorkerThread. Tasks are split into 2
|
| +// sequences. The test wakes up the WorkerThread once all tasks have been added
|
| +// to the priority queue. This is necessary because shared tasks don't
|
| +// automatically wake up the WorkerThread.
|
| +TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksInTwoSequences) {
|
| + TaskClosureFactory factory_a(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
|
| + TaskClosureFactory factory_b(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
|
| + scoped_refptr<Sequence> sequence_a(new Sequence);
|
| + scoped_refptr<Sequence> sequence_b(new Sequence);
|
| +
|
| + for (size_t i = 0; i < kNumTasksPerTest; ++i) {
|
| + PostTaskHelper(make_scoped_ptr(new Task(
|
| + FROM_HERE, factory_a.CreateTaskClosure(), TaskTraits())),
|
| + sequence_a, &shared_priority_queue_, &task_tracker_);
|
| + PostTaskHelper(make_scoped_ptr(new Task(
|
| + FROM_HERE, factory_b.CreateTaskClosure(), TaskTraits())),
|
| + sequence_b, &shared_priority_queue_, &task_tracker_);
|
| + }
|
| +
|
| + worker_thread_->WakeUp();
|
| + factory_a.WaitForAllTasksToRun();
|
| + factory_b.WaitForAllTasksToRun();
|
| + WaitUntilIdle();
|
| + EXPECT_EQ(2U, num_state_changes());
|
| + EXPECT_EQ(2 * (kNumTasksPerTest - 1), num_popped_task_from_shared_sequence());
|
| +}
|
| +
|
| +// Verify that |kNumTasksPerTest| shared tasks and |kNumTasksPerTest| single-
|
| +// threaded tasks run successfully when they are posted to a WorkerThread. The
|
| +// test doesn't wake up the WorkerThread after posting a shared task. Wake-ups
|
| +// are done by the single-threaded TaskRunner when tasks are posted through it.
|
| +TEST_F(TaskSchedulerWorkerThreadTest, PostSharedAndSingleThreadedTasks) {
|
| + TaskClosureFactory factory(
|
| + TaskClosureFactory::ExpectedRunOrder::NO_EXPECTATION);
|
| +
|
| + for (size_t i = 0; i < kNumTasksPerTest; ++i) {
|
| + // Post a task in the shared priority queue. Don't wake up the WorkerThread.
|
| + PostTaskHelper(make_scoped_ptr(new Task(
|
| + FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
|
| + make_scoped_refptr(new Sequence), &shared_priority_queue_,
|
| + &task_tracker_);
|
| +
|
| + // Post a task in the single-threaded priority queue. The TaskRunner will
|
| + // wake up the WorkerThread.
|
| + worker_thread_->CreateTaskRunnerWithTraits(TaskTraits())
|
| + ->PostTask(FROM_HERE, factory.CreateTaskClosure());
|
| +
|
| + factory.WaitForAllTasksToRun();
|
| + WaitUntilIdle();
|
| + EXPECT_EQ(2 * (i + 1), num_state_changes());
|
| + }
|
| +
|
| + EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
|
| +}
|
| +
|
| +// Verify that 20 * |kNumTasksPerTest| single-threaded tasks posted to a single
|
| +// WorkerThread from 20 threads run successfully.
|
| +TEST_F(TaskSchedulerWorkerThreadTest,
|
| + PostSingleThreadedTasksFromMultipleThreads) {
|
| + static const size_t kNumThreads = 20;
|
| + std::vector<scoped_ptr<ThreadPostingTasks>> threads;
|
| + for (size_t i = 0; i < kNumThreads; ++i) {
|
| + threads.push_back(
|
| + make_scoped_ptr(new ThreadPostingTasks(worker_thread_.get())));
|
| + threads.back()->Start();
|
| + }
|
| +
|
| + for (const auto& thread : threads) {
|
| + thread->Join();
|
| + thread->WaitForAllTasksToRun();
|
| + }
|
| +
|
| + WaitUntilIdle();
|
| + EXPECT_GE(num_state_changes(), 2U);
|
| + EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
|
| +}
|
| +
|
| +} // namespace internal
|
| +} // namespace base
|
|
|