Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3553)

Unified Diff: base/threading/sequenced_worker_pool_unittest.cc

Issue 8416019: Add a sequenced worker pool (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src/
Patch Set: '' Created 9 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « base/threading/sequenced_worker_pool.cc ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: base/threading/sequenced_worker_pool_unittest.cc
===================================================================
--- base/threading/sequenced_worker_pool_unittest.cc (revision 0)
+++ base/threading/sequenced_worker_pool_unittest.cc (revision 0)
@@ -0,0 +1,373 @@
+// Copyright (c) 2011 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <algorithm>
+
+#include "base/bind.h"
+#include "base/memory/ref_counted.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "base/synchronization/waitable_event.h"
+#include "base/threading/platform_thread.h"
+#include "base/threading/sequenced_worker_pool.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace base {
+
+// IMPORTANT NOTE:
+//
+// Many of these tests have failure modes where they'll hang forever. These
+// tests should not be flaky, and hangling indicates a type of failure. Do not
+// mark as flaky if they're hanging, it's likely an actual bug.
+
+namespace {
+
+const size_t kNumWorkerThreads = 3;
+
+class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
+ public:
+ TestTracker()
+ : lock_(),
+ cond_var_(&lock_),
+ completed_event_(false, false),
+ started_events_(0) {
+ }
+
+ // Each of these tasks appends the argument to the complete sequence vector
+ // so calling code can see what order they finished in.
+ void FastTask(int id) {
+ SignalWorkerDone(id);
+ }
+ void SlowTask(int id) {
+ base::PlatformThread::Sleep(1000);
+ SignalWorkerDone(id);
+ }
+ void BlockOnEventTask(int id, base::WaitableEvent* event) {
+ // Note that this task has started and signal anwaybody waiting for that
jar (doing other things) 2011/12/29 18:15:45 nit: anwaybody-->anybody
+ // to happen.
+ {
+ base::AutoLock lock(lock_);
+ started_events_++;
+ }
+ cond_var_.Signal();
+
jar (doing other things) 2011/12/29 18:15:45 With murphy threads, all the tasks *could* pause a
brettw 2011/12/29 18:23:17 Ah, good catch. I'll fix this.
+ event->Wait();
+ SignalWorkerDone(id);
+ }
+
+ // Waits until the given number of tasks have started executing.
+ void WaitUntilEventTasksBlocked(int count) {
+ base::AutoLock lock(lock_);
+ while (started_events_ < count)
+ cond_var_.Wait();
+ }
+
+ // Blocks the current thread until at least the given number of tasks are in
+ // the completed vector, and then returns a copy.
+ std::vector<int> WaitUntilTasksComplete(size_t num_tasks) {
+ for (;;) {
+ {
+ base::AutoLock lock(lock_);
+ if (complete_sequence_.size() >= num_tasks)
+ return complete_sequence_;
+ }
+ completed_event_.Wait();
+ }
+ }
+
+ void ClearCompleteSequence() {
+ base::AutoLock lock(lock_);
+ complete_sequence_.clear();
+ started_events_ = 0;
+ }
+
+ private:
+ void SignalWorkerDone(int id) {
+ base::AutoLock lock(lock_);
+ complete_sequence_.push_back(id);
+ completed_event_.Signal();
+ }
+
+ // Protects the complete_sequence.
+ base::Lock lock_;
+
+ base::ConditionVariable cond_var_;
+
+ // Signaled every time something is posted to the complete_sequence_.
+ base::WaitableEvent completed_event_;
+
+ // Protected by lock_.
+ std::vector<int> complete_sequence_;
+
+ int started_events_;
+};
+
+class SequencedWorkerPoolTest : public testing::Test,
+ public SequencedWorkerPool::TestingObserver {
+ public:
+ SequencedWorkerPoolTest()
+ : pool_(kNumWorkerThreads, "test"),
+ tracker_(new TestTracker) {
+ pool_.SetTestingObserver(this);
+ }
+ ~SequencedWorkerPoolTest() {
+ }
+
+ virtual void SetUp() {
+ }
+ virtual void TearDown() {
+ pool_.Shutdown();
+ }
+
+ SequencedWorkerPool& pool() { return pool_; }
+ TestTracker* tracker() { return tracker_.get(); }
+
+ // Ensures that the given number of worker threads is created by adding
+ // tasks and waiting until they complete. Worker thread creation is
+ // serialized, can happen on background threads asynchronously, and doesn't
+ // happen any more at shutdown. This means that if a test posts a bunch of
+ // tasks and calls shutdown, fewer workers will be created than the test may
+ // expect.
+ //
+ // This function ensures that this condition can't happen so tests can make
+ // assumptions about the number of workers active. See the comment in
+ // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
+ // details.
+ //
+ // It will post tasks to the queue with id -1. It also assumes this is the
+ // first thing called in a test since it will clear the complete_sequence_.
+ void EnsureAllWorkersCreated() {
+ // Create a bunch of threads, all waiting for an event. This will cause
+ // that may workers to be created.
+ base::WaitableEvent event(false, false);
+ for (size_t i = 0; i < kNumWorkerThreads; i++) {
+ pool().PostWorkerTask(FROM_HERE,
+ base::Bind(&TestTracker::BlockOnEventTask,
+ tracker(), -1, &event));
+ }
+ tracker()->WaitUntilEventTasksBlocked(kNumWorkerThreads);
+
+ // Now signal all the workers and wait until they're done.
+ for (size_t i = 0; i < kNumWorkerThreads; i++)
+ event.Signal();
jar (doing other things) 2011/12/29 18:15:45 As noted above (see line 54), this won't consisten
+ tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
+
+ // Clean up the task IDs we added.
+ tracker()->ClearCompleteSequence();
+ }
+
+ protected:
+ // This closure will be executed right before the pool blocks on shutdown.
+ base::Closure before_wait_for_shutdown_;
+
+ private:
+ // SequencedWorkerPool::TestingObserver implementation.
+ virtual void WillWaitForShutdown() {
+ if (!before_wait_for_shutdown_.is_null())
+ before_wait_for_shutdown_.Run();
+ }
+
+ SequencedWorkerPool pool_;
+ scoped_refptr<TestTracker> tracker_;
+};
+
+// Checks that the given number of entries are in the tasks to complete of
+// the given tracker, and then signals the given event the given number of
+// times. This is used to wakt up blocked background threads before blocking
+// on shutdown.
+void EnsureTasksToCompleteCountAndSignal(scoped_refptr<TestTracker> tracker,
+ size_t expected_tasks_to_complete,
+ base::WaitableEvent* event,
+ size_t times_to_signal_event) {
+ EXPECT_EQ(
+ expected_tasks_to_complete,
+ tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size());
+
+ for (size_t i = 0; i < times_to_signal_event; i++)
+ event->Signal();
jar (doing other things) 2011/12/29 18:15:45 I don't think event contains a counter, and hence
+}
+
+} // namespace
+
+// Tests that same-named tokens have the same ID.
+TEST_F(SequencedWorkerPoolTest, NamedTokens) {
+ const std::string name1("hello");
+ SequencedWorkerPool::SequenceToken token1 =
+ pool().GetNamedSequenceToken(name1);
+
+ SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken();
+
+ const std::string name3("goodbye");
+ SequencedWorkerPool::SequenceToken token3 =
+ pool().GetNamedSequenceToken(name3);
+
+ // All 3 tokens should be different.
+ EXPECT_FALSE(token1.Equals(token2));
+ EXPECT_FALSE(token1.Equals(token3));
+ EXPECT_FALSE(token2.Equals(token3));
+
+ // Requesting the same name again should give the same value.
+ SequencedWorkerPool::SequenceToken token1again =
+ pool().GetNamedSequenceToken(name1);
+ EXPECT_TRUE(token1.Equals(token1again));
+
+ SequencedWorkerPool::SequenceToken token3again =
+ pool().GetNamedSequenceToken(name3);
+ EXPECT_TRUE(token3.Equals(token3again));
+}
+
+// Tests that posting a bunch of tasks (many more than the number of worker
+// threads) runs them all.
+TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
+ pool().PostWorkerTask(FROM_HERE,
+ base::Bind(&TestTracker::SlowTask, tracker(), 0));
+
+ const size_t kNumTasks = 20;
+ for (size_t i = 1; i < kNumTasks; i++) {
+ pool().PostWorkerTask(FROM_HERE,
+ base::Bind(&TestTracker::FastTask, tracker(), i));
+ }
+
+ std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks);
+ EXPECT_EQ(kNumTasks, result.size());
+}
+
+// Test that tasks with the same sequence token are executed in order but don't
+// affect other tasks.
+TEST_F(SequencedWorkerPoolTest, Sequence) {
+ // Fill all the worker threads except one.
+ base::WaitableEvent background_event(false, false);
+ const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
+ for (size_t i = 0; i < kNumBackgroundTasks; i++) {
+ pool().PostWorkerTask(FROM_HERE,
+ base::Bind(&TestTracker::BlockOnEventTask,
+ tracker(), i, &background_event));
+ }
+ tracker()->WaitUntilEventTasksBlocked(kNumBackgroundTasks);
+
+ // Create two tasks with the same sequence token, one that will block on the
+ // event, and one which will just complete quickly when it's run. Since there
+ // is one worker thread free, the first task will start and then block, and
+ // the second task should be waiting.
+ base::WaitableEvent event1(false, false);
+ SequencedWorkerPool::SequenceToken token1 = pool().GetSequenceToken();
+ pool().PostSequencedWorkerTask(
+ token1, FROM_HERE,
+ base::Bind(&TestTracker::BlockOnEventTask, tracker(), 100,
+ &event1));
+ pool().PostSequencedWorkerTask(
+ token1, FROM_HERE,
+ base::Bind(&TestTracker::FastTask, tracker(), 101));
+ EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
+
+ // Create another two tasks as above with a different token. These will be
+ // blocked since there are no slots to run.
+ SequencedWorkerPool::SequenceToken token2 = pool().GetSequenceToken();
+ pool().PostSequencedWorkerTask(
+ token2, FROM_HERE,
+ base::Bind(&TestTracker::FastTask, tracker(), 200));
+ pool().PostSequencedWorkerTask(
+ token2, FROM_HERE,
+ base::Bind(&TestTracker::FastTask, tracker(), 201));
+ EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
+
+ // Let one background task complete. This should then let both tasks of
+ // token2 run to completion in order. The second task of token1 should still
+ // be blocked.
+ background_event.Signal();
+ std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
+ ASSERT_EQ(3u, result.size());
+ EXPECT_EQ(200, result[1]);
+ EXPECT_EQ(201, result[2]);
+
+ // Finish the rest of the background tasks. This should leave some workers
+ // free with the second token1 task still blocked on the first.
+ for (size_t i = 0; i < kNumBackgroundTasks - 1; i++)
+ background_event.Signal();
+ EXPECT_EQ(kNumBackgroundTasks + 2,
+ tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size());
+
+ // Allow the first task of token1 to complete. This should run the second.
+ event1.Signal();
+ result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4);
+ ASSERT_EQ(kNumBackgroundTasks + 4, result.size());
+ EXPECT_EQ(100, result[result.size() - 2]);
+ EXPECT_EQ(101, result[result.size() - 1]);
+}
+
+// Tests that unrun tasks are discarded properly according to their shutdown
+// mode.
+TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
+ // Start tasks to take all the threads and block them.
+ EnsureAllWorkersCreated();
+ base::WaitableEvent background_event(false, false);
+ for (size_t i = 0; i < kNumWorkerThreads; i++) {
+ pool().PostWorkerTask(FROM_HERE,
+ base::Bind(&TestTracker::BlockOnEventTask,
+ tracker(), i, &background_event));
+ }
+ tracker()->WaitUntilEventTasksBlocked(kNumWorkerThreads);
+
+ // Create some tasks with different shutdown modes.
+ pool().PostWorkerTaskWithShutdownBehavior(
+ FROM_HERE,
+ base::Bind(&TestTracker::FastTask, tracker(), 100),
+ SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
+ pool().PostWorkerTaskWithShutdownBehavior(
+ FROM_HERE,
+ base::Bind(&TestTracker::FastTask, tracker(), 101),
+ SequencedWorkerPool::SKIP_ON_SHUTDOWN);
+ pool().PostWorkerTaskWithShutdownBehavior(
+ FROM_HERE,
+ base::Bind(&TestTracker::FastTask, tracker(), 102),
+ SequencedWorkerPool::BLOCK_SHUTDOWN);
+
+ // Shutdown the worker pool. This should discard all non-blocking tasks.
+ before_wait_for_shutdown_ =
+ base::Bind(&EnsureTasksToCompleteCountAndSignal,
+ scoped_refptr<TestTracker>(tracker()), 0,
+ &background_event, kNumWorkerThreads);
+ pool().Shutdown();
+
+ std::vector<int> result = tracker()->WaitUntilTasksComplete(4);
+
+ // Tge kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN
+ // one, in no particular order.
+ ASSERT_EQ(4u, result.size());
+ for (size_t i = 0; i < kNumWorkerThreads; i++) {
+ EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
+ result.end());
+ }
+ EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end());
+}
+
+// Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
+TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
+ EnsureAllWorkersCreated();
+ base::WaitableEvent background_event(false, false);
+ pool().PostWorkerTaskWithShutdownBehavior(
+ FROM_HERE,
+ base::Bind(&TestTracker::BlockOnEventTask,
+ tracker(), 0, &background_event),
+ SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
+ tracker()->WaitUntilEventTasksBlocked(1);
+
+ // This should not block. If this test hangs, it means it failed.
+ pool().Shutdown();
+
+ // The task should not have completed yet.
+ EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
+
+ // Posting more tasks should fail.
+ EXPECT_FALSE(pool().PostWorkerTaskWithShutdownBehavior(
+ FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0),
+ SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
+
+ // Continue the background thread and make sure the task can complete.
+ background_event.Signal();
+ std::vector<int> result = tracker()->WaitUntilTasksComplete(1);
+ EXPECT_EQ(1u, result.size());
+}
+
+} // namespace base
« no previous file with comments | « base/threading/sequenced_worker_pool.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698