Index: mojo/system/remote_message_pipe_posix_unittest.cc |
diff --git a/mojo/system/remote_message_pipe_posix_unittest.cc b/mojo/system/remote_message_pipe_posix_unittest.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..5f1616690cfa86ae4108090fedaed3ad1e2d4f5d |
--- /dev/null |
+++ b/mojo/system/remote_message_pipe_posix_unittest.cc |
@@ -0,0 +1,342 @@ |
+// Copyright 2013 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+// TODO(vtl): Factor out the POSIX-specific bits of this test (once we have a |
+// non-POSIX implementation). |
+ |
+#include "mojo/system/message_pipe.h" |
+ |
+#include <fcntl.h> |
+#include <stdint.h> |
+#include <string.h> |
+#include <sys/socket.h> |
+#include <sys/types.h> |
+#include <unistd.h> |
+ |
+#include "base/basictypes.h" |
+#include "base/bind.h" |
+#include "base/callback.h" |
+#include "base/location.h" |
+#include "base/logging.h" |
+#include "base/message_loop/message_loop.h" |
+#include "base/synchronization/waitable_event.h" |
+#include "base/threading/thread.h" |
+#include "mojo/system/channel.h" |
+#include "mojo/system/local_message_pipe_endpoint.h" |
+#include "mojo/system/platform_channel_handle.h" |
+#include "mojo/system/proxy_message_pipe_endpoint.h" |
+#include "mojo/system/test_utils.h" |
+#include "mojo/system/waiter.h" |
+#include "testing/gtest/include/gtest/gtest.h" |
+ |
+namespace mojo { |
+namespace system { |
+namespace { |
+ |
+class RemoteMessagePipeTest : public testing::Test { |
+ public: |
+ RemoteMessagePipeTest() : io_thread_("io_thread") { |
+ } |
+ |
+ virtual ~RemoteMessagePipeTest() { |
+ } |
+ |
+ virtual void SetUp() OVERRIDE { |
+ io_thread_.StartWithOptions( |
+ base::Thread::Options(base::MessageLoop::TYPE_IO, 0)); |
+ |
+ test::PostTaskAndWait(io_thread_task_runner(), |
+ FROM_HERE, |
+ base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread, |
+ base::Unretained(this))); |
+ } |
+ |
+ virtual void TearDown() OVERRIDE { |
+ test::PostTaskAndWait(io_thread_task_runner(), |
+ FROM_HERE, |
+ base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread, |
+ base::Unretained(this))); |
+ io_thread_.Stop(); |
+ } |
+ |
+ // This connects MP 0, port 1 and MP 1, port 0 (leaving MP 0, port 0 and MP 1, |
+ // port 1 as the user-visible endpoints) to channel 0 and 1, respectively. MP |
+ // 0, port 1 and MP 1, port 0 must have |ProxyMessagePipeEndpoint|s. |
+ void ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp_0, |
+ scoped_refptr<MessagePipe> mp_1) { |
+ CHECK_EQ(base::MessageLoop::current(), io_thread_message_loop()); |
+ |
+ MessageInTransit::EndpointId local_id_0 = |
+ channels_[0]->AttachMessagePipeEndpoint(mp_0, 1); |
+ MessageInTransit::EndpointId local_id_1 = |
+ channels_[1]->AttachMessagePipeEndpoint(mp_1, 0); |
+ |
+ channels_[0]->RunMessagePipeEndpoint(local_id_0, local_id_1); |
+ channels_[1]->RunMessagePipeEndpoint(local_id_1, local_id_0); |
+ } |
+ |
+ protected: |
+ base::MessageLoop* io_thread_message_loop() { |
+ return io_thread_.message_loop(); |
+ } |
+ |
+ scoped_refptr<base::TaskRunner> io_thread_task_runner() { |
+ return io_thread_message_loop()->message_loop_proxy(); |
+ } |
+ |
+ private: |
+ void SetUpOnIOThread() { |
+ CHECK_EQ(base::MessageLoop::current(), io_thread_message_loop()); |
+ |
+ // Create the socket. |
+ int fds[2]; |
+ PCHECK(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == 0); |
+ |
+ // Set the ends to non-blocking. |
+ PCHECK(fcntl(fds[0], F_SETFL, O_NONBLOCK) == 0); |
+ PCHECK(fcntl(fds[1], F_SETFL, O_NONBLOCK) == 0); |
+ |
+ // Create and initialize |Channel|s. |
+ channels_[0] = new Channel(); |
+ CHECK(channels_[0]->Init(PlatformChannelHandle(fds[0]))); |
+ channels_[1] = new Channel(); |
+ CHECK(channels_[1]->Init(PlatformChannelHandle(fds[1]))); |
+ } |
+ |
+ void TearDownOnIOThread() { |
+ channels_[1]->Shutdown(); |
+ channels_[1] = NULL; |
+ channels_[0]->Shutdown(); |
+ channels_[0] = NULL; |
+ } |
+ |
+ base::Thread io_thread_; |
+ scoped_refptr<Channel> channels_[2]; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest); |
+}; |
+ |
+TEST_F(RemoteMessagePipeTest, Basic) { |
+ const char hello[] = "hello"; |
+ const char world[] = "world!!!1!!!1!"; |
+ char buffer[100] = { 0 }; |
+ uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer)); |
+ Waiter waiter; |
+ |
+ // Connect message pipes. MP 0, port 1 will be attached to channel 0 and |
+ // connected to MP 1, port 0, which will be attached to channel 1. This leaves |
+ // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints. |
+ |
+ scoped_refptr<MessagePipe> mp_0(new MessagePipe( |
+ scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), |
+ scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); |
+ scoped_refptr<MessagePipe> mp_1(new MessagePipe( |
+ scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()), |
+ scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()))); |
+ test::PostTaskAndWait( |
+ io_thread_task_runner(), |
+ FROM_HERE, |
+ base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread, |
+ base::Unretained(this), mp_0, mp_1)); |
+ |
+ // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1. |
+ |
+ // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do |
+ // it later, it might already be readable.) |
+ waiter.Init(); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ mp_1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123)); |
+ |
+ // Write to MP 0, port 0. |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ mp_0->WriteMessage(0, |
+ hello, sizeof(hello), |
+ NULL, 0, |
+ MOJO_WRITE_MESSAGE_FLAG_NONE)); |
+ |
+ // Wait. |
+ EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE)); |
+ mp_1->RemoveWaiter(1, &waiter); |
+ |
+ // Read from MP 1, port 1. |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ mp_1->ReadMessage(1, |
+ buffer, &buffer_size, |
+ NULL, NULL, |
+ MOJO_READ_MESSAGE_FLAG_NONE)); |
+ EXPECT_EQ(sizeof(hello), static_cast<size_t>(buffer_size)); |
+ EXPECT_EQ(0, strcmp(buffer, hello)); |
+ |
+ // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0. |
+ |
+ waiter.Init(); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ mp_0->AddWaiter(0, &waiter, MOJO_WAIT_FLAG_READABLE, 456)); |
+ |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ mp_1->WriteMessage(1, |
+ world, sizeof(world), |
+ NULL, 0, |
+ MOJO_WRITE_MESSAGE_FLAG_NONE)); |
+ |
+ EXPECT_EQ(456, waiter.Wait(MOJO_DEADLINE_INDEFINITE)); |
+ mp_0->RemoveWaiter(0, &waiter); |
+ |
+ buffer_size = static_cast<uint32_t>(sizeof(buffer)); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ mp_0->ReadMessage(0, |
+ buffer, &buffer_size, |
+ NULL, NULL, |
+ MOJO_READ_MESSAGE_FLAG_NONE)); |
+ EXPECT_EQ(sizeof(world), static_cast<size_t>(buffer_size)); |
+ EXPECT_EQ(0, strcmp(buffer, world)); |
+ |
+ // Close MP 0, port 0. |
+ mp_0->Close(0); |
+ |
+ // Try to wait for MP 1, port 1 to become readable. This will eventually fail |
+ // when it realizes that MP 0, port 0 has been closed. (It may also fail |
+ // immediately.) |
+ waiter.Init(); |
+ MojoResult result = mp_1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 789); |
+ if (result == MOJO_RESULT_OK) { |
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, |
+ waiter.Wait(MOJO_DEADLINE_INDEFINITE)); |
+ mp_1->RemoveWaiter(1, &waiter); |
+ } else { |
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); |
+ } |
+ |
+ // And MP 1, port 1. |
+ mp_1->Close(1); |
+} |
+ |
+TEST_F(RemoteMessagePipeTest, Multiplex) { |
+ const char hello[] = "hello"; |
+ const char world[] = "world!!!1!!!1!"; |
+ char buffer[100] = { 0 }; |
+ uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer)); |
+ Waiter waiter; |
+ |
+ // Connect message pipes as in the |Basic| test. |
+ |
+ scoped_refptr<MessagePipe> mp_0(new MessagePipe( |
+ scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), |
+ scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); |
+ scoped_refptr<MessagePipe> mp_1(new MessagePipe( |
+ scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()), |
+ scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()))); |
+ test::PostTaskAndWait( |
+ io_thread_task_runner(), |
+ FROM_HERE, |
+ base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread, |
+ base::Unretained(this), mp_0, mp_1)); |
+ |
+ // Now put another message pipe on the channel. |
+ |
+ scoped_refptr<MessagePipe> mp_2(new MessagePipe( |
+ scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()), |
+ scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()))); |
+ scoped_refptr<MessagePipe> mp_3(new MessagePipe( |
+ scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()), |
+ scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()))); |
+ test::PostTaskAndWait( |
+ io_thread_task_runner(), |
+ FROM_HERE, |
+ base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread, |
+ base::Unretained(this), mp_2, mp_3)); |
+ |
+ // Write: MP 2, port 0 -> MP 3, port 1. |
+ |
+ waiter.Init(); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ mp_3->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 789)); |
+ |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ mp_2->WriteMessage(0, |
+ hello, sizeof(hello), |
+ NULL, 0, |
+ MOJO_WRITE_MESSAGE_FLAG_NONE)); |
+ |
+ EXPECT_EQ(789, waiter.Wait(MOJO_DEADLINE_INDEFINITE)); |
+ mp_3->RemoveWaiter(1, &waiter); |
+ |
+ // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0. |
+ buffer_size = static_cast<uint32_t>(sizeof(buffer)); |
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND, |
+ mp_0->ReadMessage(0, |
+ buffer, &buffer_size, |
+ NULL, NULL, |
+ MOJO_READ_MESSAGE_FLAG_NONE)); |
+ buffer_size = static_cast<uint32_t>(sizeof(buffer)); |
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND, |
+ mp_1->ReadMessage(1, |
+ buffer, &buffer_size, |
+ NULL, NULL, |
+ MOJO_READ_MESSAGE_FLAG_NONE)); |
+ buffer_size = static_cast<uint32_t>(sizeof(buffer)); |
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND, |
+ mp_2->ReadMessage(0, |
+ buffer, &buffer_size, |
+ NULL, NULL, |
+ MOJO_READ_MESSAGE_FLAG_NONE)); |
+ |
+ // Read from MP 3, port 1. |
+ buffer_size = static_cast<uint32_t>(sizeof(buffer)); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ mp_3->ReadMessage(1, |
+ buffer, &buffer_size, |
+ NULL, NULL, |
+ MOJO_READ_MESSAGE_FLAG_NONE)); |
+ EXPECT_EQ(sizeof(hello), static_cast<size_t>(buffer_size)); |
+ EXPECT_EQ(0, strcmp(buffer, hello)); |
+ |
+ // Write: MP 0, port 0 -> MP 1, port 1 again. |
+ |
+ waiter.Init(); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ mp_1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123)); |
+ |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ mp_0->WriteMessage(0, |
+ world, sizeof(world), |
+ NULL, 0, |
+ MOJO_WRITE_MESSAGE_FLAG_NONE)); |
+ |
+ EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE)); |
+ mp_1->RemoveWaiter(1, &waiter); |
+ |
+ // Make sure there's nothing on the other ports. |
+ buffer_size = static_cast<uint32_t>(sizeof(buffer)); |
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND, |
+ mp_0->ReadMessage(0, |
+ buffer, &buffer_size, |
+ NULL, NULL, |
+ MOJO_READ_MESSAGE_FLAG_NONE)); |
+ buffer_size = static_cast<uint32_t>(sizeof(buffer)); |
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND, |
+ mp_2->ReadMessage(0, |
+ buffer, &buffer_size, |
+ NULL, NULL, |
+ MOJO_READ_MESSAGE_FLAG_NONE)); |
+ buffer_size = static_cast<uint32_t>(sizeof(buffer)); |
+ EXPECT_EQ(MOJO_RESULT_NOT_FOUND, |
+ mp_3->ReadMessage(1, |
+ buffer, &buffer_size, |
+ NULL, NULL, |
+ MOJO_READ_MESSAGE_FLAG_NONE)); |
+ |
+ buffer_size = static_cast<uint32_t>(sizeof(buffer)); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ mp_1->ReadMessage(1, |
+ buffer, &buffer_size, |
+ NULL, NULL, |
+ MOJO_READ_MESSAGE_FLAG_NONE)); |
+ EXPECT_EQ(sizeof(world), static_cast<size_t>(buffer_size)); |
+ EXPECT_EQ(0, strcmp(buffer, world)); |
+} |
+ |
+} // namespace |
+} // namespace system |
+} // namespace mojo |