Index: mojo/edk/system/ports/ports_unittest.cc |
diff --git a/mojo/edk/system/ports/ports_unittest.cc b/mojo/edk/system/ports/ports_unittest.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..95e2a630af8195044fcffdc8b92cb149f1b5c6ea |
--- /dev/null |
+++ b/mojo/edk/system/ports/ports_unittest.cc |
@@ -0,0 +1,1074 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include <stdio.h> |
+#include <stdlib.h> |
+#include <string.h> |
+ |
+#include <map> |
+#include <queue> |
+#include <sstream> |
+ |
+#include "base/logging.h" |
+#include "mojo/edk/system/ports/node.h" |
+#include "mojo/edk/system/ports/node_delegate.h" |
+#include "testing/gtest/include/gtest/gtest.h" |
+ |
+namespace mojo { |
+namespace edk { |
+namespace ports { |
+namespace test { |
+ |
+namespace { |
+ |
+void LogMessage(const Message* message) { |
+ std::stringstream ports; |
+ for (size_t i = 0; i < message->num_ports(); ++i) { |
+ if (i > 0) |
+ ports << ","; |
+ ports << message->ports()[i]; |
+ } |
+ DVLOG(1) << "message: \"" |
+ << static_cast<const char*>(message->payload_bytes()) |
+ << "\" ports=[" << ports.str() << "]"; |
+} |
+ |
+void ClosePortsInMessage(Node* node, Message* message) { |
+ for (size_t i = 0; i < message->num_ports(); ++i) { |
+ PortRef port; |
+ ASSERT_EQ(OK, node->GetPort(message->ports()[i], &port)); |
+ EXPECT_EQ(OK, node->ClosePort(port)); |
+ } |
+} |
+ |
+class TestMessage : public Message { |
+ public: |
+ static ScopedMessage NewUserMessage(size_t num_payload_bytes, |
+ size_t num_ports) { |
+ return ScopedMessage(new TestMessage(num_payload_bytes, num_ports)); |
+ } |
+ |
+ TestMessage(size_t num_payload_bytes, size_t num_ports) |
+ : Message(num_payload_bytes, num_ports) { |
+ start_ = new char[num_header_bytes_ + num_ports_bytes_ + num_payload_bytes]; |
+ InitializeUserMessageHeader(start_); |
+ } |
+ |
+ TestMessage(size_t num_header_bytes, |
+ size_t num_payload_bytes, |
+ size_t num_ports_bytes) |
+ : Message(num_header_bytes, |
+ num_payload_bytes, |
+ num_ports_bytes) { |
+ start_ = new char[num_header_bytes + num_payload_bytes + num_ports_bytes]; |
+ } |
+ |
+ ~TestMessage() override { |
+ delete[] start_; |
+ } |
+}; |
+ |
+struct Task { |
+ Task(NodeName node_name, ScopedMessage message) |
+ : node_name(node_name), |
+ message(std::move(message)), |
+ priority(rand()) { |
+ } |
+ |
+ NodeName node_name; |
+ ScopedMessage message; |
+ int32_t priority; |
+}; |
+ |
+struct TaskComparator { |
+ bool operator()(const Task* a, const Task* b) { |
+ return a->priority < b->priority; |
+ } |
+}; |
+ |
+std::priority_queue<Task*, std::vector<Task*>, TaskComparator> task_queue; |
+Node* node_map[2]; |
+ |
+Node* GetNode(const NodeName& name) { |
+ return node_map[name.v1]; |
+} |
+ |
+void SetNode(const NodeName& name, Node* node) { |
+ node_map[name.v1] = node; |
+} |
+ |
+void PumpTasks() { |
+ while (!task_queue.empty()) { |
+ Task* task = task_queue.top(); |
+ task_queue.pop(); |
+ |
+ Node* node = GetNode(task->node_name); |
+ node->AcceptMessage(std::move(task->message)); |
+ |
+ delete task; |
+ } |
+} |
+ |
+void DiscardPendingTasks() { |
+ while (!task_queue.empty()) { |
+ Task* task = task_queue.top(); |
+ task_queue.pop(); |
+ delete task; |
+ } |
+} |
+ |
+int SendStringMessage(Node* node, const PortRef& port, const std::string& s) { |
+ size_t size = s.size() + 1; |
+ ScopedMessage message = TestMessage::NewUserMessage(size, 0); |
+ memcpy(message->mutable_payload_bytes(), s.data(), size); |
+ return node->SendMessage(port, &message); |
+} |
+ |
+int SendStringMessageWithPort(Node* node, |
+ const PortRef& port, |
+ const std::string& s, |
+ const PortName& sent_port_name) { |
+ size_t size = s.size() + 1; |
+ ScopedMessage message = TestMessage::NewUserMessage(size, 1); |
+ memcpy(message->mutable_payload_bytes(), s.data(), size); |
+ message->mutable_ports()[0] = sent_port_name; |
+ return node->SendMessage(port ,&message); |
+} |
+ |
+int SendStringMessageWithPort(Node* node, |
+ const PortRef& port, |
+ const std::string& s, |
+ const PortRef& sent_port) { |
+ return SendStringMessageWithPort(node, port, s, sent_port.name()); |
+} |
+ |
+const char* ToString(const ScopedMessage& message) { |
+ return static_cast<const char*>(message->payload_bytes()); |
+} |
+ |
+class TestNodeDelegate : public NodeDelegate { |
+ public: |
+ explicit TestNodeDelegate(const NodeName& node_name) |
+ : node_name_(node_name), |
+ drop_messages_(false), |
+ read_messages_(true), |
+ save_messages_(false) { |
+ } |
+ |
+ void set_drop_messages(bool value) { drop_messages_ = value; } |
+ void set_read_messages(bool value) { read_messages_ = value; } |
+ void set_save_messages(bool value) { save_messages_ = value; } |
+ |
+ bool GetSavedMessage(ScopedMessage* message) { |
+ if (saved_messages_.empty()) { |
+ message->reset(); |
+ return false; |
+ } |
+ *message = std::move(saved_messages_.front()); |
+ saved_messages_.pop(); |
+ return true; |
+ } |
+ |
+ void GenerateRandomPortName(PortName* port_name) override { |
+ static uint64_t next_port_name = 1; |
+ port_name->v1 = next_port_name++; |
+ port_name->v2 = 0; |
+ } |
+ |
+ void AllocMessage(size_t num_header_bytes, ScopedMessage* message) override { |
+ message->reset(new TestMessage(num_header_bytes, 0, 0)); |
+ } |
+ |
+ void ForwardMessage(const NodeName& node_name, |
+ ScopedMessage message) override { |
+ if (drop_messages_) { |
+ DVLOG(1) << "Dropping ForwardMessage from node " |
+ << node_name_ << " to " << node_name; |
+ ClosePortsInMessage(GetNode(node_name), message.get()); |
+ return; |
+ } |
+ DVLOG(1) << "ForwardMessage from node " |
+ << node_name_ << " to " << node_name; |
+ task_queue.push(new Task(node_name, std::move(message))); |
+ } |
+ |
+ void PortStatusChanged(const PortRef& port) override { |
+ DVLOG(1) << "PortStatusChanged for " << port.name() << "@" << node_name_; |
+ if (!read_messages_) |
+ return; |
+ Node* node = GetNode(node_name_); |
+ for (;;) { |
+ ScopedMessage message; |
+ int rv = node->GetMessage(port, &message); |
+ EXPECT_TRUE(rv == OK || rv == ERROR_PORT_PEER_CLOSED); |
+ if (rv == ERROR_PORT_PEER_CLOSED || !message) |
+ break; |
+ if (save_messages_) { |
+ SaveMessage(std::move(message)); |
+ } else { |
+ LogMessage(message.get()); |
+ for (size_t i = 0; i < message->num_ports(); ++i) { |
+ std::stringstream buf; |
+ buf << "got port: " << message->ports()[i]; |
+ |
+ PortRef received_port; |
+ node->GetPort(message->ports()[i], &received_port); |
+ |
+ SendStringMessage(node, received_port, buf.str()); |
+ |
+ // Avoid leaking these ports. |
+ node->ClosePort(received_port); |
+ } |
+ } |
+ } |
+ } |
+ |
+ private: |
+ void SaveMessage(ScopedMessage message) { |
+ saved_messages_.emplace(std::move(message)); |
+ } |
+ |
+ std::queue<ScopedMessage> saved_messages_; |
+ NodeName node_name_; |
+ bool drop_messages_; |
+ bool read_messages_; |
+ bool save_messages_; |
+}; |
+ |
+class PortsTest : public testing::Test { |
+ public: |
+ PortsTest() { |
+ SetNode(NodeName(0, 1), nullptr); |
+ SetNode(NodeName(1, 1), nullptr); |
+ } |
+ |
+ ~PortsTest() override { |
+ DiscardPendingTasks(); |
+ SetNode(NodeName(0, 1), nullptr); |
+ SetNode(NodeName(1, 1), nullptr); |
+ } |
+}; |
+ |
+} // namespace |
+ |
+TEST_F(PortsTest, Basic1) { |
+ NodeName node0_name(0, 1); |
+ TestNodeDelegate node0_delegate(node0_name); |
+ Node node0(node0_name, &node0_delegate); |
+ SetNode(node0_name, &node0); |
+ |
+ NodeName node1_name(1, 1); |
+ TestNodeDelegate node1_delegate(node1_name); |
+ Node node1(node1_name, &node1_delegate); |
+ SetNode(node1_name, &node1); |
+ |
+ // Setup pipe between node0 and node1. |
+ PortRef x0, x1; |
+ EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); |
+ EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); |
+ EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); |
+ EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); |
+ |
+ // Transfer a port from node0 to node1. |
+ PortRef a0, a1; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1)); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(a0)); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(x0)); |
+ EXPECT_EQ(OK, node1.ClosePort(x1)); |
+ |
+ PumpTasks(); |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
+ EXPECT_TRUE(node1.CanShutdownCleanly(false)); |
+} |
+ |
+TEST_F(PortsTest, Basic2) { |
+ NodeName node0_name(0, 1); |
+ TestNodeDelegate node0_delegate(node0_name); |
+ Node node0(node0_name, &node0_delegate); |
+ SetNode(node0_name, &node0); |
+ |
+ NodeName node1_name(1, 1); |
+ TestNodeDelegate node1_delegate(node1_name); |
+ Node node1(node1_name, &node1_delegate); |
+ SetNode(node1_name, &node1); |
+ |
+ // Setup pipe between node0 and node1. |
+ PortRef x0, x1; |
+ EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); |
+ EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); |
+ EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); |
+ EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); |
+ |
+ PortRef b0, b1; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1)); |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", b1)); |
+ EXPECT_EQ(OK, SendStringMessage(&node0, b0, "hello again")); |
+ |
+ // This may cause a SendMessage(b1) failure. |
+ EXPECT_EQ(OK, node0.ClosePort(b0)); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(x0)); |
+ EXPECT_EQ(OK, node1.ClosePort(x1)); |
+ |
+ PumpTasks(); |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
+ EXPECT_TRUE(node1.CanShutdownCleanly(false)); |
+} |
+ |
+TEST_F(PortsTest, Basic3) { |
+ NodeName node0_name(0, 1); |
+ TestNodeDelegate node0_delegate(node0_name); |
+ Node node0(node0_name, &node0_delegate); |
+ SetNode(node0_name, &node0); |
+ |
+ NodeName node1_name(1, 1); |
+ TestNodeDelegate node1_delegate(node1_name); |
+ Node node1(node1_name, &node1_delegate); |
+ SetNode(node1_name, &node1); |
+ |
+ // Setup pipe between node0 and node1. |
+ PortRef x0, x1; |
+ EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); |
+ EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); |
+ EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); |
+ EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); |
+ |
+ // Transfer a port from node0 to node1. |
+ PortRef a0, a1; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1)); |
+ EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello again")); |
+ |
+ // Transfer a0 as well. |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a0)); |
+ |
+ PortRef b0, b1; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1)); |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "bar", b1)); |
+ EXPECT_EQ(OK, SendStringMessage(&node0, b0, "baz")); |
+ |
+ // This may cause a SendMessage(b1) failure. |
+ EXPECT_EQ(OK, node0.ClosePort(b0)); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(x0)); |
+ EXPECT_EQ(OK, node1.ClosePort(x1)); |
+ |
+ PumpTasks(); |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
+ EXPECT_TRUE(node1.CanShutdownCleanly(false)); |
+} |
+ |
+TEST_F(PortsTest, LostConnectionToNode1) { |
+ NodeName node0_name(0, 1); |
+ TestNodeDelegate node0_delegate(node0_name); |
+ Node node0(node0_name, &node0_delegate); |
+ SetNode(node0_name, &node0); |
+ |
+ NodeName node1_name(1, 1); |
+ TestNodeDelegate node1_delegate(node1_name); |
+ Node node1(node1_name, &node1_delegate); |
+ SetNode(node1_name, &node1); |
+ |
+ // Setup pipe between node0 and node1. |
+ PortRef x0, x1; |
+ EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); |
+ EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); |
+ EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); |
+ EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); |
+ |
+ // Transfer port to node1 and simulate a lost connection to node1. Dropping |
+ // events from node1 is how we simulate the lost connection. |
+ |
+ node1_delegate.set_drop_messages(true); |
+ |
+ PortRef a0, a1; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a1)); |
+ |
+ PumpTasks(); |
+ |
+ EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name)); |
+ |
+ PumpTasks(); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(a0)); |
+ EXPECT_EQ(OK, node0.ClosePort(x0)); |
+ EXPECT_EQ(OK, node1.ClosePort(x1)); |
+ |
+ PumpTasks(); |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
+ EXPECT_TRUE(node1.CanShutdownCleanly(false)); |
+} |
+ |
+TEST_F(PortsTest, LostConnectionToNode2) { |
+ NodeName node0_name(0, 1); |
+ TestNodeDelegate node0_delegate(node0_name); |
+ Node node0(node0_name, &node0_delegate); |
+ node_map[0] = &node0; |
+ |
+ NodeName node1_name(1, 1); |
+ TestNodeDelegate node1_delegate(node1_name); |
+ Node node1(node1_name, &node1_delegate); |
+ node_map[1] = &node1; |
+ |
+ // Setup pipe between node0 and node1. |
+ PortRef x0, x1; |
+ EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); |
+ EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); |
+ EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); |
+ EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); |
+ |
+ node1_delegate.set_read_messages(false); |
+ |
+ PortRef a0, a1; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "take a1", a1)); |
+ |
+ PumpTasks(); |
+ |
+ node1_delegate.set_drop_messages(true); |
+ |
+ EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name)); |
+ |
+ PumpTasks(); |
+ |
+ ScopedMessage message; |
+ EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message)); |
+ EXPECT_FALSE(message); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(a0)); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(x0)); |
+ |
+ EXPECT_EQ(OK, node1.GetMessage(x1, &message)); |
+ EXPECT_TRUE(message); |
+ ClosePortsInMessage(&node1, message.get()); |
+ |
+ EXPECT_EQ(OK, node1.ClosePort(x1)); |
+ |
+ PumpTasks(); |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
+ EXPECT_TRUE(node1.CanShutdownCleanly(false)); |
+} |
+ |
+TEST_F(PortsTest, GetMessage1) { |
+ NodeName node0_name(0, 1); |
+ TestNodeDelegate node0_delegate(node0_name); |
+ Node node0(node0_name, &node0_delegate); |
+ node_map[0] = &node0; |
+ |
+ PortRef a0, a1; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); |
+ |
+ ScopedMessage message; |
+ EXPECT_EQ(OK, node0.GetMessage(a0, &message)); |
+ EXPECT_FALSE(message); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(a1)); |
+ |
+ EXPECT_EQ(OK, node0.GetMessage(a0, &message)); |
+ EXPECT_FALSE(message); |
+ |
+ PumpTasks(); |
+ |
+ EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message)); |
+ EXPECT_FALSE(message); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(a0)); |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
+} |
+ |
+TEST_F(PortsTest, GetMessage2) { |
+ NodeName node0_name(0, 1); |
+ TestNodeDelegate node0_delegate(node0_name); |
+ Node node0(node0_name, &node0_delegate); |
+ node_map[0] = &node0; |
+ |
+ node0_delegate.set_read_messages(false); |
+ |
+ PortRef a0, a1; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); |
+ |
+ EXPECT_EQ(OK, SendStringMessage(&node0, a1, "1")); |
+ |
+ ScopedMessage message; |
+ EXPECT_EQ(OK, node0.GetMessage(a0, &message)); |
+ |
+ ASSERT_TRUE(message); |
+ EXPECT_EQ(0, strcmp("1", ToString(message))); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(a0)); |
+ EXPECT_EQ(OK, node0.ClosePort(a1)); |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
+} |
+ |
+TEST_F(PortsTest, GetMessage3) { |
+ NodeName node0_name(0, 1); |
+ TestNodeDelegate node0_delegate(node0_name); |
+ Node node0(node0_name, &node0_delegate); |
+ node_map[0] = &node0; |
+ |
+ node0_delegate.set_read_messages(false); |
+ |
+ PortRef a0, a1; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); |
+ |
+ const char* kStrings[] = { |
+ "1", |
+ "2", |
+ "3" |
+ }; |
+ |
+ for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i) |
+ EXPECT_EQ(OK, SendStringMessage(&node0, a1, kStrings[i])); |
+ |
+ ScopedMessage message; |
+ for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i) { |
+ EXPECT_EQ(OK, node0.GetMessage(a0, &message)); |
+ ASSERT_TRUE(message); |
+ EXPECT_EQ(0, strcmp(kStrings[i], ToString(message))); |
+ DVLOG(1) << "got " << kStrings[i]; |
+ } |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(a0)); |
+ EXPECT_EQ(OK, node0.ClosePort(a1)); |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
+} |
+ |
+TEST_F(PortsTest, Delegation1) { |
+ NodeName node0_name(0, 1); |
+ TestNodeDelegate node0_delegate(node0_name); |
+ Node node0(node0_name, &node0_delegate); |
+ SetNode(node0_name, &node0); |
+ |
+ NodeName node1_name(1, 1); |
+ TestNodeDelegate node1_delegate(node1_name); |
+ Node node1(node1_name, &node1_delegate); |
+ node_map[1] = &node1; |
+ |
+ node0_delegate.set_save_messages(true); |
+ node1_delegate.set_save_messages(true); |
+ |
+ // Setup pipe between node0 and node1. |
+ PortRef x0, x1; |
+ EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); |
+ EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); |
+ EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); |
+ EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); |
+ |
+ // In this test, we send a message to a port that has been moved. |
+ |
+ PortRef a0, a1; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); |
+ |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "a1", a1)); |
+ |
+ PumpTasks(); |
+ |
+ ScopedMessage message; |
+ ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); |
+ |
+ ASSERT_EQ(1u, message->num_ports()); |
+ |
+ // This is "a1" from the point of view of node1. |
+ PortName a2_name = message->ports()[0]; |
+ |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node1, x1, "a2", a2_name)); |
+ |
+ PumpTasks(); |
+ |
+ EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello")); |
+ |
+ PumpTasks(); |
+ |
+ ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); |
+ |
+ ASSERT_EQ(1u, message->num_ports()); |
+ |
+ // This is "a2" from the point of view of node1. |
+ PortName a3_name = message->ports()[0]; |
+ |
+ PortRef a3; |
+ EXPECT_EQ(OK, node0.GetPort(a3_name, &a3)); |
+ |
+ EXPECT_EQ(0, strcmp("a2", ToString(message))); |
+ |
+ ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); |
+ |
+ EXPECT_EQ(0u, message->num_ports()); |
+ EXPECT_EQ(0, strcmp("hello", ToString(message))); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(a0)); |
+ EXPECT_EQ(OK, node0.ClosePort(a3)); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(x0)); |
+ EXPECT_EQ(OK, node1.ClosePort(x1)); |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
+ EXPECT_TRUE(node1.CanShutdownCleanly(false)); |
+} |
+ |
+TEST_F(PortsTest, Delegation2) { |
+ NodeName node0_name(0, 1); |
+ TestNodeDelegate node0_delegate(node0_name); |
+ Node node0(node0_name, &node0_delegate); |
+ SetNode(node0_name, &node0); |
+ |
+ NodeName node1_name(1, 1); |
+ TestNodeDelegate node1_delegate(node1_name); |
+ Node node1(node1_name, &node1_delegate); |
+ node_map[1] = &node1; |
+ |
+ node0_delegate.set_save_messages(true); |
+ node1_delegate.set_save_messages(true); |
+ |
+ for (int i = 0; i < 10; ++i) { |
+ // Setup pipe a<->b between node0 and node1. |
+ PortRef A, B; |
+ EXPECT_EQ(OK, node0.CreateUninitializedPort(&A)); |
+ EXPECT_EQ(OK, node1.CreateUninitializedPort(&B)); |
+ EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name())); |
+ EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name())); |
+ |
+ PortRef C, D; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&C, &D)); |
+ |
+ PortRef E, F; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&E, &F)); |
+ |
+ // Pass D over A to B. |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "1", D)); |
+ |
+ // Pass F over C to D. |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, C, "1", F)); |
+ |
+ // This message should find its way to node1. |
+ EXPECT_EQ(OK, SendStringMessage(&node0, E, "hello")); |
+ |
+ PumpTasks(); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(C)); |
+ EXPECT_EQ(OK, node0.ClosePort(E)); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(A)); |
+ EXPECT_EQ(OK, node1.ClosePort(B)); |
+ |
+ for (;;) { |
+ ScopedMessage message; |
+ if (node1_delegate.GetSavedMessage(&message)) { |
+ ClosePortsInMessage(&node1, message.get()); |
+ if (strcmp("hello", ToString(message)) == 0) |
+ break; |
+ } else { |
+ ASSERT_TRUE(false); // "hello" message not delivered! |
+ break; |
+ } |
+ } |
+ |
+ PumpTasks(); // Because ClosePort may have generated tasks. |
+ } |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
+ EXPECT_TRUE(node1.CanShutdownCleanly(false)); |
+} |
+ |
+TEST_F(PortsTest, SendUninitialized) { |
+ NodeName node0_name(0, 1); |
+ TestNodeDelegate node0_delegate(node0_name); |
+ Node node0(node0_name, &node0_delegate); |
+ node_map[0] = &node0; |
+ |
+ NodeName node1_name(1, 1); |
+ TestNodeDelegate node1_delegate(node1_name); |
+ Node node1(node1_name, &node1_delegate); |
+ node_map[1] = &node1; |
+ |
+ // Begin to setup a pipe between node0 and node1, but don't initialize either |
+ // endpoint. |
+ PortRef x0, x1; |
+ EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); |
+ EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); |
+ |
+ node0_delegate.set_save_messages(true); |
+ node1_delegate.set_save_messages(true); |
+ |
+ // Send a message on each port and expect neither to arrive yet. |
+ |
+ EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, |
+ SendStringMessage(&node0, x0, "oops")); |
+ EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, |
+ SendStringMessage(&node1, x1, "oh well")); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(x0)); |
+ EXPECT_EQ(OK, node1.ClosePort(x1)); |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
+ EXPECT_TRUE(node1.CanShutdownCleanly(false)); |
+} |
+ |
+TEST_F(PortsTest, SendFailure) { |
+ NodeName node0_name(0, 1); |
+ TestNodeDelegate node0_delegate(node0_name); |
+ Node node0(node0_name, &node0_delegate); |
+ node_map[0] = &node0; |
+ |
+ node0_delegate.set_save_messages(true); |
+ |
+ PortRef A, B; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); |
+ |
+ // Try to send A over itself. |
+ |
+ EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF, |
+ SendStringMessageWithPort(&node0, A, "oops", A)); |
+ |
+ // Try to send B over A. |
+ |
+ EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER, |
+ SendStringMessageWithPort(&node0, A, "nope", B)); |
+ |
+ PumpTasks(); |
+ |
+ // There should have been no messages accepted. |
+ ScopedMessage message; |
+ EXPECT_FALSE(node0_delegate.GetSavedMessage(&message)); |
+ |
+ // Both A and B should still work. |
+ |
+ EXPECT_EQ(OK, SendStringMessage(&node0, A, "hi")); |
+ EXPECT_EQ(OK, SendStringMessage(&node0, B, "hey")); |
+ |
+ PumpTasks(); |
+ |
+ ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); |
+ EXPECT_EQ(0, strcmp("hi", ToString(message))); |
+ ClosePortsInMessage(&node0, message.get()); |
+ |
+ ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); |
+ EXPECT_EQ(0, strcmp("hey", ToString(message))); |
+ ClosePortsInMessage(&node0, message.get()); |
+ |
+ PumpTasks(); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(A)); |
+ EXPECT_EQ(OK, node0.ClosePort(B)); |
+ |
+ PumpTasks(); |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
+} |
+ |
+TEST_F(PortsTest, DontLeakUnreceivedPorts) { |
+ NodeName node0_name(0, 1); |
+ TestNodeDelegate node0_delegate(node0_name); |
+ Node node0(node0_name, &node0_delegate); |
+ node_map[0] = &node0; |
+ |
+ node0_delegate.set_read_messages(false); |
+ |
+ PortRef A, B; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); |
+ |
+ PortRef C, D; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&C, &D)); |
+ |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D)); |
+ |
+ PumpTasks(); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(C)); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(A)); |
+ EXPECT_EQ(OK, node0.ClosePort(B)); |
+ |
+ PumpTasks(); |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
+} |
+ |
+TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) { |
+ NodeName node0_name(0, 1); |
+ TestNodeDelegate node0_delegate(node0_name); |
+ Node node0(node0_name, &node0_delegate); |
+ node_map[0] = &node0; |
+ |
+ node0_delegate.set_save_messages(true); |
+ |
+ PortRef A, B; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); |
+ |
+ PortRef C, D; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&C, &D)); |
+ |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D)); |
+ |
+ ScopedMessage message; |
+ EXPECT_TRUE(node0_delegate.GetSavedMessage(&message)); |
+ ASSERT_EQ(1u, message->num_ports()); |
+ |
+ PortRef E; |
+ ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E)); |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(true)); |
+ |
+ PumpTasks(); |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(true)); |
+ EXPECT_FALSE(node0.CanShutdownCleanly(false)); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(A)); |
+ EXPECT_EQ(OK, node0.ClosePort(B)); |
+ EXPECT_EQ(OK, node0.ClosePort(C)); |
+ EXPECT_EQ(OK, node0.ClosePort(E)); |
+ |
+ PumpTasks(); |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
+} |
+ |
+TEST_F(PortsTest, ProxyCollapse1) { |
+ NodeName node0_name(0, 1); |
+ TestNodeDelegate node0_delegate(node0_name); |
+ Node node0(node0_name, &node0_delegate); |
+ node_map[0] = &node0; |
+ |
+ node0_delegate.set_save_messages(true); |
+ |
+ PortRef A, B; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); |
+ |
+ PortRef X, Y; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); |
+ |
+ ScopedMessage message; |
+ |
+ // Send B and receive it as C. |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B)); |
+ ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); |
+ ASSERT_EQ(1u, message->num_ports()); |
+ PortRef C; |
+ ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C)); |
+ |
+ // Send C and receive it as D. |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C)); |
+ ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); |
+ ASSERT_EQ(1u, message->num_ports()); |
+ PortRef D; |
+ ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D)); |
+ |
+ // Send D and receive it as E. |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", D)); |
+ ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); |
+ ASSERT_EQ(1u, message->num_ports()); |
+ PortRef E; |
+ ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E)); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(X)); |
+ EXPECT_EQ(OK, node0.ClosePort(Y)); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(A)); |
+ EXPECT_EQ(OK, node0.ClosePort(E)); |
+ |
+ PumpTasks(); |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
+} |
+ |
+TEST_F(PortsTest, ProxyCollapse2) { |
+ NodeName node0_name(0, 1); |
+ TestNodeDelegate node0_delegate(node0_name); |
+ Node node0(node0_name, &node0_delegate); |
+ node_map[0] = &node0; |
+ |
+ node0_delegate.set_save_messages(true); |
+ |
+ PortRef A, B; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); |
+ |
+ PortRef X, Y; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); |
+ |
+ ScopedMessage message; |
+ |
+ // Send B and receive it as C. |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B)); |
+ ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); |
+ ASSERT_EQ(1u, message->num_ports()); |
+ PortRef C; |
+ ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C)); |
+ |
+ // Send A and receive it as D. |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A)); |
+ ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); |
+ ASSERT_EQ(1u, message->num_ports()); |
+ PortRef D; |
+ ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D)); |
+ |
+ // At this point we have a scenario with: |
+ // |
+ // D -> [B] -> C -> [A] |
+ // |
+ // Ensure that the proxies can collapse. |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(X)); |
+ EXPECT_EQ(OK, node0.ClosePort(Y)); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(C)); |
+ EXPECT_EQ(OK, node0.ClosePort(D)); |
+ |
+ PumpTasks(); |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
+} |
+ |
+TEST_F(PortsTest, SendWithClosedPeer) { |
+ // This tests that if a port is sent when its peer is already known to be |
+ // closed, the newly created port will be aware of that peer closure, and the |
+ // proxy will eventually collapse. |
+ |
+ NodeName node0_name(0, 1); |
+ TestNodeDelegate node0_delegate(node0_name); |
+ Node node0(node0_name, &node0_delegate); |
+ node_map[0] = &node0; |
+ |
+ node0_delegate.set_read_messages(false); |
+ |
+ // Send a message from A to B, then close A. |
+ PortRef A, B; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); |
+ EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey")); |
+ EXPECT_EQ(OK, node0.ClosePort(A)); |
+ |
+ PumpTasks(); |
+ |
+ // Now send B over X-Y as new port C. |
+ PortRef X, Y; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); |
+ |
+ node0_delegate.set_read_messages(true); |
+ node0_delegate.set_save_messages(true); |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B)); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(X)); |
+ EXPECT_EQ(OK, node0.ClosePort(Y)); |
+ |
+ ScopedMessage message; |
+ ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); |
+ ASSERT_EQ(1u, message->num_ports()); |
+ |
+ PortRef C; |
+ ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C)); |
+ |
+ PumpTasks(); |
+ |
+ // C should receive the message originally sent to B, and it should also be |
+ // aware of A's closure. |
+ |
+ ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); |
+ EXPECT_EQ(0, strcmp("hey", ToString(message))); |
+ |
+ PortStatus status; |
+ EXPECT_EQ(OK, node0.GetStatus(C, &status)); |
+ EXPECT_FALSE(status.receiving_messages); |
+ EXPECT_FALSE(status.has_messages); |
+ EXPECT_TRUE(status.peer_closed); |
+ |
+ node0.ClosePort(C); |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
+} |
+ |
+TEST_F(PortsTest, SendWithClosedPeerSent) { |
+ // This tests that if a port is closed while some number of proxies are still |
+ // routing messages (directly or indirectly) to it, that the peer port is |
+ // eventually notified of the closure, and the dead-end proxies will |
+ // eventually be removed. |
+ |
+ NodeName node0_name(0, 1); |
+ TestNodeDelegate node0_delegate(node0_name); |
+ Node node0(node0_name, &node0_delegate); |
+ node_map[0] = &node0; |
+ |
+ node0_delegate.set_save_messages(true); |
+ |
+ PortRef X, Y; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); |
+ |
+ PortRef A, B; |
+ EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); |
+ |
+ ScopedMessage message; |
+ |
+ // Send A as new port C. |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A)); |
+ ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); |
+ ASSERT_EQ(1u, message->num_ports()); |
+ PortRef C; |
+ ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C)); |
+ |
+ // Send C as new port D. |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C)); |
+ ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); |
+ ASSERT_EQ(1u, message->num_ports()); |
+ PortRef D; |
+ ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D)); |
+ |
+ node0_delegate.set_read_messages(false); |
+ |
+ // Send a message to B through D, then close D. |
+ EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey")); |
+ EXPECT_EQ(OK, node0.ClosePort(D)); |
+ |
+ PumpTasks(); |
+ |
+ // Now send B as new port E. |
+ |
+ node0_delegate.set_read_messages(true); |
+ EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B)); |
+ |
+ EXPECT_EQ(OK, node0.ClosePort(X)); |
+ EXPECT_EQ(OK, node0.ClosePort(Y)); |
+ |
+ ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); |
+ ASSERT_EQ(1u, message->num_ports()); |
+ |
+ PortRef E; |
+ ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E)); |
+ |
+ PumpTasks(); |
+ |
+ // E should receive the message originally sent to B, and it should also be |
+ // aware of D's closure. |
+ |
+ ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); |
+ EXPECT_EQ(0, strcmp("hey", ToString(message))); |
+ |
+ PortStatus status; |
+ EXPECT_EQ(OK, node0.GetStatus(E, &status)); |
+ EXPECT_FALSE(status.receiving_messages); |
+ EXPECT_FALSE(status.has_messages); |
+ EXPECT_TRUE(status.peer_closed); |
+ |
+ node0.ClosePort(E); |
+ |
+ PumpTasks(); |
+ |
+ EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
+} |
+ |
+} // namespace test |
+} // namespace ports |
+} // namespace edk |
+} // namespace mojo |