| Index: base/threading/sequenced_worker_pool_unittest.cc
|
| ===================================================================
|
| --- base/threading/sequenced_worker_pool_unittest.cc (revision 0)
|
| +++ base/threading/sequenced_worker_pool_unittest.cc (revision 0)
|
| @@ -0,0 +1,405 @@
|
| +// Copyright (c) 2011 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include <algorithm>
|
| +#include <string>
|
| +#include <vector>
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/callback.h"
|
| +#include "base/logging.h"
|
| +#include "base/memory/ref_counted.h"
|
| +#include "base/synchronization/condition_variable.h"
|
| +#include "base/synchronization/lock.h"
|
| +#include "base/threading/platform_thread.h"
|
| +#include "base/threading/sequenced_worker_pool.h"
|
| +#include "testing/gtest/include/gtest/gtest.h"
|
| +
|
| +namespace base {
|
| +
|
| +// IMPORTANT NOTE:
|
| +//
|
| +// Many of these tests have failure modes where they'll hang forever. These
|
| +// tests should not be flaky, and hanging indicates a type of failure. Do not
|
| +// mark them as flaky if they're hanging; it's likely an actual bug.
|
| +
|
| +namespace {
|
| +
|
| +const size_t kNumWorkerThreads = 3;
|
| +
|
| +// Allows a number of threads to all be blocked on the same event, and
|
| +// provides a way to unblock a certain number of them.
|
| +class ThreadBlocker {
|
| + public:
|
| + ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {
|
| + }
|
| +
|
| + void Block() {
|
| + {
|
| + base::AutoLock lock(lock_);
|
| + while (unblock_counter_ == 0)
|
| + cond_var_.Wait();
|
| + unblock_counter_--;
|
| + }
|
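| +    // Unblock() only calls Signal() once, so each released thread signals
|
| +    // again here to pass the wakeup along to the next waiter.
|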
| + cond_var_.Signal();
|
| + }
|
| +
|
| + void Unblock(size_t count) {
|
| + {
|
| + base::AutoLock lock(lock_);
|
| + DCHECK(unblock_counter_ == 0);
|
| + unblock_counter_ = count;
|
| + }
|
| + cond_var_.Signal();
|
| + }
|
| +
|
| + private:
|
| + base::Lock lock_;
|
| + base::ConditionVariable cond_var_;
|
| +
|
| + size_t unblock_counter_;
|
| +};
|
| +
|
| +class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
|
| + public:
|
| + TestTracker()
|
| + : lock_(),
|
| + cond_var_(&lock_),
|
| + started_events_(0) {
|
| + }
|
| +
|
| + // Each of these tasks appends the argument to the complete sequence vector
|
| + // so calling code can see what order they finished in.
|
| + void FastTask(int id) {
|
| + SignalWorkerDone(id);
|
| + }
|
| + void SlowTask(int id) {
|
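| +    // Sleep for one second (the argument is in milliseconds) to simulate a
|
| +    // slow task.
|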
| + base::PlatformThread::Sleep(1000);
|
| + SignalWorkerDone(id);
|
| + }
|
| +
|
| + void BlockTask(int id, ThreadBlocker* blocker) {
|
| +    // Record that this task has started and signal anybody waiting for that
|
| +    // to happen.
|
| + {
|
| + base::AutoLock lock(lock_);
|
| + started_events_++;
|
| + }
|
| + cond_var_.Signal();
|
| +
|
| + blocker->Block();
|
| + SignalWorkerDone(id);
|
| + }
|
| +
|
| +  // Waits until the given number of blocking tasks have started executing.
|
| + void WaitUntilTasksBlocked(size_t count) {
|
| + {
|
| + base::AutoLock lock(lock_);
|
| + while (started_events_ < count)
|
| + cond_var_.Wait();
|
| + }
|
| + cond_var_.Signal();
|
| + }
|
| +
|
| + // Blocks the current thread until at least the given number of tasks are in
|
| + // the completed vector, and then returns a copy.
|
| + std::vector<int> WaitUntilTasksComplete(size_t num_tasks) {
|
| + std::vector<int> ret;
|
| + {
|
| + base::AutoLock lock(lock_);
|
| + while (complete_sequence_.size() < num_tasks)
|
| + cond_var_.Wait();
|
| + ret = complete_sequence_;
|
| + }
|
| + cond_var_.Signal();
|
| + return ret;
|
| + }
|
| +
|
| + void ClearCompleteSequence() {
|
| + base::AutoLock lock(lock_);
|
| + complete_sequence_.clear();
|
| + started_events_ = 0;
|
| + }
|
| +
|
| + private:
|
| + void SignalWorkerDone(int id) {
|
| + {
|
| + base::AutoLock lock(lock_);
|
| + complete_sequence_.push_back(id);
|
| + }
|
| + cond_var_.Signal();
|
| + }
|
| +
|
| +  // Protects complete_sequence_ and started_events_.
|
| + base::Lock lock_;
|
| +
|
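| +  // Signaled whenever complete_sequence_ or started_events_ changes. Waiters
|
| +  // re-signal after waking so one Signal() eventually reaches every waiter.
|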
| + base::ConditionVariable cond_var_;
|
| +
|
| + // Protected by lock_.
|
| + std::vector<int> complete_sequence_;
|
| +
|
| + // Counter of the number of "block" workers that have started.
|
| + size_t started_events_;
|
| +};
|
| +
|
| +class SequencedWorkerPoolTest : public testing::Test,
|
| + public SequencedWorkerPool::TestingObserver {
|
| + public:
|
| + SequencedWorkerPoolTest()
|
| + : pool_(kNumWorkerThreads, "test"),
|
| + tracker_(new TestTracker) {
|
| + pool_.SetTestingObserver(this);
|
| + }
|
| + ~SequencedWorkerPoolTest() {
|
| + }
|
| +
|
| + virtual void SetUp() {
|
| + }
|
| + virtual void TearDown() {
|
| + pool_.Shutdown();
|
| + }
|
| +
|
| + SequencedWorkerPool& pool() { return pool_; }
|
| + TestTracker* tracker() { return tracker_.get(); }
|
| +
|
| +  // Ensures that all kNumWorkerThreads worker threads are created by adding
|
| +  // tasks and waiting until they complete. Worker thread creation is
|
| +  // serialized, can happen asynchronously on background threads, and stops
|
| +  // once shutdown has started. This means that if a test posts a bunch of
|
| +  // tasks and calls Shutdown(), fewer workers may be created than the test
|
| +  // expects.
|
| + //
|
| + // This function ensures that this condition can't happen so tests can make
|
| + // assumptions about the number of workers active. See the comment in
|
| + // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
|
| + // details.
|
| + //
|
| + // It will post tasks to the queue with id -1. It also assumes this is the
|
| + // first thing called in a test since it will clear the complete_sequence_.
|
| + void EnsureAllWorkersCreated() {
|
| +    // Post a bunch of tasks, all of which block. This will cause that many
|
| +    // workers to be created.
|
| + ThreadBlocker blocker;
|
| + for (size_t i = 0; i < kNumWorkerThreads; i++) {
|
| + pool().PostWorkerTask(FROM_HERE,
|
| + base::Bind(&TestTracker::BlockTask,
|
| + tracker(), -1, &blocker));
|
| + }
|
| + tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
|
| +
|
| + // Now wake them up and wait until they're done.
|
| + blocker.Unblock(kNumWorkerThreads);
|
| + tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
|
| +
|
| + // Clean up the task IDs we added.
|
| + tracker()->ClearCompleteSequence();
|
| + }
|
| +
|
| + protected:
|
| + // This closure will be executed right before the pool blocks on shutdown.
|
| + base::Closure before_wait_for_shutdown_;
|
| +
|
| + private:
|
| + // SequencedWorkerPool::TestingObserver implementation.
|
| + virtual void WillWaitForShutdown() {
|
| + if (!before_wait_for_shutdown_.is_null())
|
| + before_wait_for_shutdown_.Run();
|
| + }
|
| +
|
| + SequencedWorkerPool pool_;
|
| + scoped_refptr<TestTracker> tracker_;
|
| +};
|
| +
|
| +// Checks that the given number of entries are in the given tracker's list of
|
| +// completed tasks, and then unblocks the given number of threads. This is
|
| +// used to wake up blocked background threads before blocking on shutdown.
|
| +void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker,
|
| + size_t expected_tasks_to_complete,
|
| + ThreadBlocker* blocker,
|
| + size_t threads_to_awake) {
|
| + EXPECT_EQ(
|
| + expected_tasks_to_complete,
|
| + tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size());
|
| +
|
| + blocker->Unblock(threads_to_awake);
|
| +}
|
| +
|
| +} // namespace
|
| +
|
| +// Tests that same-named tokens have the same ID.
|
| +TEST_F(SequencedWorkerPoolTest, NamedTokens) {
|
| + const std::string name1("hello");
|
| + SequencedWorkerPool::SequenceToken token1 =
|
| + pool().GetNamedSequenceToken(name1);
|
| +
|
| + SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken();
|
| +
|
| + const std::string name3("goodbye");
|
| + SequencedWorkerPool::SequenceToken token3 =
|
| + pool().GetNamedSequenceToken(name3);
|
| +
|
| + // All 3 tokens should be different.
|
| + EXPECT_FALSE(token1.Equals(token2));
|
| + EXPECT_FALSE(token1.Equals(token3));
|
| + EXPECT_FALSE(token2.Equals(token3));
|
| +
|
| + // Requesting the same name again should give the same value.
|
| + SequencedWorkerPool::SequenceToken token1again =
|
| + pool().GetNamedSequenceToken(name1);
|
| + EXPECT_TRUE(token1.Equals(token1again));
|
| +
|
| + SequencedWorkerPool::SequenceToken token3again =
|
| + pool().GetNamedSequenceToken(name3);
|
| + EXPECT_TRUE(token3.Equals(token3again));
|
| +}
|
| +
|
| +// Tests that posting a bunch of tasks (many more than the number of worker
|
| +// threads) runs them all.
|
| +TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
|
| + pool().PostWorkerTask(FROM_HERE,
|
| + base::Bind(&TestTracker::SlowTask, tracker(), 0));
|
| +
|
| + const size_t kNumTasks = 20;
|
| + for (size_t i = 1; i < kNumTasks; i++) {
|
| + pool().PostWorkerTask(FROM_HERE,
|
| + base::Bind(&TestTracker::FastTask, tracker(), i));
|
| + }
|
| +
|
| + std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks);
|
| + EXPECT_EQ(kNumTasks, result.size());
|
| +}
|
| +
|
| +// Tests that tasks with the same sequence token are executed in order but
|
| +// don't affect other tasks.
|
| +TEST_F(SequencedWorkerPoolTest, Sequence) {
|
| + // Fill all the worker threads except one.
|
| + const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
|
| + ThreadBlocker background_blocker;
|
| + for (size_t i = 0; i < kNumBackgroundTasks; i++) {
|
| + pool().PostWorkerTask(FROM_HERE,
|
| + base::Bind(&TestTracker::BlockTask,
|
| + tracker(), i, &background_blocker));
|
| + }
|
| + tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks);
|
| +
|
| + // Create two tasks with the same sequence token, one that will block on the
|
| + // event, and one which will just complete quickly when it's run. Since there
|
| + // is one worker thread free, the first task will start and then block, and
|
| + // the second task should be waiting.
|
| + ThreadBlocker blocker;
|
| + SequencedWorkerPool::SequenceToken token1 = pool().GetSequenceToken();
|
| + pool().PostSequencedWorkerTask(
|
| + token1, FROM_HERE,
|
| + base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker));
|
| + pool().PostSequencedWorkerTask(
|
| + token1, FROM_HERE,
|
| + base::Bind(&TestTracker::FastTask, tracker(), 101));
|
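| +  // Waiting for zero tasks returns immediately; this verifies that nothing
|
| +  // has completed yet.
|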
| + EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
|
| +
|
| + // Create another two tasks as above with a different token. These will be
|
| + // blocked since there are no slots to run.
|
| + SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken();
|
| + pool().PostSequencedWorkerTask(
|
| + token2, FROM_HERE,
|
| + base::Bind(&TestTracker::FastTask, tracker(), 200));
|
| + pool().PostSequencedWorkerTask(
|
| + token2, FROM_HERE,
|
| + base::Bind(&TestTracker::FastTask, tracker(), 201));
|
| + EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
|
| +
|
| + // Let one background task complete. This should then let both tasks of
|
| + // token2 run to completion in order. The second task of token1 should still
|
| + // be blocked.
|
| + background_blocker.Unblock(1);
|
| + std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
|
| + ASSERT_EQ(3u, result.size());
|
| + EXPECT_EQ(200, result[1]);
|
| + EXPECT_EQ(201, result[2]);
|
| +
|
| + // Finish the rest of the background tasks. This should leave some workers
|
| + // free with the second token1 task still blocked on the first.
|
| + background_blocker.Unblock(kNumBackgroundTasks - 1);
|
| + EXPECT_EQ(kNumBackgroundTasks + 2,
|
| + tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size());
|
| +
|
| + // Allow the first task of token1 to complete. This should run the second.
|
| + blocker.Unblock(1);
|
| + result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4);
|
| + ASSERT_EQ(kNumBackgroundTasks + 4, result.size());
|
| + EXPECT_EQ(100, result[result.size() - 2]);
|
| + EXPECT_EQ(101, result[result.size() - 1]);
|
| +}
|
| +
|
| +// Tests that unrun tasks are discarded properly according to their shutdown
|
| +// mode.
|
| +TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
|
| + // Start tasks to take all the threads and block them.
|
| + EnsureAllWorkersCreated();
|
| + ThreadBlocker blocker;
|
| + for (size_t i = 0; i < kNumWorkerThreads; i++) {
|
| + pool().PostWorkerTask(FROM_HERE,
|
| + base::Bind(&TestTracker::BlockTask,
|
| + tracker(), i, &blocker));
|
| + }
|
| + tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
|
| +
|
| + // Create some tasks with different shutdown modes.
|
| + pool().PostWorkerTaskWithShutdownBehavior(
|
| + FROM_HERE,
|
| + base::Bind(&TestTracker::FastTask, tracker(), 100),
|
| + SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
|
| + pool().PostWorkerTaskWithShutdownBehavior(
|
| + FROM_HERE,
|
| + base::Bind(&TestTracker::FastTask, tracker(), 101),
|
| + SequencedWorkerPool::SKIP_ON_SHUTDOWN);
|
| + pool().PostWorkerTaskWithShutdownBehavior(
|
| + FROM_HERE,
|
| + base::Bind(&TestTracker::FastTask, tracker(), 102),
|
| + SequencedWorkerPool::BLOCK_SHUTDOWN);
|
| +
|
| + // Shutdown the worker pool. This should discard all non-blocking tasks.
|
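| +  // The closure runs from WillWaitForShutdown(), right before Shutdown()
|
| +  // blocks, so the blocked workers get released and shutdown can proceed.
|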
| + before_wait_for_shutdown_ =
|
| + base::Bind(&EnsureTasksToCompleteCountAndUnblock,
|
| + scoped_refptr<TestTracker>(tracker()), 0,
|
| + &blocker, kNumWorkerThreads);
|
| + pool().Shutdown();
|
| +
|
| + std::vector<int> result = tracker()->WaitUntilTasksComplete(4);
|
| +
|
| +  // The kNumWorkerThreads blocked tasks should have completed, plus the
|
| +  // BLOCK_SHUTDOWN one, in no particular order.
|
| + ASSERT_EQ(4u, result.size());
|
| + for (size_t i = 0; i < kNumWorkerThreads; i++) {
|
| + EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
|
| + result.end());
|
| + }
|
| + EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end());
|
| +}
|
| +
|
| +// Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
|
| +TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
|
| + EnsureAllWorkersCreated();
|
| + ThreadBlocker blocker;
|
| + pool().PostWorkerTaskWithShutdownBehavior(
|
| + FROM_HERE,
|
| + base::Bind(&TestTracker::BlockTask,
|
| + tracker(), 0, &blocker),
|
| + SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
|
| + tracker()->WaitUntilTasksBlocked(1);
|
| +
|
| + // This should not block. If this test hangs, it means it failed.
|
| + pool().Shutdown();
|
| +
|
| + // The task should not have completed yet.
|
| + EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
|
| +
|
| + // Posting more tasks should fail.
|
| + EXPECT_FALSE(pool().PostWorkerTaskWithShutdownBehavior(
|
| + FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0),
|
| + SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
|
| +
|
| + // Continue the background thread and make sure the task can complete.
|
| + blocker.Unblock(1);
|
| + std::vector<int> result = tracker()->WaitUntilTasksComplete(1);
|
| + EXPECT_EQ(1u, result.size());
|
| +}
|
| +
|
| +} // namespace base
|