Index: mojo/system/message_pipe_dispatcher_unittest.cc |
diff --git a/mojo/system/message_pipe_dispatcher_unittest.cc b/mojo/system/message_pipe_dispatcher_unittest.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..996565384ced65d11ab94299441329cf6dbe6285 |
--- /dev/null |
+++ b/mojo/system/message_pipe_dispatcher_unittest.cc |
@@ -0,0 +1,334 @@ |
+// Copyright 2013 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+// NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a |
+// heavily-loaded system). Sorry. |kEpsilonMicros| may be increased to increase |
+// tolerance and reduce observed flakiness. |
+ |
+#include "mojo/system/message_pipe_dispatcher.h" |
+ |
+#include "base/memory/ref_counted.h" |
+#include "base/threading/platform_thread.h" // For |Sleep()|. |
+#include "base/time/time.h" |
+#include "mojo/system/message_pipe.h" |
+#include "mojo/system/test_utils.h" |
+#include "mojo/system/waiter.h" |
+#include "mojo/system/waiter_test_utils.h" |
+#include "testing/gtest/include/gtest/gtest.h" |
+ |
+namespace mojo { |
+namespace system { |
+namespace { |
+ |
+const int64_t kMicrosPerMs = 1000; |
+const int64_t kEpsilonMicros = 15 * kMicrosPerMs; // 15 ms. |
+ |
+TEST(MessagePipeDispatcherTest, Basic) { |
+ test::Stopwatch stopwatch; |
+ int32_t buffer[1]; |
+ const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); |
+ uint32_t buffer_size; |
+ int64_t elapsed_micros; |
+ |
+ // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa. |
+ for (unsigned i = 0; i < 2; i++) { |
+ scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher()); |
+ scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher()); |
+ { |
+ scoped_refptr<MessagePipe> mp(new MessagePipe()); |
+ d_0->Init(mp, i); // 0, 1. |
+ d_1->Init(mp, i ^ 1); // 1, 0. |
+ } |
+ Waiter w; |
+ |
+ // Try adding a writable waiter when already writable. |
+ w.Init(); |
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 0)); |
+ // Shouldn't need to remove the waiter (it was not added). |
+ |
+ // Add a readable waiter to |d_0|, then make it readable (by writing to |
+ // |d_1|), then wait. |
+ w.Init(); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 1)); |
+ buffer[0] = 123456789; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ d_1->WriteMessage(buffer, kBufferSize, |
+ NULL, 0, |
+ MOJO_WRITE_MESSAGE_FLAG_NONE)); |
+ stopwatch.Start(); |
+ EXPECT_EQ(1, w.Wait(MOJO_DEADLINE_INDEFINITE)); |
+ elapsed_micros = stopwatch.Elapsed(); |
+ EXPECT_LT(elapsed_micros, kEpsilonMicros); |
+ d_0->RemoveWaiter(&w); |
+ |
+ // Try adding a readable waiter when already readable (from above). |
+ w.Init(); |
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 2)); |
+ // Shouldn't need to remove the waiter (it was not added). |
+ |
+ // Make |d_0| no longer readable (by reading from it). |
+ buffer[0] = 0; |
+ buffer_size = kBufferSize; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ d_0->ReadMessage(buffer, &buffer_size, |
+ NULL, NULL, |
+ MOJO_READ_MESSAGE_FLAG_NONE)); |
+ EXPECT_EQ(kBufferSize, buffer_size); |
+ EXPECT_EQ(123456789, buffer[0]); |
+ |
+ // Wait for zero time for readability on |d_0| (will time out). |
+ w.Init(); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3)); |
+ stopwatch.Start(); |
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0)); |
+ elapsed_micros = stopwatch.Elapsed(); |
+ EXPECT_LT(elapsed_micros, kEpsilonMicros); |
+ d_0->RemoveWaiter(&w); |
+ |
+ // Wait for non-zero, finite time for readability on |d_0| (will time out). |
+ w.Init(); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3)); |
+ stopwatch.Start(); |
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(2 * kEpsilonMicros)); |
+ elapsed_micros = stopwatch.Elapsed(); |
+ EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros); |
+ EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros); |
+ d_0->RemoveWaiter(&w); |
+ |
+ EXPECT_EQ(MOJO_RESULT_OK, d_0->Close()); |
+ EXPECT_EQ(MOJO_RESULT_OK, d_1->Close()); |
+ } |
+} |
+ |
+// Test what happens when one end is closed (single-threaded test). |
+TEST(MessagePipeDispatcherTest, BasicClosed) { |
+ int32_t buffer[1]; |
+ const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); |
+ uint32_t buffer_size; |
+ |
+ // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa. |
+ for (unsigned i = 0; i < 2; i++) { |
+ scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher()); |
+ scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher()); |
+ { |
+ scoped_refptr<MessagePipe> mp(new MessagePipe()); |
+ d_0->Init(mp, i); // 0, 1. |
+ d_1->Init(mp, i ^ 1); // 1, 0. |
+ } |
+ Waiter w; |
+ |
+ // Write (twice) to |d_1|. |
+ buffer[0] = 123456789; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ d_1->WriteMessage(buffer, kBufferSize, |
+ NULL, 0, |
+ MOJO_WRITE_MESSAGE_FLAG_NONE)); |
+ buffer[0] = 234567890; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ d_1->WriteMessage(buffer, kBufferSize, |
+ NULL, 0, |
+ MOJO_WRITE_MESSAGE_FLAG_NONE)); |
+ |
+ // Try waiting for readable on |d_0|; should fail (already satisfied). |
+ w.Init(); |
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 0)); |
+ |
+ // Close |d_1|. |
+ EXPECT_EQ(MOJO_RESULT_OK, d_1->Close()); |
+ |
+ // Try waiting for readable on |d_0|; should fail (already satisfied). |
+ w.Init(); |
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 1)); |
+ |
+ // Read from |d_0|. |
+ buffer[0] = 0; |
+ buffer_size = kBufferSize; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ d_0->ReadMessage(buffer, &buffer_size, |
+ NULL, NULL, |
+ MOJO_READ_MESSAGE_FLAG_NONE)); |
+ EXPECT_EQ(kBufferSize, buffer_size); |
+ EXPECT_EQ(123456789, buffer[0]); |
+ |
+ // Try waiting for readable on |d_0|; should fail (already satisfied). |
+ w.Init(); |
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 2)); |
+ |
+ // Read again from |d_0|. |
+ buffer[0] = 0; |
+ buffer_size = kBufferSize; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ d_0->ReadMessage(buffer, &buffer_size, |
+ NULL, NULL, |
+ MOJO_READ_MESSAGE_FLAG_NONE)); |
+ EXPECT_EQ(kBufferSize, buffer_size); |
+ EXPECT_EQ(234567890, buffer[0]); |
+ |
+ // Try waiting for readable on |d_0|; should fail (unsatisfiable). |
+ w.Init(); |
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, |
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3)); |
+ |
+ // Try waiting for writable on |d_0|; should fail (unsatisfiable). |
+ w.Init(); |
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, |
+ d_0->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 4)); |
+ |
+ // Try reading from |d_0|; should fail (nothing to read). |
+ buffer[0] = 0; |
+ buffer_size = kBufferSize; |
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND, |
+ d_0->ReadMessage(buffer, &buffer_size, |
+ NULL, NULL, |
+ MOJO_READ_MESSAGE_FLAG_NONE)); |
+ |
+ // Try writing to |d_0|; should fail (other end closed). |
+ buffer[0] = 345678901; |
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, |
+ d_0->WriteMessage(buffer, kBufferSize, |
+ NULL, 0, |
+ MOJO_WRITE_MESSAGE_FLAG_NONE)); |
+ |
+ EXPECT_EQ(MOJO_RESULT_OK, d_0->Close()); |
+ } |
+} |
+ |
+TEST(MessagePipeDispatcherTest, BasicThreaded) { |
+ test::Stopwatch stopwatch; |
+ int32_t buffer[1]; |
+ const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); |
+ uint32_t buffer_size; |
+ bool did_wait; |
+ MojoResult result; |
+ int64_t elapsed_micros; |
+ |
+ // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa. |
+ for (unsigned i = 0; i < 2; i++) { |
+ scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher()); |
+ scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher()); |
+ { |
+ scoped_refptr<MessagePipe> mp(new MessagePipe()); |
+ d_0->Init(mp, i); // 0, 1. |
+ d_1->Init(mp, i ^ 1); // 1, 0. |
+ } |
+ |
+ // Wait for readable on |d_1|, which will become readable after some time. |
+ { |
+ test::WaiterThread thread(d_1, |
+ MOJO_WAIT_FLAG_READABLE, |
+ MOJO_DEADLINE_INDEFINITE, |
+ 0, |
+ &did_wait, &result); |
+ stopwatch.Start(); |
+ thread.Start(); |
+ base::PlatformThread::Sleep( |
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); |
+ // Wake it up by writing to |d_0|. |
+ buffer[0] = 123456789; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ d_0->WriteMessage(buffer, kBufferSize, |
+ NULL, 0, |
+ MOJO_WRITE_MESSAGE_FLAG_NONE)); |
+ } // Joins the thread. |
+ elapsed_micros = stopwatch.Elapsed(); |
+ EXPECT_TRUE(did_wait); |
+ EXPECT_EQ(0, result); |
+ EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros); |
+ EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros); |
+ |
+ // Now |d_1| is already readable. Try waiting for it again. |
+ { |
+ test::WaiterThread thread(d_1, |
+ MOJO_WAIT_FLAG_READABLE, |
+ MOJO_DEADLINE_INDEFINITE, |
+ 1, |
+ &did_wait, &result); |
+ stopwatch.Start(); |
+ thread.Start(); |
+ } // Joins the thread. |
+ elapsed_micros = stopwatch.Elapsed(); |
+ EXPECT_FALSE(did_wait); |
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result); |
+ EXPECT_LT(elapsed_micros, kEpsilonMicros); |
+ |
+ // Consume what we wrote to |d_0|. |
+ buffer[0] = 0; |
+ buffer_size = kBufferSize; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ d_1->ReadMessage(buffer, &buffer_size, |
+ NULL, NULL, |
+ MOJO_READ_MESSAGE_FLAG_NONE)); |
+ EXPECT_EQ(kBufferSize, buffer_size); |
+ EXPECT_EQ(123456789, buffer[0]); |
+ |
+ // Wait for readable on |d_1| and close |d_0| after some time, which should |
+ // cancel that wait. |
+ { |
+ test::WaiterThread thread(d_1, |
+ MOJO_WAIT_FLAG_READABLE, |
+ MOJO_DEADLINE_INDEFINITE, |
+ 0, |
+ &did_wait, &result); |
+ stopwatch.Start(); |
+ thread.Start(); |
+ base::PlatformThread::Sleep( |
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); |
+ EXPECT_EQ(MOJO_RESULT_OK, d_0->Close()); |
+ } // Joins the thread. |
+ elapsed_micros = stopwatch.Elapsed(); |
+ EXPECT_TRUE(did_wait); |
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); |
+ EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros); |
+ EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros); |
+ |
+ EXPECT_EQ(MOJO_RESULT_OK, d_1->Close()); |
+ } |
+ |
+ for (unsigned i = 0; i < 2; i++) { |
+ scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher()); |
+ scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher()); |
+ { |
+ scoped_refptr<MessagePipe> mp(new MessagePipe()); |
+ d_0->Init(mp, i); // 0, 1. |
+ d_1->Init(mp, i ^ 1); // 1, 0. |
+ } |
+ |
+ // Wait for readable on |d_1| and close |d_1| after some time, which should |
+ // cancel that wait. |
+ { |
+ test::WaiterThread thread(d_1, |
+ MOJO_WAIT_FLAG_READABLE, |
+ MOJO_DEADLINE_INDEFINITE, |
+ 0, |
+ &did_wait, &result); |
+ stopwatch.Start(); |
+ thread.Start(); |
+ base::PlatformThread::Sleep( |
+ base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros)); |
+ EXPECT_EQ(MOJO_RESULT_OK, d_1->Close()); |
+ } // Joins the thread. |
+ elapsed_micros = stopwatch.Elapsed(); |
+ EXPECT_TRUE(did_wait); |
+ EXPECT_EQ(MOJO_RESULT_CANCELLED, result); |
+ EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros); |
+ EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros); |
+ |
+ EXPECT_EQ(MOJO_RESULT_OK, d_0->Close()); |
+ } |
+} |
+ |
+// TODO(vtl): Actually read/write on threads? |
+// TODO(vtl): Stress test? |
+ |
+} // namespace |
+} // namespace system |
+} // namespace mojo |