Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(5409)

Unified Diff: base/task_scheduler/worker_thread_unittest.cc

Issue 1704113002: TaskScheduler [6] SchedulerWorkerThread (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@s_4_shutdown
Patch Set: self review Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: base/task_scheduler/worker_thread_unittest.cc
diff --git a/base/task_scheduler/worker_thread_unittest.cc b/base/task_scheduler/worker_thread_unittest.cc
new file mode 100644
index 0000000000000000000000000000000000000000..27d0649cb526a0e5059b284eb474a10d478046e9
--- /dev/null
+++ b/base/task_scheduler/worker_thread_unittest.cc
@@ -0,0 +1,427 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "base/task_scheduler/worker_thread.h"
+
+#include <utility>
+#include <vector>
+
+#include "base/bind.h"
+#include "base/bind_helpers.h"
+#include "base/callback_forward.h"
+#include "base/logging.h"
+#include "base/macros.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/task_scheduler/priority_queue.h"
+#include "base/task_scheduler/scheduler_lock.h"
+#include "base/task_scheduler/task_tracker.h"
+#include "base/task_scheduler/utils.h"
+#include "base/threading/simple_thread.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace base {
+namespace internal {
+
+namespace {
+
+const size_t kNumTasksPerTest = 500;
+
+class TaskClosureFactory {
+ public:
+ enum class ExpectedRunOrder {
+ SEQUENCED,
+ NO_EXPECTATION,
+ };
+
+ explicit TaskClosureFactory(
+ TaskClosureFactory::ExpectedRunOrder expected_run_order)
+ : expected_run_order_(expected_run_order),
+ run_task_cv_(lock_.CreateConditionVariable()) {}
+
+ ~TaskClosureFactory() {
+ AutoSchedulerLock auto_lock(lock_);
+ EXPECT_EQ(num_created_tasks_, num_run_tasks_);
+ }
+
+ Closure CreateTaskClosure() {
+ AutoSchedulerLock auto_lock(lock_);
+ return Bind(&TaskClosureFactory::RunTask, Unretained(this),
+ num_created_tasks_++);
+ }
+
+ void WaitForAllTasksToRun() {
+ AutoSchedulerLock auto_lock(lock_);
+ while (num_run_tasks_ < num_created_tasks_)
+ run_task_cv_->Wait();
+ }
+
+ private:
+ void RunTask(size_t task_index) {
+ AutoSchedulerLock auto_lock(lock_);
+
+ if (expected_run_order_ == ExpectedRunOrder::SEQUENCED &&
+ task_index != num_run_tasks_) {
+ ADD_FAILURE() << "Unexpected task execution order.";
+ }
+
+ ++num_run_tasks_;
+ run_task_cv_->Signal();
+ }
+
+ // Synchronizes access to all members.
+ SchedulerLock lock_;
+
+ // Expectation for the order in which tasks run.
+ const ExpectedRunOrder expected_run_order_;
+
+ // Signaled when a task runs.
+ scoped_ptr<ConditionVariable> run_task_cv_;
+
+ // Number of times that CreateTaskClosure() has been called.
+ size_t num_created_tasks_ = 0;
+
+ // Number of times that RunTask() has been called.
+ size_t num_run_tasks_ = 0;
+
+ DISALLOW_COPY_AND_ASSIGN(TaskClosureFactory);
+};
+
+class ThreadPostingTasks : public SimpleThread {
+ public:
+ explicit ThreadPostingTasks(WorkerThread* worker_thread)
+ : SimpleThread("ThreadPostingTasks"),
+ worker_thread_(worker_thread),
+ factory_(TaskClosureFactory::ExpectedRunOrder::SEQUENCED) {}
+
+ void WaitForAllTasksToRun() { factory_.WaitForAllTasksToRun(); }
+
+ private:
+ void Run() override {
+ auto task_runner = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
+
+ for (size_t i = 0; i < kNumTasksPerTest; ++i)
+ task_runner->PostTask(FROM_HERE, factory_.CreateTaskClosure());
+ }
+
+ WorkerThread* const worker_thread_;
+ TaskClosureFactory factory_;
+
+ DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasks);
+};
+
+} // namespace
+
+class TaskSchedulerWorkerThreadTest : public testing::Test {
+ protected:
+ TaskSchedulerWorkerThreadTest()
+ : shared_priority_queue_(Bind(&DoNothing)),
+ lock_(shared_priority_queue_.container_lock()),
+ state_changed_callback_cv_(lock_.CreateConditionVariable()) {}
+
+ void SetUp() override {
+ worker_thread_ = WorkerThread::CreateWorkerThread(
+ ThreadPriority::NORMAL, &shared_priority_queue_,
+ Bind(&TaskSchedulerWorkerThreadTest::
+ PoppedTaskFromSharedSequenceCallback,
+ Unretained(this)),
+ Bind(&TaskSchedulerWorkerThreadTest::StateChangedCallback,
+ Unretained(this)),
+ &task_tracker_);
+ ASSERT_TRUE(worker_thread_);
+ WaitUntilIdle();
+
+ AutoSchedulerLock auto_lock(lock_);
+ num_state_changes_ = 0;
+ }
+
+ void TearDown() override { worker_thread_->JoinForTesting(); }
+
+ void WaitUntilIdle() {
+ AutoSchedulerLock auto_lock(lock_);
+ while (last_state_ != WorkerThread::State::IDLE)
+ state_changed_callback_cv_->Wait();
+ }
+
+ void WaitUntilNumStateChanges(size_t expected_num_state_changes) {
+ AutoSchedulerLock auto_lock(lock_);
+ while (num_state_changes_ < expected_num_state_changes)
+ state_changed_callback_cv_->Wait();
+ }
+
+ // Returns the number of state changes that occurred after SetUp() completed
+ // its execution.
+ size_t num_state_changes() const {
+ AutoSchedulerLock auto_lock(lock_);
+ return num_state_changes_;
+ }
+
+ size_t num_popped_task_from_shared_sequence() const {
+ AutoSchedulerLock auto_lock(lock_);
+ return num_popped_task_from_shared_sequence_;
+ }
+
+ PriorityQueue shared_priority_queue_;
+ TaskTracker task_tracker_;
+ scoped_ptr<WorkerThread> worker_thread_;
+
+ private:
+ void PoppedTaskFromSharedSequenceCallback(const WorkerThread* worker_thread,
+ scoped_refptr<Sequence> sequence) {
+ {
+ AutoSchedulerLock auto_lock(lock_);
+ ++num_popped_task_from_shared_sequence_;
+ }
+
+ // Reinsert sequence in |shared_priority_queue_|.
+ const SequenceSortKey sort_key = sequence->GetSortKey();
+ shared_priority_queue_.BeginTransaction()->Push(make_scoped_ptr(
+ new PriorityQueue::SequenceAndSortKey(std::move(sequence), sort_key)));
+ }
+
+ void StateChangedCallback(WorkerThread* worker_thread,
+ WorkerThread::State state) {
+ AutoSchedulerLock auto_lock(lock_);
+ EXPECT_EQ(worker_thread_.get(), worker_thread);
+ EXPECT_NE(last_state_, state);
+ last_state_ = state;
+ ++num_state_changes_;
+ state_changed_callback_cv_->Signal();
+ }
+
+ // Synchronizes access to all members below.
+ mutable SchedulerLock lock_;
+
+ // Condition variable signaled when StateChangedCallback() is invoked.
+ scoped_ptr<ConditionVariable> state_changed_callback_cv_;
+
+ // Last state reported to StateChangedCallback().
+ WorkerThread::State last_state_ = WorkerThread::State::BUSY;
+
+ // Number of times that StateChangedCallback() has been invoked.
+ size_t num_state_changes_ = 0;
+
+ // Number of times that PoppedTaskFromSharedSequenceCallback() has been
+ // invoked.
+ size_t num_popped_task_from_shared_sequence_ = 0;
+
+ DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerThreadTest);
+};
+
+// Verify that each call to WakeUp() on an IDLE WorkerThread causes a state
+// change to BUSY followed by a state change to IDLE.
+TEST_F(TaskSchedulerWorkerThreadTest, WakeUp) {
+ for (size_t i = 0; i < kNumTasksPerTest; ++i) {
+ worker_thread_->WakeUp();
+
+ // StateChangedCallback() verifies that state alternates between BUSY and
+ // IDLE.
+
+ WaitUntilNumStateChanges(2 * (i + 1));
+ EXPECT_EQ(2 * (i + 1), num_state_changes());
+ }
+
+ EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
+}
+
+// Verify that |kNumTasksPerTest| tasks run successfully when they are posted
+// through a single-threaded task runner. Don't wait between posts.
+TEST_F(TaskSchedulerWorkerThreadTest,
+ PostSingleThreadedTasksNoWaitBetweenPosts) {
+ TaskClosureFactory factory(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
+ auto task_runner = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
+
+ for (size_t i = 0; i < kNumTasksPerTest; ++i)
+ task_runner->PostTask(FROM_HERE, factory.CreateTaskClosure());
+
+ factory.WaitForAllTasksToRun();
+ WaitUntilIdle();
+ EXPECT_GE(num_state_changes(), 2U);
+ EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
+}
+
+// Verify that |kNumTasksPerTest| tasks run successfully when they are posted
+// through a single-threaded task runner. Wait until the previous task has
+// completed its execution before posting a new task.
+TEST_F(TaskSchedulerWorkerThreadTest, PostSingleThreadedTasksWaitBetweenPosts) {
+ TaskClosureFactory factory(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
+ auto task_runner = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
+
+ for (size_t i = 0; i < kNumTasksPerTest; ++i) {
+ task_runner->PostTask(FROM_HERE, factory.CreateTaskClosure());
+ factory.WaitForAllTasksToRun();
+ WaitUntilIdle();
+ EXPECT_EQ(2 * (i + 1), num_state_changes());
+ }
+
+ EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
+}
+
+// Verify that 2 * |kNumTasksPerTest| tasks run successfully when they are
+// posted through 2 single-threaded task runners. Don't wait between posts.
+TEST_F(TaskSchedulerWorkerThreadTest, PostTasksTwoSingleThreadedTaskRunners) {
+ TaskClosureFactory factory_a(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
+ TaskClosureFactory factory_b(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
+ auto task_runner_a = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
+ auto task_runner_b = worker_thread_->CreateTaskRunnerWithTraits(TaskTraits());
+
+ for (size_t i = 0; i < kNumTasksPerTest; ++i) {
+ task_runner_a->PostTask(FROM_HERE, factory_a.CreateTaskClosure());
+ task_runner_b->PostTask(FROM_HERE, factory_b.CreateTaskClosure());
+ }
+
+ factory_a.WaitForAllTasksToRun();
+ factory_b.WaitForAllTasksToRun();
+ WaitUntilIdle();
+ EXPECT_GE(num_state_changes(), 2U);
+ EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
+}
+
+// Verify that |kNumTasksPerTest| tasks run successfully when they are added to
+// the shared priority queue of a WorkerThread in different Sequences. The test
+// wakes up the WorkerThread once all tasks have been added to the shared
+// priority queue. This is necessary because shared tasks don't automatically
+// wake up the WorkerThread.
+TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksNoWaitBetweenPosts) {
+ TaskClosureFactory factory(
+ TaskClosureFactory::ExpectedRunOrder::NO_EXPECTATION);
+ for (size_t i = 0; i < kNumTasksPerTest; ++i) {
+ PostTaskHelper(make_scoped_ptr(new Task(
+ FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
+ make_scoped_refptr(new Sequence), &shared_priority_queue_,
+ &task_tracker_);
+ }
+
+ worker_thread_->WakeUp();
+ factory.WaitForAllTasksToRun();
+ WaitUntilIdle();
+ EXPECT_EQ(2U, num_state_changes());
+ EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
+}
+
+// Verify that |kNumTasksPerTest| tasks run successfully when they are added to
+// the shared priority queue of a WorkerThread in different Sequences. The test
+// wakes up the WorkerThread and waits until the task completes its execution
+// each time it adds a task to the priority queue. The wake-ups are necessary
+// because shared tasks don't automatically wake up the WorkerThread.
+TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksWaitBetweenPosts) {
+ TaskClosureFactory factory(
+ TaskClosureFactory::ExpectedRunOrder::NO_EXPECTATION);
+ for (size_t i = 0; i < kNumTasksPerTest; ++i) {
+ PostTaskHelper(make_scoped_ptr(new Task(
+ FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
+ make_scoped_refptr(new Sequence), &shared_priority_queue_,
+ &task_tracker_);
+ worker_thread_->WakeUp();
+
+ factory.WaitForAllTasksToRun();
+ WaitUntilIdle();
+ EXPECT_EQ(2 * (i + 1), num_state_changes());
+ }
+
+ EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
+}
+
+// Verify that |kNumTasksPerTest| tasks run successfully when they are added to
+// the shared priority queue of a WorkerThread (all tasks in the same Sequence).
+// The test wakes up the WorkerThread once all tasks have been added to the
+// priority queue. This is necessary because shared tasks don't automatically
+// wake up the WorkerThread.
+TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksInSameSequence) {
+ TaskClosureFactory factory(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
+ scoped_refptr<Sequence> sequence(new Sequence);
+
+ for (size_t i = 0; i < kNumTasksPerTest; ++i) {
+ PostTaskHelper(make_scoped_ptr(new Task(
+ FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
+ sequence, &shared_priority_queue_, &task_tracker_);
+ }
+
+ worker_thread_->WakeUp();
+ factory.WaitForAllTasksToRun();
+ WaitUntilIdle();
+ EXPECT_EQ(2U, num_state_changes());
+ EXPECT_EQ(kNumTasksPerTest - 1, num_popped_task_from_shared_sequence());
+}
+
+// Verify that 2 * |kNumTasksPerTest| tasks run successfully when they are added
+// to the shared priority queue of a WorkerThread. Tasks are split into 2
+// sequences. The test wakes up the WorkerThread once all tasks have been added
+// to the priority queue. This is necessary because shared tasks don't
+// automatically wake up the WorkerThread.
+TEST_F(TaskSchedulerWorkerThreadTest, PostSharedTasksInTwoSequences) {
+ TaskClosureFactory factory_a(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
+ TaskClosureFactory factory_b(TaskClosureFactory::ExpectedRunOrder::SEQUENCED);
+ scoped_refptr<Sequence> sequence_a(new Sequence);
+ scoped_refptr<Sequence> sequence_b(new Sequence);
+
+ for (size_t i = 0; i < kNumTasksPerTest; ++i) {
+ PostTaskHelper(make_scoped_ptr(new Task(
+ FROM_HERE, factory_a.CreateTaskClosure(), TaskTraits())),
+ sequence_a, &shared_priority_queue_, &task_tracker_);
+ PostTaskHelper(make_scoped_ptr(new Task(
+ FROM_HERE, factory_b.CreateTaskClosure(), TaskTraits())),
+ sequence_b, &shared_priority_queue_, &task_tracker_);
+ }
+
+ worker_thread_->WakeUp();
+ factory_a.WaitForAllTasksToRun();
+ factory_b.WaitForAllTasksToRun();
+ WaitUntilIdle();
+ EXPECT_EQ(2U, num_state_changes());
+ EXPECT_EQ(2 * (kNumTasksPerTest - 1), num_popped_task_from_shared_sequence());
+}
+
+// Verify that |kNumTasksPerTest| shared tasks and |kNumTasksPerTest| single-
+// threaded tasks run successfully when they are posted to a WorkerThread. The
+// test doesn't wake up the WorkerThread after posting a shared task. Wake-ups
+// are done by the single-threaded TaskRunner when tasks are posted through it.
+TEST_F(TaskSchedulerWorkerThreadTest, PostSharedAndSingleThreadedTasks) {
+ TaskClosureFactory factory(
+ TaskClosureFactory::ExpectedRunOrder::NO_EXPECTATION);
+
+ for (size_t i = 0; i < kNumTasksPerTest; ++i) {
+ // Post a task in the shared priority queue. Don't wake up the WorkerThread.
+ PostTaskHelper(make_scoped_ptr(new Task(
+ FROM_HERE, factory.CreateTaskClosure(), TaskTraits())),
+ make_scoped_refptr(new Sequence), &shared_priority_queue_,
+ &task_tracker_);
+
+ // Post a task in the single-threaded priority queue. The TaskRunner will
+ // wake up the WorkerThread.
+ worker_thread_->CreateTaskRunnerWithTraits(TaskTraits())
+ ->PostTask(FROM_HERE, factory.CreateTaskClosure());
+
+ factory.WaitForAllTasksToRun();
+ WaitUntilIdle();
+ EXPECT_EQ(2 * (i + 1), num_state_changes());
+ }
+
+ EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
+}
+
+// Verify that 20 * |kNumTasksPerTest| single-threaded tasks posted to a single
+// WorkerThread from 20 threads run successfully.
+TEST_F(TaskSchedulerWorkerThreadTest,
+ PostSingleThreadedTasksFromMultipleThreads) {
+ static const size_t kNumThreads = 20;
+ std::vector<scoped_ptr<ThreadPostingTasks>> threads;
+ for (size_t i = 0; i < kNumThreads; ++i) {
+ threads.push_back(
+ make_scoped_ptr(new ThreadPostingTasks(worker_thread_.get())));
+ threads.back()->Start();
+ }
+
+ for (const auto& thread : threads) {
+ thread->Join();
+ thread->WaitForAllTasksToRun();
+ }
+
+ WaitUntilIdle();
+ EXPECT_GE(num_state_changes(), 2U);
+ EXPECT_EQ(0U, num_popped_task_from_shared_sequence());
+}
+
+} // namespace internal
+} // namespace base
« base/task_scheduler/worker_thread.cc ('K') | « base/task_scheduler/worker_thread.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698