Chromium Code Reviews| Index: base/threading/sequenced_worker_pool_unittest.cc |
| =================================================================== |
| --- base/threading/sequenced_worker_pool_unittest.cc (revision 0) |
| +++ base/threading/sequenced_worker_pool_unittest.cc (revision 0) |
| @@ -0,0 +1,373 @@ |
| +// Copyright (c) 2011 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include <algorithm> |
| + |
| +#include "base/bind.h" |
| +#include "base/memory/ref_counted.h" |
| +#include "base/synchronization/condition_variable.h" |
| +#include "base/synchronization/lock.h" |
| +#include "base/synchronization/waitable_event.h" |
| +#include "base/threading/platform_thread.h" |
| +#include "base/threading/sequenced_worker_pool.h" |
| +#include "testing/gtest/include/gtest/gtest.h" |
| + |
| +namespace base { |
| + |
| +// IMPORTANT NOTE: |
| +// |
| +// Many of these tests have failure modes where they'll hang forever. These |
| +// tests should not be flaky, and hangling indicates a type of failure. Do not |
| +// mark as flaky if they're hanging, it's likely an actual bug. |
| + |
| +namespace { |
| + |
| +const size_t kNumWorkerThreads = 3; |
| + |
| +class TestTracker : public base::RefCountedThreadSafe<TestTracker> { |
| + public: |
| + TestTracker() |
| + : lock_(), |
| + cond_var_(&lock_), |
| + completed_event_(false, false), |
| + started_events_(0) { |
| + } |
| + |
| + // Each of these tasks appends the argument to the complete sequence vector |
| + // so calling code can see what order they finished in. |
| + void FastTask(int id) { |
| + SignalWorkerDone(id); |
| + } |
| + void SlowTask(int id) { |
| + base::PlatformThread::Sleep(1000); |
| + SignalWorkerDone(id); |
| + } |
| + void BlockOnEventTask(int id, base::WaitableEvent* event) { |
| + // Note that this task has started and signal anwaybody waiting for that |
|
jar (doing other things)
2011/12/29 18:15:45
nit: anwaybody-->anybody
|
| + // to happen. |
| + { |
| + base::AutoLock lock(lock_); |
| + started_events_++; |
| + } |
| + cond_var_.Signal(); |
| + |
|
jar (doing other things)
2011/12/29 18:15:45
With murphy threads, all the tasks *could* pause a
brettw
2011/12/29 18:23:17
Ah, good catch. I'll fix this.
|
| + event->Wait(); |
| + SignalWorkerDone(id); |
| + } |
| + |
| + // Waits until the given number of tasks have started executing. |
| + void WaitUntilEventTasksBlocked(int count) { |
| + base::AutoLock lock(lock_); |
| + while (started_events_ < count) |
| + cond_var_.Wait(); |
| + } |
| + |
| + // Blocks the current thread until at least the given number of tasks are in |
| + // the completed vector, and then returns a copy. |
| + std::vector<int> WaitUntilTasksComplete(size_t num_tasks) { |
| + for (;;) { |
| + { |
| + base::AutoLock lock(lock_); |
| + if (complete_sequence_.size() >= num_tasks) |
| + return complete_sequence_; |
| + } |
| + completed_event_.Wait(); |
| + } |
| + } |
| + |
| + void ClearCompleteSequence() { |
| + base::AutoLock lock(lock_); |
| + complete_sequence_.clear(); |
| + started_events_ = 0; |
| + } |
| + |
| + private: |
| + void SignalWorkerDone(int id) { |
| + base::AutoLock lock(lock_); |
| + complete_sequence_.push_back(id); |
| + completed_event_.Signal(); |
| + } |
| + |
| + // Protects the complete_sequence. |
| + base::Lock lock_; |
| + |
| + base::ConditionVariable cond_var_; |
| + |
| + // Signaled every time something is posted to the complete_sequence_. |
| + base::WaitableEvent completed_event_; |
| + |
| + // Protected by lock_. |
| + std::vector<int> complete_sequence_; |
| + |
| + int started_events_; |
| +}; |
| + |
| +class SequencedWorkerPoolTest : public testing::Test, |
| + public SequencedWorkerPool::TestingObserver { |
| + public: |
| + SequencedWorkerPoolTest() |
| + : pool_(kNumWorkerThreads, "test"), |
| + tracker_(new TestTracker) { |
| + pool_.SetTestingObserver(this); |
| + } |
| + ~SequencedWorkerPoolTest() { |
| + } |
| + |
| + virtual void SetUp() { |
| + } |
| + virtual void TearDown() { |
| + pool_.Shutdown(); |
| + } |
| + |
| + SequencedWorkerPool& pool() { return pool_; } |
| + TestTracker* tracker() { return tracker_.get(); } |
| + |
| + // Ensures that the given number of worker threads is created by adding |
| + // tasks and waiting until they complete. Worker thread creation is |
| + // serialized, can happen on background threads asynchronously, and doesn't |
| + // happen any more at shutdown. This means that if a test posts a bunch of |
| + // tasks and calls shutdown, fewer workers will be created than the test may |
| + // expect. |
| + // |
| + // This function ensures that this condition can't happen so tests can make |
| + // assumptions about the number of workers active. See the comment in |
| + // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more |
| + // details. |
| + // |
| + // It will post tasks to the queue with id -1. It also assumes this is the |
| + // first thing called in a test since it will clear the complete_sequence_. |
| + void EnsureAllWorkersCreated() { |
| + // Create a bunch of threads, all waiting for an event. This will cause |
| + // that may workers to be created. |
| + base::WaitableEvent event(false, false); |
| + for (size_t i = 0; i < kNumWorkerThreads; i++) { |
| + pool().PostWorkerTask(FROM_HERE, |
| + base::Bind(&TestTracker::BlockOnEventTask, |
| + tracker(), -1, &event)); |
| + } |
| + tracker()->WaitUntilEventTasksBlocked(kNumWorkerThreads); |
| + |
| + // Now signal all the workers and wait until they're done. |
| + for (size_t i = 0; i < kNumWorkerThreads; i++) |
| + event.Signal(); |
|
jar (doing other things)
2011/12/29 18:15:45
As noted above (see line 54), this won't consisten
|
| + tracker()->WaitUntilTasksComplete(kNumWorkerThreads); |
| + |
| + // Clean up the task IDs we added. |
| + tracker()->ClearCompleteSequence(); |
| + } |
| + |
| + protected: |
| + // This closure will be executed right before the pool blocks on shutdown. |
| + base::Closure before_wait_for_shutdown_; |
| + |
| + private: |
| + // SequencedWorkerPool::TestingObserver implementation. |
| + virtual void WillWaitForShutdown() { |
| + if (!before_wait_for_shutdown_.is_null()) |
| + before_wait_for_shutdown_.Run(); |
| + } |
| + |
| + SequencedWorkerPool pool_; |
| + scoped_refptr<TestTracker> tracker_; |
| +}; |
| + |
| +// Checks that the given number of entries are in the tasks to complete of |
| +// the given tracker, and then signals the given event the given number of |
| +// times. This is used to wakt up blocked background threads before blocking |
| +// on shutdown. |
| +void EnsureTasksToCompleteCountAndSignal(scoped_refptr<TestTracker> tracker, |
| + size_t expected_tasks_to_complete, |
| + base::WaitableEvent* event, |
| + size_t times_to_signal_event) { |
| + EXPECT_EQ( |
| + expected_tasks_to_complete, |
| + tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size()); |
| + |
| + for (size_t i = 0; i < times_to_signal_event; i++) |
| + event->Signal(); |
|
jar (doing other things)
2011/12/29 18:15:45
I don't think event contains a counter, and hence
|
| +} |
| + |
| +} // namespace |
| + |
| +// Tests that same-named tokens have the same ID. |
| +TEST_F(SequencedWorkerPoolTest, NamedTokens) { |
| + const std::string name1("hello"); |
| + SequencedWorkerPool::SequenceToken token1 = |
| + pool().GetNamedSequenceToken(name1); |
| + |
| + SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken(); |
| + |
| + const std::string name3("goodbye"); |
| + SequencedWorkerPool::SequenceToken token3 = |
| + pool().GetNamedSequenceToken(name3); |
| + |
| + // All 3 tokens should be different. |
| + EXPECT_FALSE(token1.Equals(token2)); |
| + EXPECT_FALSE(token1.Equals(token3)); |
| + EXPECT_FALSE(token2.Equals(token3)); |
| + |
| + // Requesting the same name again should give the same value. |
| + SequencedWorkerPool::SequenceToken token1again = |
| + pool().GetNamedSequenceToken(name1); |
| + EXPECT_TRUE(token1.Equals(token1again)); |
| + |
| + SequencedWorkerPool::SequenceToken token3again = |
| + pool().GetNamedSequenceToken(name3); |
| + EXPECT_TRUE(token3.Equals(token3again)); |
| +} |
| + |
| +// Tests that posting a bunch of tasks (many more than the number of worker |
| +// threads) runs them all. |
| +TEST_F(SequencedWorkerPoolTest, LotsOfTasks) { |
| + pool().PostWorkerTask(FROM_HERE, |
| + base::Bind(&TestTracker::SlowTask, tracker(), 0)); |
| + |
| + const size_t kNumTasks = 20; |
| + for (size_t i = 1; i < kNumTasks; i++) { |
| + pool().PostWorkerTask(FROM_HERE, |
| + base::Bind(&TestTracker::FastTask, tracker(), i)); |
| + } |
| + |
| + std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks); |
| + EXPECT_EQ(kNumTasks, result.size()); |
| +} |
| + |
| +// Test that tasks with the same sequence token are executed in order but don't |
| +// affect other tasks. |
| +TEST_F(SequencedWorkerPoolTest, Sequence) { |
| + // Fill all the worker threads except one. |
| + base::WaitableEvent background_event(false, false); |
| + const size_t kNumBackgroundTasks = kNumWorkerThreads - 1; |
| + for (size_t i = 0; i < kNumBackgroundTasks; i++) { |
| + pool().PostWorkerTask(FROM_HERE, |
| + base::Bind(&TestTracker::BlockOnEventTask, |
| + tracker(), i, &background_event)); |
| + } |
| + tracker()->WaitUntilEventTasksBlocked(kNumBackgroundTasks); |
| + |
| + // Create two tasks with the same sequence token, one that will block on the |
| + // event, and one which will just complete quickly when it's run. Since there |
| + // is one worker thread free, the first task will start and then block, and |
| + // the second task should be waiting. |
| + base::WaitableEvent event1(false, false); |
| + SequencedWorkerPool::SequenceToken token1 = pool().GetSequenceToken(); |
| + pool().PostSequencedWorkerTask( |
| + token1, FROM_HERE, |
| + base::Bind(&TestTracker::BlockOnEventTask, tracker(), 100, |
| + &event1)); |
| + pool().PostSequencedWorkerTask( |
| + token1, FROM_HERE, |
| + base::Bind(&TestTracker::FastTask, tracker(), 101)); |
| + EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); |
| + |
| + // Create another two tasks as above with a different token. These will be |
| + // blocked since there are no slots to run. |
| + SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken(); |
| + pool().PostSequencedWorkerTask( |
| + token2, FROM_HERE, |
| + base::Bind(&TestTracker::FastTask, tracker(), 200)); |
| + pool().PostSequencedWorkerTask( |
| + token2, FROM_HERE, |
| + base::Bind(&TestTracker::FastTask, tracker(), 201)); |
| + EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); |
| + |
| + // Let one background task complete. This should then let both tasks of |
| + // token2 run to completion in order. The second task of token1 should still |
| + // be blocked. |
| + background_event.Signal(); |
| + std::vector<int> result = tracker()->WaitUntilTasksComplete(3); |
| + ASSERT_EQ(3u, result.size()); |
| + EXPECT_EQ(200, result[1]); |
| + EXPECT_EQ(201, result[2]); |
| + |
| + // Finish the rest of the background tasks. This should leave some workers |
| + // free with the second token1 task still blocked on the first. |
| + for (size_t i = 0; i < kNumBackgroundTasks - 1; i++) |
| + background_event.Signal(); |
| + EXPECT_EQ(kNumBackgroundTasks + 2, |
| + tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size()); |
| + |
| + // Allow the first task of token1 to complete. This should run the second. |
| + event1.Signal(); |
| + result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4); |
| + ASSERT_EQ(kNumBackgroundTasks + 4, result.size()); |
| + EXPECT_EQ(100, result[result.size() - 2]); |
| + EXPECT_EQ(101, result[result.size() - 1]); |
| +} |
| + |
| +// Tests that unrun tasks are discarded properly according to their shutdown |
| +// mode. |
| +TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) { |
| + // Start tasks to take all the threads and block them. |
| + EnsureAllWorkersCreated(); |
| + base::WaitableEvent background_event(false, false); |
| + for (size_t i = 0; i < kNumWorkerThreads; i++) { |
| + pool().PostWorkerTask(FROM_HERE, |
| + base::Bind(&TestTracker::BlockOnEventTask, |
| + tracker(), i, &background_event)); |
| + } |
| + tracker()->WaitUntilEventTasksBlocked(kNumWorkerThreads); |
| + |
| + // Create some tasks with different shutdown modes. |
| + pool().PostWorkerTaskWithShutdownBehavior( |
| + FROM_HERE, |
| + base::Bind(&TestTracker::FastTask, tracker(), 100), |
| + SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); |
| + pool().PostWorkerTaskWithShutdownBehavior( |
| + FROM_HERE, |
| + base::Bind(&TestTracker::FastTask, tracker(), 101), |
| + SequencedWorkerPool::SKIP_ON_SHUTDOWN); |
| + pool().PostWorkerTaskWithShutdownBehavior( |
| + FROM_HERE, |
| + base::Bind(&TestTracker::FastTask, tracker(), 102), |
| + SequencedWorkerPool::BLOCK_SHUTDOWN); |
| + |
| + // Shutdown the worker pool. This should discard all non-blocking tasks. |
| + before_wait_for_shutdown_ = |
| + base::Bind(&EnsureTasksToCompleteCountAndSignal, |
| + scoped_refptr<TestTracker>(tracker()), 0, |
| + &background_event, kNumWorkerThreads); |
| + pool().Shutdown(); |
| + |
| + std::vector<int> result = tracker()->WaitUntilTasksComplete(4); |
| + |
| + // Tge kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN |
| + // one, in no particular order. |
| + ASSERT_EQ(4u, result.size()); |
| + for (size_t i = 0; i < kNumWorkerThreads; i++) { |
| + EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) != |
| + result.end()); |
| + } |
| + EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end()); |
| +} |
| + |
| +// Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown. |
| +TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) { |
| + EnsureAllWorkersCreated(); |
| + base::WaitableEvent background_event(false, false); |
| + pool().PostWorkerTaskWithShutdownBehavior( |
| + FROM_HERE, |
| + base::Bind(&TestTracker::BlockOnEventTask, |
| + tracker(), 0, &background_event), |
| + SequencedWorkerPool::CONTINUE_ON_SHUTDOWN); |
| + tracker()->WaitUntilEventTasksBlocked(1); |
| + |
| + // This should not block. If this test hangs, it means it failed. |
| + pool().Shutdown(); |
| + |
| + // The task should not have completed yet. |
| + EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size()); |
| + |
| + // Posting more tasks should fail. |
| + EXPECT_FALSE(pool().PostWorkerTaskWithShutdownBehavior( |
| + FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0), |
| + SequencedWorkerPool::CONTINUE_ON_SHUTDOWN)); |
| + |
| + // Continue the background thread and make sure the task can complete. |
| + background_event.Signal(); |
| + std::vector<int> result = tracker()->WaitUntilTasksComplete(1); |
| + EXPECT_EQ(1u, result.size()); |
| +} |
| + |
| +} // namespace base |