Index: mojo/common/handle_watcher_unittest.cc |
diff --git a/mojo/common/handle_watcher_unittest.cc b/mojo/common/handle_watcher_unittest.cc |
index 02cc6105d6a2bf9fcfe4c2311e62f3241e6c5978..b734e99b7fc9564a5390c6dc418adada04da5616 100644 |
--- a/mojo/common/handle_watcher_unittest.cc |
+++ b/mojo/common/handle_watcher_unittest.cc |
@@ -9,8 +9,10 @@ |
#include "base/at_exit.h" |
#include "base/auto_reset.h" |
#include "base/bind.h" |
+#include "base/memory/scoped_vector.h" |
#include "base/run_loop.h" |
#include "base/test/simple_test_tick_clock.h" |
+#include "base/threading/thread.h" |
#include "mojo/common/time_helper.h" |
#include "mojo/public/cpp/system/core.h" |
#include "mojo/public/cpp/test_support/test_utils.h" |
@@ -336,6 +338,104 @@ TEST(HandleWatcherCleanEnvironmentTest, AbortedOnMessageLoopDestruction) { |
EXPECT_EQ(MOJO_RESULT_ABORTED, result); |
} |
+void NeverReached(MojoResult result) { |
+ FAIL() << "Callback should never be invoked " << result; |
+} |
+ |
+// Called on the main thread when a thread is done. Decrements |active_count| |
+// and if |active_count| is zero quits |run_loop|. |
+void StressThreadDone(base::RunLoop* run_loop, int* active_count) { |
+ (*active_count)--; |
+ EXPECT_GE(*active_count, 0); |
+ if (*active_count == 0) |
+ run_loop->Quit(); |
+} |
+ |
+// See description of StressTest. This is called on the background thread. |
+// |count| is the number of HandleWatchers to create. |active_count| is the |
+// number of outstanding threads, |task_runner| the task runner for the main |
+// thread and |run_loop| the run loop that should be quit when there are no more |
+// threads running. When done StressThreadDone() is invoked on the main thread. |
+// |active_count| and |run_loop| should only be used on the main thread. |
+void RunStressTest(int count, |
+ scoped_refptr<base::TaskRunner> task_runner, |
+ base::RunLoop* run_loop, |
+ int* active_count) { |
+ struct TestData { |
+ MessagePipe pipe; |
+ HandleWatcher watcher; |
+ }; |
+ ScopedVector<TestData> data_vector; |
+ for (int i = 0; i < count; ++i) { |
+ if (i % 20 == 0) { |
+ // Every so often we wait. This results in some level of thread balancing |
+ // as well as making sure HandleWatcher has time to actually start some |
+ // watches. |
+ MessagePipe test_pipe; |
+ ASSERT_TRUE(test_pipe.handle0.is_valid()); |
+ CallbackHelper callback_helper; |
+ HandleWatcher watcher; |
+ callback_helper.Start(&watcher, test_pipe.handle0.get()); |
+ RunUntilIdle(); |
+ EXPECT_FALSE(callback_helper.got_callback()); |
+ EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe.handle1.get(), |
+ std::string())); |
+ base::MessageLoop::ScopedNestableTaskAllower scoper( |
+ base::MessageLoop::current()); |
+ callback_helper.RunUntilGotCallback(); |
+ EXPECT_TRUE(callback_helper.got_callback()); |
+ } else { |
+ scoped_ptr<TestData> test_data(new TestData); |
+ ASSERT_TRUE(test_data->pipe.handle0.is_valid()); |
+ test_data->watcher.Start(test_data->pipe.handle0.get(), |
+ MOJO_HANDLE_SIGNAL_READABLE, |
+ MOJO_DEADLINE_INDEFINITE, |
+ base::Bind(&NeverReached)); |
+ data_vector.push_back(test_data.release()); |
+ } |
+ if (i % 15 == 0) |
+ data_vector.clear(); |
+ } |
+ task_runner->PostTask(FROM_HERE, |
+ base::Bind(&StressThreadDone, run_loop, |
+ active_count)); |
+} |
+ |
+// This test is meant to stress HandleWatcher. It uses from various threads |
+// repeatedly starting and stopping watches. It spins up kThreadCount |
+// threads. Each thread creates kWatchCount watches. Every so often each thread |
+// writes to a pipe and waits for the response. |
+TEST(HandleWatcherCleanEnvironmentTest, StressTest) { |
+#if defined(NDEBUG) |
+ const int kThreadCount = 15; |
+ const int kWatchCount = 400; |
+#else |
+ const int kThreadCount = 10; |
+ const int kWatchCount = 250; |
+#endif |
+ |
+ base::ShadowingAtExitManager at_exit; |
+ base::MessageLoop message_loop; |
+ base::RunLoop run_loop; |
+ ScopedVector<base::Thread> threads; |
+ int threads_active_counter = kThreadCount; |
+ // Starts the threads first and then post the task in hopes of having more |
+ // threads running at once. |
+ for (int i = 0; i < kThreadCount; ++i) { |
+ scoped_ptr<base::Thread> thread(new base::Thread("test thread")); |
+ thread->Start(); |
+ threads.push_back(thread.release()); |
+ } |
+ for (int i = 0; i < kThreadCount; ++i) { |
+ threads[i]->task_runner()->PostTask( |
+ FROM_HERE, base::Bind(&RunStressTest, kWatchCount, |
+ message_loop.task_runner(), |
+ &run_loop, &threads_active_counter)); |
+ } |
+ run_loop.Run(); |
+ ASSERT_EQ(0, threads_active_counter); |
+} |
+ |
} // namespace test |
} // namespace common |
} // namespace mojo |