| Index: mojo/edk/system/ports/ports_unittest.cc
|
| diff --git a/mojo/edk/system/ports/ports_unittest.cc b/mojo/edk/system/ports/ports_unittest.cc
|
| index 200e72bb1b67f5ba6fc6b8889fb50c712a6ae205..b76fa10de5fbb19a5c87278f3cdedfca541c429b 100644
|
| --- a/mojo/edk/system/ports/ports_unittest.cc
|
| +++ b/mojo/edk/system/ports/ports_unittest.cc
|
| @@ -2,6 +2,7 @@
|
| // Use of this source code is governed by a BSD-style license that can be
|
| // found in the LICENSE file.
|
|
|
| +#include <inttypes.h>
|
| #include <stdio.h>
|
| #include <stdlib.h>
|
| #include <string.h>
|
| @@ -9,9 +10,18 @@
|
| #include <map>
|
| #include <queue>
|
| #include <sstream>
|
| +#include <utility>
|
|
|
| +#include "base/bind.h"
|
| +#include "base/callback.h"
|
| #include "base/logging.h"
|
| +#include "base/memory/ref_counted.h"
|
| #include "base/rand_util.h"
|
| +#include "base/strings/string_piece.h"
|
| +#include "base/strings/stringprintf.h"
|
| +#include "base/synchronization/lock.h"
|
| +#include "base/synchronization/waitable_event.h"
|
| +#include "base/threading/thread.h"
|
| #include "mojo/edk/system/ports/event.h"
|
| #include "mojo/edk/system/ports/node.h"
|
| #include "mojo/edk/system/ports/node_delegate.h"
|
| @@ -24,24 +34,8 @@ namespace test {
|
|
|
| namespace {
|
|
|
| -void LogMessage(const Message* message) {
|
| - std::stringstream ports;
|
| - for (size_t i = 0; i < message->num_ports(); ++i) {
|
| - if (i > 0)
|
| - ports << ",";
|
| - ports << message->ports()[i];
|
| - }
|
| - DVLOG(1) << "message: \""
|
| - << static_cast<const char*>(message->payload_bytes())
|
| - << "\" ports=[" << ports.str() << "]";
|
| -}
|
| -
|
| -void ClosePortsInMessage(Node* node, Message* message) {
|
| - for (size_t i = 0; i < message->num_ports(); ++i) {
|
| - PortRef port;
|
| - ASSERT_EQ(OK, node->GetPort(message->ports()[i], &port));
|
| - EXPECT_EQ(OK, node->ClosePort(port));
|
| - }
|
| +bool MessageEquals(const ScopedMessage& message, const base::StringPiece& s) {
|
| + return !strcmp(static_cast<const char*>(message->payload_bytes()), s.data());
|
| }
|
|
|
| class TestMessage : public Message {
|
| @@ -71,132 +65,140 @@ class TestMessage : public Message {
|
| }
|
| };
|
|
|
| -struct Task {
|
| - Task(NodeName node_name, ScopedMessage message)
|
| - : node_name(node_name),
|
| - message(std::move(message)),
|
| - priority(base::RandUint64()) {
|
| - }
|
| +class TestNode;
|
|
|
| - NodeName node_name;
|
| - ScopedMessage message;
|
| - uint64_t priority;
|
| +class MessageRouter {
|
| + public:
|
| + virtual ~MessageRouter() {}
|
| +
|
| + virtual void GeneratePortName(PortName* name) = 0;
|
| + virtual void ForwardMessage(TestNode* from_node,
|
| + const NodeName& node_name,
|
| + ScopedMessage message) = 0;
|
| + virtual void BroadcastMessage(TestNode* from_node, ScopedMessage message) = 0;
|
| };
|
|
|
| -struct TaskComparator {
|
| - bool operator()(const Task* a, const Task* b) {
|
| - return a->priority < b->priority;
|
| +class TestNode : public NodeDelegate {
|
| + public:
|
| + explicit TestNode(uint64_t id)
|
| + : node_name_(id, 1),
|
| + node_(node_name_, this),
|
| + node_thread_(base::StringPrintf("Node %" PRIu64 " thread", id)),
|
| + messages_available_event_(
|
| + base::WaitableEvent::ResetPolicy::AUTOMATIC,
|
| + base::WaitableEvent::InitialState::NOT_SIGNALED),
|
| + idle_event_(
|
| + base::WaitableEvent::ResetPolicy::MANUAL,
|
| + base::WaitableEvent::InitialState::SIGNALED) {
|
| }
|
| -};
|
|
|
| -const size_t kMaxNodes = 3;
|
| + ~TestNode() override {
|
| + StopWhenIdle();
|
| + node_thread_.Stop();
|
| + }
|
|
|
| -std::priority_queue<Task*, std::vector<Task*>, TaskComparator> task_queue;
|
| -Node* node_map[kMaxNodes];
|
| + const NodeName& name() const { return node_name_; }
|
|
|
| -Node* GetNode(const NodeName& name) {
|
| - return node_map[name.v1];
|
| -}
|
| + // NOTE: Node is thread-safe.
|
| + Node& node() { return node_; }
|
|
|
| -void SetNode(const NodeName& name, Node* node) {
|
| - node_map[name.v1] = node;
|
| -}
|
| -
|
| -void PumpTasks() {
|
| - while (!task_queue.empty()) {
|
| - Task* task = task_queue.top();
|
| - task_queue.pop();
|
| + base::WaitableEvent& idle_event() { return idle_event_; }
|
|
|
| - Node* node = GetNode(task->node_name);
|
| - if (node)
|
| - node->AcceptMessage(std::move(task->message));
|
| + bool IsIdle() {
|
| + base::AutoLock lock(lock_);
|
| + return started_ && !dispatching_ &&
|
| + (incoming_messages_.empty() || (block_on_event_ && blocked_));
|
| + }
|
|
|
| - delete task;
|
| + void BlockOnEvent(EventType type) {
|
| + base::AutoLock lock(lock_);
|
| + blocked_event_type_ = type;
|
| + block_on_event_ = true;
|
| }
|
| -}
|
|
|
| -void PumpUntilTask(EventType type) {
|
| - while (!task_queue.empty()) {
|
| - Task* task = task_queue.top();
|
| + void Unblock() {
|
| + base::AutoLock lock(lock_);
|
| + block_on_event_ = false;
|
| + messages_available_event_.Signal();
|
| + }
|
|
|
| - const EventHeader* header = GetEventHeader(*task->message);
|
| - if (header->type == type)
|
| - return;
|
| + void Start(MessageRouter* router) {
|
| + router_ = router;
|
| + node_thread_.Start();
|
| + node_thread_.task_runner()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&TestNode::ProcessMessages, base::Unretained(this)));
|
| + }
|
|
|
| - task_queue.pop();
|
| + void StopWhenIdle() {
|
| + base::AutoLock lock(lock_);
|
| + should_quit_ = true;
|
| + messages_available_event_.Signal();
|
| + }
|
|
|
| - Node* node = GetNode(task->node_name);
|
| - if (node)
|
| - node->AcceptMessage(std::move(task->message));
|
| + void WakeUp() { messages_available_event_.Signal(); }
|
|
|
| - delete task;
|
| + int SendStringMessage(const PortRef& port, const std::string& s) {
|
| + size_t size = s.size() + 1;
|
| + ScopedMessage message = TestMessage::NewUserMessage(size, 0);
|
| + memcpy(message->mutable_payload_bytes(), s.data(), size);
|
| + return node_.SendMessage(port, std::move(message));
|
| }
|
| -}
|
|
|
| -void DiscardPendingTasks() {
|
| - while (!task_queue.empty()) {
|
| - Task* task = task_queue.top();
|
| - task_queue.pop();
|
| - delete task;
|
| + int SendStringMessageWithPort(const PortRef& port,
|
| + const std::string& s,
|
| + const PortName& sent_port_name) {
|
| + size_t size = s.size() + 1;
|
| + ScopedMessage message = TestMessage::NewUserMessage(size, 1);
|
| + memcpy(message->mutable_payload_bytes(), s.data(), size);
|
| + message->mutable_ports()[0] = sent_port_name;
|
| + return node_.SendMessage(port, std::move(message));
|
| }
|
| -}
|
| -
|
| -int SendStringMessage(Node* node, const PortRef& port, const std::string& s) {
|
| - size_t size = s.size() + 1;
|
| - ScopedMessage message = TestMessage::NewUserMessage(size, 0);
|
| - memcpy(message->mutable_payload_bytes(), s.data(), size);
|
| - return node->SendMessage(port, std::move(message));
|
| -}
|
| -
|
| -int SendStringMessageWithPort(Node* node,
|
| - const PortRef& port,
|
| - const std::string& s,
|
| - const PortName& sent_port_name) {
|
| - size_t size = s.size() + 1;
|
| - ScopedMessage message = TestMessage::NewUserMessage(size, 1);
|
| - memcpy(message->mutable_payload_bytes(), s.data(), size);
|
| - message->mutable_ports()[0] = sent_port_name;
|
| - return node->SendMessage(port, std::move(message));
|
| -}
|
|
|
| -int SendStringMessageWithPort(Node* node,
|
| - const PortRef& port,
|
| - const std::string& s,
|
| - const PortRef& sent_port) {
|
| - return SendStringMessageWithPort(node, port, s, sent_port.name());
|
| -}
|
| + int SendStringMessageWithPort(const PortRef& port,
|
| + const std::string& s,
|
| + const PortRef& sent_port) {
|
| + return SendStringMessageWithPort(port, s, sent_port.name());
|
| + }
|
|
|
| -const char* ToString(const ScopedMessage& message) {
|
| - return static_cast<const char*>(message->payload_bytes());
|
| -}
|
| + void set_drop_messages(bool value) {
|
| + base::AutoLock lock(lock_);
|
| + drop_messages_ = value;
|
| + }
|
|
|
| -class TestNodeDelegate : public NodeDelegate {
|
| - public:
|
| - explicit TestNodeDelegate(const NodeName& node_name)
|
| - : node_name_(node_name),
|
| - drop_messages_(false),
|
| - read_messages_(true),
|
| - save_messages_(false) {
|
| + void set_save_messages(bool value) {
|
| + base::AutoLock lock(lock_);
|
| + save_messages_ = value;
|
| }
|
|
|
| - void set_drop_messages(bool value) { drop_messages_ = value; }
|
| - void set_read_messages(bool value) { read_messages_ = value; }
|
| - void set_save_messages(bool value) { save_messages_ = value; }
|
| + bool ReadMessage(const PortRef& port, ScopedMessage* message) {
|
| + return node_.GetMessage(port, message) == OK && *message;
|
| + }
|
|
|
| bool GetSavedMessage(ScopedMessage* message) {
|
| + base::AutoLock lock(lock_);
|
| if (saved_messages_.empty()) {
|
| message->reset();
|
| return false;
|
| }
|
| - *message = std::move(saved_messages_.front());
|
| + std::swap(*message, saved_messages_.front());
|
| saved_messages_.pop();
|
| return true;
|
| }
|
|
|
| + void EnqueueMessage(ScopedMessage message) {
|
| + idle_event_.Reset();
|
| +
|
| + // NOTE: This may be called from ForwardMessage and thus must not reenter
|
| + // |node_|.
|
| + base::AutoLock lock(lock_);
|
| + incoming_messages_.emplace(std::move(message));
|
| + messages_available_event_.Signal();
|
| + }
|
| +
|
| void GenerateRandomPortName(PortName* port_name) override {
|
| - static uint64_t next_port_name = 1;
|
| - port_name->v1 = next_port_name++;
|
| - port_name->v2 = 0;
|
| + DCHECK(router_);
|
| + router_->GeneratePortName(port_name);
|
| }
|
|
|
| void AllocMessage(size_t num_header_bytes, ScopedMessage* message) override {
|
| @@ -205,489 +207,568 @@ class TestNodeDelegate : public NodeDelegate {
|
|
|
| void ForwardMessage(const NodeName& node_name,
|
| ScopedMessage message) override {
|
| - if (drop_messages_) {
|
| - DVLOG(1) << "Dropping ForwardMessage from node "
|
| - << node_name_ << " to " << node_name;
|
| - ClosePortsInMessage(GetNode(node_name), message.get());
|
| - return;
|
| + {
|
| + base::AutoLock lock(lock_);
|
| + if (drop_messages_) {
|
| + DVLOG(1) << "Dropping ForwardMessage from node "
|
| + << node_name_ << " to " << node_name;
|
| +
|
| + base::AutoUnlock unlock(lock_);
|
| + ClosePortsInMessage(message.get());
|
| + return;
|
| + }
|
| }
|
| +
|
| + DCHECK(router_);
|
| DVLOG(1) << "ForwardMessage from node "
|
| << node_name_ << " to " << node_name;
|
| - task_queue.push(new Task(node_name, std::move(message)));
|
| + router_->ForwardMessage(this, node_name, std::move(message));
|
| }
|
|
|
| void BroadcastMessage(ScopedMessage message) override {
|
| - for (size_t i = 0; i < kMaxNodes; ++i) {
|
| - Node* node = node_map[i];
|
| - // Broadcast doesn't deliver to the local node.
|
| - if (node && node != GetNode(node_name_)) {
|
| - // NOTE: We only need to support broadcast of events, which have no
|
| - // payload or ports bytes.
|
| - ScopedMessage new_message(
|
| - new TestMessage(message->num_header_bytes(), 0, 0));
|
| - memcpy(new_message->mutable_header_bytes(), message->header_bytes(),
|
| - message->num_header_bytes());
|
| - node->AcceptMessage(std::move(new_message));
|
| - }
|
| - }
|
| + router_->BroadcastMessage(this, std::move(message));
|
| }
|
|
|
| void PortStatusChanged(const PortRef& port) override {
|
| - DVLOG(1) << "PortStatusChanged for " << port.name() << "@" << node_name_;
|
| - if (!read_messages_)
|
| + // The port may be closed, in which case we ignore the notification.
|
| + base::AutoLock lock(lock_);
|
| + if (!save_messages_)
|
| return;
|
| - Node* node = GetNode(node_name_);
|
| +
|
| for (;;) {
|
| ScopedMessage message;
|
| - int rv = node->GetMessage(port, &message);
|
| - EXPECT_TRUE(rv == OK || rv == ERROR_PORT_PEER_CLOSED);
|
| - if (rv == ERROR_PORT_PEER_CLOSED || !message)
|
| - break;
|
| - if (save_messages_) {
|
| - SaveMessage(std::move(message));
|
| - } else {
|
| - LogMessage(message.get());
|
| - for (size_t i = 0; i < message->num_ports(); ++i) {
|
| - std::stringstream buf;
|
| - buf << "got port: " << message->ports()[i];
|
| -
|
| - PortRef received_port;
|
| - node->GetPort(message->ports()[i], &received_port);
|
| + {
|
| + base::AutoUnlock unlock(lock_);
|
| + if (!ReadMessage(port, &message))
|
| + break;
|
| + }
|
|
|
| - SendStringMessage(node, received_port, buf.str());
|
| + saved_messages_.emplace(std::move(message));
|
| + }
|
| + }
|
|
|
| - // Avoid leaking these ports.
|
| - node->ClosePort(received_port);
|
| - }
|
| - }
|
| + void ClosePortsInMessage(Message* message) {
|
| + for (size_t i = 0; i < message->num_ports(); ++i) {
|
| + PortRef port;
|
| + ASSERT_EQ(OK, node_.GetPort(message->ports()[i], &port));
|
| + EXPECT_EQ(OK, node_.ClosePort(port));
|
| }
|
| }
|
|
|
| private:
|
| - void SaveMessage(ScopedMessage message) {
|
| - saved_messages_.emplace(std::move(message));
|
| + void ProcessMessages() {
|
| + for (;;) {
|
| + messages_available_event_.Wait();
|
| +
|
| + base::AutoLock lock(lock_);
|
| +
|
| + if (should_quit_)
|
| + return;
|
| +
|
| + dispatching_ = true;
|
| + while (!incoming_messages_.empty()) {
|
| + if (block_on_event_ &&
|
| + GetEventHeader(*incoming_messages_.front())->type ==
|
| + blocked_event_type_) {
|
| + blocked_ = true;
|
| + // Go idle if we hit a blocked event type.
|
| + break;
|
| + } else {
|
| + blocked_ = false;
|
| + }
|
| + ScopedMessage message = std::move(incoming_messages_.front());
|
| + incoming_messages_.pop();
|
| +
|
| + // NOTE: AcceptMessage() can re-enter this object to call any of the
|
| + // NodeDelegate interface methods.
|
| + base::AutoUnlock unlock(lock_);
|
| + node_.AcceptMessage(std::move(message));
|
| + }
|
| +
|
| + dispatching_ = false;
|
| + started_ = true;
|
| + idle_event_.Signal();
|
| + };
|
| }
|
|
|
| + const NodeName node_name_;
|
| + Node node_;
|
| + MessageRouter* router_ = nullptr;
|
| +
|
| + base::Thread node_thread_;
|
| + base::WaitableEvent messages_available_event_;
|
| + base::WaitableEvent idle_event_;
|
| +
|
| + // Guards fields below.
|
| + base::Lock lock_;
|
| + bool started_ = false;
|
| + bool dispatching_ = false;
|
| + bool should_quit_ = false;
|
| + bool drop_messages_ = false;
|
| + bool save_messages_ = false;
|
| + bool blocked_ = false;
|
| + bool block_on_event_ = false;
|
| + EventType blocked_event_type_;
|
| + std::queue<ScopedMessage> incoming_messages_;
|
| std::queue<ScopedMessage> saved_messages_;
|
| - NodeName node_name_;
|
| - bool drop_messages_;
|
| - bool read_messages_;
|
| - bool save_messages_;
|
| };
|
|
|
| -class PortsTest : public testing::Test {
|
| +class PortsTest : public testing::Test, public MessageRouter {
|
| public:
|
| - void SetUp() override {
|
| - DiscardPendingTasks();
|
| - SetNode(NodeName(0, 1), nullptr);
|
| - SetNode(NodeName(1, 1), nullptr);
|
| - SetNode(NodeName(2, 1), nullptr);
|
| + void AddNode(TestNode* node) {
|
| + {
|
| + base::AutoLock lock(lock_);
|
| + nodes_[node->name()] = node;
|
| + }
|
| + node->Start(this);
|
| + }
|
| +
|
| + void RemoveNode(TestNode* node) {
|
| + {
|
| + base::AutoLock lock(lock_);
|
| + nodes_.erase(node->name());
|
| + }
|
| +
|
| + for (const auto& entry : nodes_)
|
| + entry.second->node().LostConnectionToNode(node->name());
|
| + }
|
| +
|
| + // Waits until all known Nodes are idle. Message forwarding and processing
|
| + // is handled in such a way that idleness is a stable state: once all nodes in
|
| + // the system are idle, they will remain idle until the test explicitly
|
| + // initiates some further event (e.g. sending a message, closing a port, or
|
| + // removing a Node).
|
| + void WaitForIdle() {
|
| + for (;;) {
|
| + base::AutoLock global_lock(global_lock_);
|
| + bool all_nodes_idle = true;
|
| + for (const auto& entry : nodes_) {
|
| + if (!entry.second->IsIdle())
|
| + all_nodes_idle = false;
|
| + entry.second->WakeUp();
|
| + }
|
| + if (all_nodes_idle)
|
| + return;
|
| +
|
| + // Wait for any Node to signal that it's idle.
|
| + base::AutoUnlock global_unlock(global_lock_);
|
| + std::vector<base::WaitableEvent*> events;
|
| + for (const auto& entry : nodes_)
|
| + events.push_back(&entry.second->idle_event());
|
| + base::WaitableEvent::WaitMany(events.data(), events.size());
|
| + }
|
| }
|
| +
|
| + void CreatePortPair(TestNode* node0,
|
| + PortRef* port0,
|
| + TestNode* node1,
|
| + PortRef* port1) {
|
| + if (node0 == node1) {
|
| + EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1));
|
| + } else {
|
| + EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0));
|
| + EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1));
|
| + EXPECT_EQ(OK, node0->node().InitializePort(*port0, node1->name(),
|
| + port1->name()));
|
| + EXPECT_EQ(OK, node1->node().InitializePort(*port1, node0->name(),
|
| + port0->name()));
|
| + }
|
| + }
|
| +
|
| + private:
|
| + // MessageRouter:
|
| + void GeneratePortName(PortName* name) override {
|
| + base::AutoLock lock(lock_);
|
| + name->v1 = next_port_id_++;
|
| + name->v2 = 0;
|
| + }
|
| +
|
| + void ForwardMessage(TestNode* from_node,
|
| + const NodeName& node_name,
|
| + ScopedMessage message) override {
|
| + base::AutoLock global_lock(global_lock_);
|
| + base::AutoLock lock(lock_);
|
| + // Drop messages from nodes that have been removed.
|
| + if (nodes_.find(from_node->name()) == nodes_.end()) {
|
| + from_node->ClosePortsInMessage(message.get());
|
| + return;
|
| + }
|
| +
|
| + auto it = nodes_.find(node_name);
|
| + if (it == nodes_.end()) {
|
| + DVLOG(1) << "Node not found: " << node_name;
|
| + return;
|
| + }
|
| +
|
| + it->second->EnqueueMessage(std::move(message));
|
| + }
|
| +
|
| + void BroadcastMessage(TestNode* from_node, ScopedMessage message) override {
|
| + base::AutoLock global_lock(global_lock_);
|
| + base::AutoLock lock(lock_);
|
| +
|
| + // Drop messages from nodes that have been removed.
|
| + if (nodes_.find(from_node->name()) == nodes_.end())
|
| + return;
|
| +
|
| + for (const auto& entry : nodes_) {
|
| + TestNode* node = entry.second;
|
| + // Broadcast doesn't deliver to the local node.
|
| + if (node == from_node)
|
| + continue;
|
| +
|
| + // NOTE: We only need to support broadcast of events. Events have no
|
| + // payload or ports bytes.
|
| + ScopedMessage new_message(
|
| + new TestMessage(message->num_header_bytes(), 0, 0));
|
| + memcpy(new_message->mutable_header_bytes(), message->header_bytes(),
|
| + message->num_header_bytes());
|
| + node->EnqueueMessage(std::move(new_message));
|
| + }
|
| + }
|
| +
|
| + base::MessageLoop message_loop_;
|
| +
|
| + // Acquired before any operation which makes a Node busy, and before testing
|
| + // if all nodes are idle.
|
| + base::Lock global_lock_;
|
| +
|
| + base::Lock lock_;
|
| + uint64_t next_port_id_ = 1;
|
| + std::map<NodeName, TestNode*> nodes_;
|
| };
|
|
|
| } // namespace
|
|
|
| TEST_F(PortsTest, Basic1) {
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - SetNode(node0_name, &node0);
|
| + TestNode node0(0);
|
| + AddNode(&node0);
|
|
|
| - NodeName node1_name(1, 1);
|
| - TestNodeDelegate node1_delegate(node1_name);
|
| - Node node1(node1_name, &node1_delegate);
|
| - SetNode(node1_name, &node1);
|
| + TestNode node1(1);
|
| + AddNode(&node1);
|
|
|
| - // Setup pipe between node0 and node1.
|
| PortRef x0, x1;
|
| - EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
|
| - EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
|
| - EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
|
| - EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
|
| + CreatePortPair(&node0, &x0, &node1, &x1);
|
|
|
| - // Transfer a port from node0 to node1.
|
| PortRef a0, a1;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1));
|
| -
|
| - EXPECT_EQ(OK, node0.ClosePort(a0));
|
| + EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
|
| + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(a0));
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(x0));
|
| - EXPECT_EQ(OK, node1.ClosePort(x1));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(x0));
|
| + EXPECT_EQ(OK, node1.node().ClosePort(x1));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| - EXPECT_TRUE(node1.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node0.node().CanShutdownCleanly());
|
| + EXPECT_TRUE(node1.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, Basic2) {
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - SetNode(node0_name, &node0);
|
| + TestNode node0(0);
|
| + AddNode(&node0);
|
|
|
| - NodeName node1_name(1, 1);
|
| - TestNodeDelegate node1_delegate(node1_name);
|
| - Node node1(node1_name, &node1_delegate);
|
| - SetNode(node1_name, &node1);
|
| + TestNode node1(1);
|
| + AddNode(&node1);
|
|
|
| - // Setup pipe between node0 and node1.
|
| PortRef x0, x1;
|
| - EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
|
| - EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
|
| - EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
|
| - EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
|
| + CreatePortPair(&node0, &x0, &node1, &x1);
|
|
|
| PortRef b0, b1;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1));
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", b1));
|
| - EXPECT_EQ(OK, SendStringMessage(&node0, b0, "hello again"));
|
| + EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
|
| + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1));
|
| + EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again"));
|
|
|
| - // This may cause a SendMessage(b1) failure.
|
| - EXPECT_EQ(OK, node0.ClosePort(b0));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(b0));
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(x0));
|
| - EXPECT_EQ(OK, node1.ClosePort(x1));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(x0));
|
| + EXPECT_EQ(OK, node1.node().ClosePort(x1));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| - EXPECT_TRUE(node1.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node0.node().CanShutdownCleanly());
|
| + EXPECT_TRUE(node1.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, Basic3) {
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - SetNode(node0_name, &node0);
|
| + TestNode node0(0);
|
| + AddNode(&node0);
|
|
|
| - NodeName node1_name(1, 1);
|
| - TestNodeDelegate node1_delegate(node1_name);
|
| - Node node1(node1_name, &node1_delegate);
|
| - SetNode(node1_name, &node1);
|
| + TestNode node1(1);
|
| + AddNode(&node1);
|
|
|
| - // Setup pipe between node0 and node1.
|
| PortRef x0, x1;
|
| - EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
|
| - EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
|
| - EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
|
| - EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
|
| + CreatePortPair(&node0, &x0, &node1, &x1);
|
|
|
| - // Transfer a port from node0 to node1.
|
| PortRef a0, a1;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1));
|
| - EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello again"));
|
| + EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
|
|
|
| - // Transfer a0 as well.
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a0));
|
| + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
|
| + EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again"));
|
| +
|
| + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0));
|
|
|
| PortRef b0, b1;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1));
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "bar", b1));
|
| - EXPECT_EQ(OK, SendStringMessage(&node0, b0, "baz"));
|
| + EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
|
| + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1));
|
| + EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz"));
|
|
|
| - // This may cause a SendMessage(b1) failure.
|
| - EXPECT_EQ(OK, node0.ClosePort(b0));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(b0));
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(x0));
|
| - EXPECT_EQ(OK, node1.ClosePort(x1));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(x0));
|
| + EXPECT_EQ(OK, node1.node().ClosePort(x1));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| - EXPECT_TRUE(node1.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node0.node().CanShutdownCleanly());
|
| + EXPECT_TRUE(node1.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, LostConnectionToNode1) {
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - SetNode(node0_name, &node0);
|
| + TestNode node0(0);
|
| + AddNode(&node0);
|
|
|
| - NodeName node1_name(1, 1);
|
| - TestNodeDelegate node1_delegate(node1_name);
|
| - Node node1(node1_name, &node1_delegate);
|
| - SetNode(node1_name, &node1);
|
| + TestNode node1(1);
|
| + AddNode(&node1);
|
| + node1.set_drop_messages(true);
|
|
|
| - // Setup pipe between node0 and node1.
|
| PortRef x0, x1;
|
| - EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
|
| - EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
|
| - EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
|
| - EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
|
| -
|
| - // Transfer port to node1 and simulate a lost connection to node1. Dropping
|
| - // events from node1 is how we simulate the lost connection.
|
| + CreatePortPair(&node0, &x0, &node1, &x1);
|
|
|
| - node1_delegate.set_drop_messages(true);
|
| + // Transfer a port to node1 and simulate a lost connection to node1.
|
|
|
| PortRef a0, a1;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a1));
|
| + EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
|
| + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| - EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));
|
| + RemoveNode(&node1);
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(a0));
|
| - EXPECT_EQ(OK, node0.ClosePort(x0));
|
| - EXPECT_EQ(OK, node1.ClosePort(x1));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(a0));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(x0));
|
| + EXPECT_EQ(OK, node1.node().ClosePort(x1));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| - EXPECT_TRUE(node1.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node0.node().CanShutdownCleanly());
|
| + EXPECT_TRUE(node1.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, LostConnectionToNode2) {
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| + TestNode node0(0);
|
| + AddNode(&node0);
|
|
|
| - NodeName node1_name(1, 1);
|
| - TestNodeDelegate node1_delegate(node1_name);
|
| - Node node1(node1_name, &node1_delegate);
|
| - node_map[1] = &node1;
|
| + TestNode node1(1);
|
| + AddNode(&node1);
|
|
|
| - // Setup pipe between node0 and node1.
|
| PortRef x0, x1;
|
| - EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
|
| - EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
|
| - EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
|
| - EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
|
| -
|
| - node1_delegate.set_read_messages(false);
|
| + CreatePortPair(&node0, &x0, &node1, &x1);
|
|
|
| PortRef a0, a1;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "take a1", a1));
|
| + EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
|
| + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| - node1_delegate.set_drop_messages(true);
|
| + node1.set_drop_messages(true);
|
|
|
| - EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));
|
| + RemoveNode(&node1);
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| + // a0 should have eventually detected peer closure after node loss.
|
| ScopedMessage message;
|
| - EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message));
|
| + EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.node().GetMessage(a0, &message));
|
| EXPECT_FALSE(message);
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(a0));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(a0));
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(x0));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(x0));
|
|
|
| - EXPECT_EQ(OK, node1.GetMessage(x1, &message));
|
| + EXPECT_EQ(OK, node1.node().GetMessage(x1, &message));
|
| EXPECT_TRUE(message);
|
| - ClosePortsInMessage(&node1, message.get());
|
| + node1.ClosePortsInMessage(message.get());
|
|
|
| - EXPECT_EQ(OK, node1.ClosePort(x1));
|
| + EXPECT_EQ(OK, node1.node().ClosePort(x1));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| - EXPECT_TRUE(node1.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node0.node().CanShutdownCleanly());
|
| + EXPECT_TRUE(node1.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
|
| // Tests that a proxy gets cleaned up when its indirect peer lives on a lost
|
| // node.
|
|
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| + TestNode node0(0);
|
| + AddNode(&node0);
|
|
|
| - NodeName node1_name(1, 1);
|
| - TestNodeDelegate node1_delegate(node1_name);
|
| - Node node1(node1_name, &node1_delegate);
|
| - node_map[1] = &node1;
|
| + TestNode node1(1);
|
| + AddNode(&node1);
|
|
|
| - NodeName node2_name(2, 1);
|
| - TestNodeDelegate node2_delegate(node2_name);
|
| - Node node2(node2_name, &node2_delegate);
|
| - node_map[2] = &node2;
|
| -
|
| - node1_delegate.set_save_messages(true);
|
| + TestNode node2(2);
|
| + AddNode(&node2);
|
|
|
| // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
|
| PortRef A, B, C, D;
|
| - EXPECT_EQ(OK, node0.CreateUninitializedPort(&A));
|
| - EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
|
| - EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
|
| - EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));
|
| - EXPECT_EQ(OK, node1.CreateUninitializedPort(&C));
|
| - EXPECT_EQ(OK, node2.CreateUninitializedPort(&D));
|
| - EXPECT_EQ(OK, node1.InitializePort(C, node2_name, D.name()));
|
| - EXPECT_EQ(OK, node2.InitializePort(D, node1_name, C.name()));
|
| + CreatePortPair(&node0, &A, &node1, &B);
|
| + CreatePortPair(&node1, &C, &node2, &D);
|
|
|
| // Create E-F and send F over A to node 1.
|
| PortRef E, F;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&E, &F));
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, ".", F));
|
| + EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
|
| + EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| ScopedMessage message;
|
| - ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
|
| + ASSERT_TRUE(node1.ReadMessage(B, &message));
|
| ASSERT_EQ(1u, message->num_ports());
|
|
|
| - EXPECT_EQ(OK, node1.GetPort(message->ports()[0], &F));
|
| + EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F));
|
|
|
| // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1
|
| // will trivially become aware of the loss, and this test verifies that the
|
| // port A on node 0 will eventually also become aware of it.
|
|
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node1, C, ".", F));
|
| + // Make sure node2 stops processing events when it encounters an ObserveProxy.
|
| + node2.BlockOnEvent(EventType::kObserveProxy);
|
| +
|
| + EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F));
|
| + WaitForIdle();
|
|
|
| - node_map[2] = nullptr;
|
| - EXPECT_EQ(OK, node1.LostConnectionToNode(node2_name));
|
| + // Simulate node 1 and 2 disconnecting.
|
| + EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));
|
|
|
| - PumpTasks();
|
| + // Let node2 continue processing events and wait for everyone to go idle.
|
| + node2.Unblock();
|
| + WaitForIdle();
|
|
|
| // Port F should be gone.
|
| - EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(F.name(), &F));
|
| + EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));
|
|
|
| // Port E should have detected peer closure despite the fact that there is
|
| // no longer a continuous route from F to E over which the event could travel.
|
| PortStatus status;
|
| - EXPECT_EQ(OK, node0.GetStatus(E, &status));
|
| + EXPECT_EQ(OK, node0.node().GetStatus(E, &status));
|
| EXPECT_TRUE(status.peer_closed);
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(A));
|
| - EXPECT_EQ(OK, node1.ClosePort(B));
|
| - EXPECT_EQ(OK, node1.ClosePort(C));
|
| - EXPECT_EQ(OK, node0.ClosePort(E));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(A));
|
| + EXPECT_EQ(OK, node1.node().ClosePort(B));
|
| + EXPECT_EQ(OK, node1.node().ClosePort(C));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(E));
|
| +
|
| + WaitForIdle();
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| - EXPECT_TRUE(node1.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node0.node().CanShutdownCleanly());
|
| + EXPECT_TRUE(node1.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
|
| // Tests that a proxy gets cleaned up when its direct peer lives on a lost
|
| // node and it's predecessor lives on the same node.
|
|
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| + TestNode node0(0);
|
| + AddNode(&node0);
|
|
|
| - NodeName node1_name(1, 1);
|
| - TestNodeDelegate node1_delegate(node1_name);
|
| - Node node1(node1_name, &node1_delegate);
|
| - node_map[1] = &node1;
|
| + TestNode node1(1);
|
| + AddNode(&node1);
|
|
|
| - node1_delegate.set_save_messages(true);
|
| -
|
| - // Create A-B spanning nodes 0 and 1.
|
| PortRef A, B;
|
| - EXPECT_EQ(OK, node0.CreateUninitializedPort(&A));
|
| - EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
|
| - EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
|
| - EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));
|
| + CreatePortPair(&node0, &A, &node1, &B);
|
|
|
| - // Create C-D and send D over A to node 1.
|
| PortRef C, D;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, ".", D));
|
| + EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
|
| +
|
| + // Send D but block node0 on an ObserveProxy event.
|
| + node0.BlockOnEvent(EventType::kObserveProxy);
|
| + EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D));
|
|
|
| - // Pump tasks until the start of port collapse for port D, which should become
|
| - // a proxy.
|
| - PumpUntilTask(EventType::kObserveProxy);
|
| + // node0 won't collapse the proxy but node1 will receive the message before
|
| + // going idle.
|
| + WaitForIdle();
|
|
|
| ScopedMessage message;
|
| - ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
|
| + ASSERT_TRUE(node1.ReadMessage(B, &message));
|
| ASSERT_EQ(1u, message->num_ports());
|
| -
|
| PortRef E;
|
| - EXPECT_EQ(OK, node1.GetPort(message->ports()[0], &E));
|
| + EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));
|
|
|
| - EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));
|
| - PumpTasks();
|
| + RemoveNode(&node1);
|
| +
|
| + node0.Unblock();
|
| + WaitForIdle();
|
|
|
| // Port C should have detected peer closure.
|
| PortStatus status;
|
| - EXPECT_EQ(OK, node0.GetStatus(C, &status));
|
| + EXPECT_EQ(OK, node0.node().GetStatus(C, &status));
|
| EXPECT_TRUE(status.peer_closed);
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(A));
|
| - EXPECT_EQ(OK, node1.ClosePort(B));
|
| - EXPECT_EQ(OK, node0.ClosePort(C));
|
| - EXPECT_EQ(OK, node1.ClosePort(E));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(A));
|
| + EXPECT_EQ(OK, node1.node().ClosePort(B));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(C));
|
| + EXPECT_EQ(OK, node1.node().ClosePort(E));
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| - EXPECT_TRUE(node1.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node0.node().CanShutdownCleanly());
|
| + EXPECT_TRUE(node1.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, GetMessage1) {
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| + TestNode node(0);
|
| + AddNode(&node);
|
|
|
| PortRef a0, a1;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
|
| + EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
|
|
|
| ScopedMessage message;
|
| - EXPECT_EQ(OK, node0.GetMessage(a0, &message));
|
| + EXPECT_EQ(OK, node.node().GetMessage(a0, &message));
|
| EXPECT_FALSE(message);
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(a1));
|
| -
|
| - EXPECT_EQ(OK, node0.GetMessage(a0, &message));
|
| - EXPECT_FALSE(message);
|
| + EXPECT_EQ(OK, node.node().ClosePort(a1));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| - EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message));
|
| + EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node.node().GetMessage(a0, &message));
|
| EXPECT_FALSE(message);
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(a0));
|
| + EXPECT_EQ(OK, node.node().ClosePort(a0));
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| + WaitForIdle();
|
| +
|
| + EXPECT_TRUE(node.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, GetMessage2) {
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| -
|
| - node0_delegate.set_read_messages(false);
|
| + TestNode node(0);
|
| + AddNode(&node);
|
|
|
| PortRef a0, a1;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
|
| + EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
|
|
|
| - EXPECT_EQ(OK, SendStringMessage(&node0, a1, "1"));
|
| + EXPECT_EQ(OK, node.SendStringMessage(a1, "1"));
|
|
|
| ScopedMessage message;
|
| - EXPECT_EQ(OK, node0.GetMessage(a0, &message));
|
| + EXPECT_EQ(OK, node.node().GetMessage(a0, &message));
|
|
|
| ASSERT_TRUE(message);
|
| - EXPECT_EQ(0, strcmp("1", ToString(message)));
|
| + EXPECT_TRUE(MessageEquals(message, "1"));
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(a0));
|
| - EXPECT_EQ(OK, node0.ClosePort(a1));
|
| + EXPECT_EQ(OK, node.node().ClosePort(a0));
|
| + EXPECT_EQ(OK, node.node().ClosePort(a1));
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, GetMessage3) {
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| -
|
| - node0_delegate.set_read_messages(false);
|
| + TestNode node(0);
|
| + AddNode(&node);
|
|
|
| PortRef a0, a1;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
|
| + EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
|
|
|
| const char* kStrings[] = {
|
| "1",
|
| @@ -696,371 +777,305 @@ TEST_F(PortsTest, GetMessage3) {
|
| };
|
|
|
| for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i)
|
| - EXPECT_EQ(OK, SendStringMessage(&node0, a1, kStrings[i]));
|
| + EXPECT_EQ(OK, node.SendStringMessage(a1, kStrings[i]));
|
|
|
| ScopedMessage message;
|
| for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i) {
|
| - EXPECT_EQ(OK, node0.GetMessage(a0, &message));
|
| + EXPECT_EQ(OK, node.node().GetMessage(a0, &message));
|
| ASSERT_TRUE(message);
|
| - EXPECT_EQ(0, strcmp(kStrings[i], ToString(message)));
|
| - DVLOG(1) << "got " << kStrings[i];
|
| + EXPECT_TRUE(MessageEquals(message, kStrings[i]));
|
| }
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(a0));
|
| - EXPECT_EQ(OK, node0.ClosePort(a1));
|
| + EXPECT_EQ(OK, node.node().ClosePort(a0));
|
| + EXPECT_EQ(OK, node.node().ClosePort(a1));
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, Delegation1) {
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - SetNode(node0_name, &node0);
|
| + TestNode node0(0);
|
| + AddNode(&node0);
|
|
|
| - NodeName node1_name(1, 1);
|
| - TestNodeDelegate node1_delegate(node1_name);
|
| - Node node1(node1_name, &node1_delegate);
|
| - node_map[1] = &node1;
|
| + TestNode node1(1);
|
| + AddNode(&node1);
|
|
|
| - node0_delegate.set_save_messages(true);
|
| - node1_delegate.set_save_messages(true);
|
| -
|
| - // Setup pipe between node0 and node1.
|
| PortRef x0, x1;
|
| - EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
|
| - EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
|
| - EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
|
| - EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
|
| + CreatePortPair(&node0, &x0, &node1, &x1);
|
|
|
| // In this test, we send a message to a port that has been moved.
|
|
|
| PortRef a0, a1;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
|
| -
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "a1", a1));
|
| -
|
| - PumpTasks();
|
| + EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
|
| + EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1));
|
| + WaitForIdle();
|
|
|
| ScopedMessage message;
|
| - ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
|
| -
|
| + ASSERT_TRUE(node1.ReadMessage(x1, &message));
|
| ASSERT_EQ(1u, message->num_ports());
|
| + EXPECT_TRUE(MessageEquals(message, "a1"));
|
|
|
| // This is "a1" from the point of view of node1.
|
| PortName a2_name = message->ports()[0];
|
| + EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name));
|
| + EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello"));
|
|
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node1, x1, "a2", a2_name));
|
| -
|
| - PumpTasks();
|
| -
|
| - EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello"));
|
| -
|
| - PumpTasks();
|
| -
|
| - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
|
| + WaitForIdle();
|
|
|
| + ASSERT_TRUE(node0.ReadMessage(x0, &message));
|
| ASSERT_EQ(1u, message->num_ports());
|
| + EXPECT_TRUE(MessageEquals(message, "a2"));
|
|
|
| // This is "a2" from the point of view of node1.
|
| PortName a3_name = message->ports()[0];
|
|
|
| PortRef a3;
|
| - EXPECT_EQ(OK, node0.GetPort(a3_name, &a3));
|
| -
|
| - EXPECT_EQ(0, strcmp("a2", ToString(message)));
|
| -
|
| - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
|
| + EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3));
|
|
|
| + ASSERT_TRUE(node0.ReadMessage(a3, &message));
|
| EXPECT_EQ(0u, message->num_ports());
|
| - EXPECT_EQ(0, strcmp("hello", ToString(message)));
|
| + EXPECT_TRUE(MessageEquals(message, "hello"));
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(a0));
|
| - EXPECT_EQ(OK, node0.ClosePort(a3));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(a0));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(a3));
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(x0));
|
| - EXPECT_EQ(OK, node1.ClosePort(x1));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(x0));
|
| + EXPECT_EQ(OK, node1.node().ClosePort(x1));
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| - EXPECT_TRUE(node1.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node0.node().CanShutdownCleanly());
|
| + EXPECT_TRUE(node1.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, Delegation2) {
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - SetNode(node0_name, &node0);
|
| -
|
| - NodeName node1_name(1, 1);
|
| - TestNodeDelegate node1_delegate(node1_name);
|
| - Node node1(node1_name, &node1_delegate);
|
| - node_map[1] = &node1;
|
| + TestNode node0(0);
|
| + AddNode(&node0);
|
|
|
| - node0_delegate.set_save_messages(true);
|
| - node1_delegate.set_save_messages(true);
|
| + TestNode node1(1);
|
| + AddNode(&node1);
|
|
|
| - for (int i = 0; i < 10; ++i) {
|
| + for (int i = 0; i < 100; ++i) {
|
| // Setup pipe a<->b between node0 and node1.
|
| PortRef A, B;
|
| - EXPECT_EQ(OK, node0.CreateUninitializedPort(&A));
|
| - EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
|
| - EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
|
| - EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));
|
| + CreatePortPair(&node0, &A, &node1, &B);
|
|
|
| PortRef C, D;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));
|
| + EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
|
|
|
| PortRef E, F;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&E, &F));
|
| + EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
|
| +
|
| + node1.set_save_messages(true);
|
|
|
| // Pass D over A to B.
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "1", D));
|
| + EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D));
|
|
|
| // Pass F over C to D.
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, C, "1", F));
|
| + EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F));
|
|
|
| // This message should find its way to node1.
|
| - EXPECT_EQ(OK, SendStringMessage(&node0, E, "hello"));
|
| + EXPECT_EQ(OK, node0.SendStringMessage(E, "hello"));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(C));
|
| - EXPECT_EQ(OK, node0.ClosePort(E));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(C));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(E));
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(A));
|
| - EXPECT_EQ(OK, node1.ClosePort(B));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(A));
|
| + EXPECT_EQ(OK, node1.node().ClosePort(B));
|
|
|
| - for (;;) {
|
| - ScopedMessage message;
|
| - if (node1_delegate.GetSavedMessage(&message)) {
|
| - ClosePortsInMessage(&node1, message.get());
|
| - if (strcmp("hello", ToString(message)) == 0)
|
| - break;
|
| - } else {
|
| - ASSERT_TRUE(false); // "hello" message not delivered!
|
| + bool got_hello = false;
|
| + ScopedMessage message;
|
| + while (node1.GetSavedMessage(&message)) {
|
| + node1.ClosePortsInMessage(message.get());
|
| + if (MessageEquals(message, "hello")) {
|
| + got_hello = true;
|
| break;
|
| }
|
| }
|
|
|
| - PumpTasks(); // Because ClosePort may have generated tasks.
|
| + EXPECT_TRUE(got_hello);
|
| +
|
| + WaitForIdle(); // Because closing ports may have generated tasks.
|
| }
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| - EXPECT_TRUE(node1.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node0.node().CanShutdownCleanly());
|
| + EXPECT_TRUE(node1.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, SendUninitialized) {
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| + TestNode node(0);
|
| + AddNode(&node);
|
|
|
| PortRef x0;
|
| - EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
|
| - EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED,
|
| - SendStringMessage(&node0, x0, "oops"));
|
| - EXPECT_EQ(OK, node0.ClosePort(x0));
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| + EXPECT_EQ(OK, node.node().CreateUninitializedPort(&x0));
|
| + EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, node.SendStringMessage(x0, "oops"));
|
| + EXPECT_EQ(OK, node.node().ClosePort(x0));
|
| + EXPECT_TRUE(node.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, SendFailure) {
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| + TestNode node(0);
|
| + AddNode(&node);
|
|
|
| - node0_delegate.set_save_messages(true);
|
| + node.set_save_messages(true);
|
|
|
| PortRef A, B;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
|
| + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
|
|
|
| // Try to send A over itself.
|
|
|
| EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
|
| - SendStringMessageWithPort(&node0, A, "oops", A));
|
| + node.SendStringMessageWithPort(A, "oops", A));
|
|
|
| // Try to send B over A.
|
|
|
| EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
|
| - SendStringMessageWithPort(&node0, A, "nope", B));
|
| + node.SendStringMessageWithPort(A, "nope", B));
|
|
|
| // B should be closed immediately.
|
| - EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B));
|
| + EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| // There should have been no messages accepted.
|
| ScopedMessage message;
|
| - EXPECT_FALSE(node0_delegate.GetSavedMessage(&message));
|
| + EXPECT_FALSE(node.GetSavedMessage(&message));
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(A));
|
| + EXPECT_EQ(OK, node.node().ClosePort(A));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, DontLeakUnreceivedPorts) {
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| -
|
| - node0_delegate.set_read_messages(false);
|
| + TestNode node(0);
|
| + AddNode(&node);
|
|
|
| - PortRef A, B;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
|
| -
|
| - PortRef C, D;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));
|
| -
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D));
|
| -
|
| - PumpTasks();
|
| + PortRef A, B, C, D;
|
| + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
|
| + EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(C));
|
| + EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(A));
|
| - EXPECT_EQ(OK, node0.ClosePort(B));
|
| + EXPECT_EQ(OK, node.node().ClosePort(C));
|
| + EXPECT_EQ(OK, node.node().ClosePort(A));
|
| + EXPECT_EQ(OK, node.node().ClosePort(B));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| + TestNode node(0);
|
| + AddNode(&node);
|
|
|
| - node0_delegate.set_save_messages(true);
|
| -
|
| - PortRef A, B;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
|
| -
|
| - PortRef C, D;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));
|
| + PortRef A, B, C, D;
|
| + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
|
| + EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
|
|
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D));
|
| + EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
|
|
|
| ScopedMessage message;
|
| - EXPECT_TRUE(node0_delegate.GetSavedMessage(&message));
|
| + EXPECT_TRUE(node.ReadMessage(B, &message));
|
| ASSERT_EQ(1u, message->num_ports());
|
| -
|
| + EXPECT_TRUE(MessageEquals(message, "foo"));
|
| PortRef E;
|
| - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
|
| + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(true));
|
| + EXPECT_TRUE(
|
| + node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(true));
|
| - EXPECT_FALSE(node0.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(
|
| + node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
|
| + EXPECT_FALSE(node.node().CanShutdownCleanly());
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(A));
|
| - EXPECT_EQ(OK, node0.ClosePort(B));
|
| - EXPECT_EQ(OK, node0.ClosePort(C));
|
| - EXPECT_EQ(OK, node0.ClosePort(E));
|
| + EXPECT_EQ(OK, node.node().ClosePort(A));
|
| + EXPECT_EQ(OK, node.node().ClosePort(B));
|
| + EXPECT_EQ(OK, node.node().ClosePort(C));
|
| + EXPECT_EQ(OK, node.node().ClosePort(E));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, ProxyCollapse1) {
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| -
|
| - node0_delegate.set_save_messages(true);
|
| + TestNode node(0);
|
| + AddNode(&node);
|
|
|
| PortRef A, B;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
|
| + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
|
|
|
| PortRef X, Y;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
|
| + EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
|
|
|
| ScopedMessage message;
|
|
|
| // Send B and receive it as C.
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
|
| - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
|
| + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
|
| + ASSERT_TRUE(node.ReadMessage(Y, &message));
|
| ASSERT_EQ(1u, message->num_ports());
|
| PortRef C;
|
| - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));
|
| + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
|
|
|
| // Send C and receive it as D.
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C));
|
| - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
|
| + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
|
| + ASSERT_TRUE(node.ReadMessage(Y, &message));
|
| ASSERT_EQ(1u, message->num_ports());
|
| PortRef D;
|
| - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));
|
| + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
|
|
|
| // Send D and receive it as E.
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", D));
|
| - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
|
| + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D));
|
| + ASSERT_TRUE(node.ReadMessage(Y, &message));
|
| ASSERT_EQ(1u, message->num_ports());
|
| PortRef E;
|
| - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
|
| + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(X));
|
| - EXPECT_EQ(OK, node0.ClosePort(Y));
|
| + EXPECT_EQ(OK, node.node().ClosePort(X));
|
| + EXPECT_EQ(OK, node.node().ClosePort(Y));
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(A));
|
| - EXPECT_EQ(OK, node0.ClosePort(E));
|
| + EXPECT_EQ(OK, node.node().ClosePort(A));
|
| + EXPECT_EQ(OK, node.node().ClosePort(E));
|
|
|
| - PumpTasks();
|
| + // The node should not idle until all proxies are collapsed.
|
| + WaitForIdle();
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, ProxyCollapse2) {
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| -
|
| - node0_delegate.set_save_messages(true);
|
| + TestNode node(0);
|
| + AddNode(&node);
|
|
|
| PortRef A, B;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
|
| + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
|
|
|
| PortRef X, Y;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
|
| + EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
|
|
|
| ScopedMessage message;
|
|
|
| - // Send B and receive it as C.
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
|
| - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
|
| - ASSERT_EQ(1u, message->num_ports());
|
| - PortRef C;
|
| - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));
|
| + // Send B and A to create proxies in each direction.
|
| + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
|
| + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
|
|
|
| - // Send A and receive it as D.
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
|
| - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
|
| - ASSERT_EQ(1u, message->num_ports());
|
| - PortRef D;
|
| - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));
|
| + EXPECT_EQ(OK, node.node().ClosePort(X));
|
| + EXPECT_EQ(OK, node.node().ClosePort(Y));
|
|
|
| // At this point we have a scenario with:
|
| //
|
| // D -> [B] -> C -> [A]
|
| //
|
| - // Ensure that the proxies can collapse.
|
| -
|
| - EXPECT_EQ(OK, node0.ClosePort(X));
|
| - EXPECT_EQ(OK, node0.ClosePort(Y));
|
| + // Ensure that the proxies can collapse. The sent ports will be closed
|
| + // eventually as a result of Y's closure.
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(C));
|
| - EXPECT_EQ(OK, node0.ClosePort(D));
|
| + WaitForIdle();
|
|
|
| - PumpTasks();
|
| -
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, SendWithClosedPeer) {
|
| @@ -1068,56 +1083,47 @@ TEST_F(PortsTest, SendWithClosedPeer) {
|
| // closed, the newly created port will be aware of that peer closure, and the
|
| // proxy will eventually collapse.
|
|
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| -
|
| - node0_delegate.set_read_messages(false);
|
| + TestNode node(0);
|
| + AddNode(&node);
|
|
|
| // Send a message from A to B, then close A.
|
| PortRef A, B;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
|
| - EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));
|
| - EXPECT_EQ(OK, node0.ClosePort(A));
|
| -
|
| - PumpTasks();
|
| + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
|
| + EXPECT_EQ(OK, node.SendStringMessage(A, "hey"));
|
| + EXPECT_EQ(OK, node.node().ClosePort(A));
|
|
|
| // Now send B over X-Y as new port C.
|
| PortRef X, Y;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
|
| -
|
| - node0_delegate.set_read_messages(true);
|
| - node0_delegate.set_save_messages(true);
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
|
| -
|
| - EXPECT_EQ(OK, node0.ClosePort(X));
|
| - EXPECT_EQ(OK, node0.ClosePort(Y));
|
| -
|
| + EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
|
| + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
|
| ScopedMessage message;
|
| - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
|
| + ASSERT_TRUE(node.ReadMessage(Y, &message));
|
| ASSERT_EQ(1u, message->num_ports());
|
| -
|
| PortRef C;
|
| - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));
|
| + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
|
| +
|
| + EXPECT_EQ(OK, node.node().ClosePort(X));
|
| + EXPECT_EQ(OK, node.node().ClosePort(Y));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| - // C should receive the message originally sent to B, and it should also be
|
| - // aware of A's closure.
|
| + // C should have received the message originally sent to B, and it should also
|
| + // be aware of A's closure.
|
|
|
| - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
|
| - EXPECT_EQ(0, strcmp("hey", ToString(message)));
|
| + ASSERT_TRUE(node.ReadMessage(C, &message));
|
| + EXPECT_TRUE(MessageEquals(message, "hey"));
|
|
|
| PortStatus status;
|
| - EXPECT_EQ(OK, node0.GetStatus(C, &status));
|
| + EXPECT_EQ(OK, node.node().GetStatus(C, &status));
|
| EXPECT_FALSE(status.receiving_messages);
|
| EXPECT_FALSE(status.has_messages);
|
| EXPECT_TRUE(status.peer_closed);
|
|
|
| - node0.ClosePort(C);
|
| + node.node().ClosePort(C);
|
| +
|
| + WaitForIdle();
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, SendWithClosedPeerSent) {
|
| @@ -1126,391 +1132,342 @@ TEST_F(PortsTest, SendWithClosedPeerSent) {
|
| // eventually notified of the closure, and the dead-end proxies will
|
| // eventually be removed.
|
|
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| -
|
| - node0_delegate.set_save_messages(true);
|
| + TestNode node(0);
|
| + AddNode(&node);
|
|
|
| PortRef X, Y;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
|
| + EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
|
|
|
| PortRef A, B;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
|
| + EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
|
|
|
| ScopedMessage message;
|
|
|
| // Send A as new port C.
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
|
| - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
|
| + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
|
| +
|
| + ASSERT_TRUE(node.ReadMessage(Y, &message));
|
| ASSERT_EQ(1u, message->num_ports());
|
| PortRef C;
|
| - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));
|
| + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
|
|
|
| // Send C as new port D.
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C));
|
| - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
|
| + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
|
| +
|
| + ASSERT_TRUE(node.ReadMessage(Y, &message));
|
| ASSERT_EQ(1u, message->num_ports());
|
| PortRef D;
|
| - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));
|
| -
|
| - node0_delegate.set_read_messages(false);
|
| + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
|
|
|
| // Send a message to B through D, then close D.
|
| - EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey"));
|
| - EXPECT_EQ(OK, node0.ClosePort(D));
|
| -
|
| - PumpTasks();
|
| + EXPECT_EQ(OK, node.SendStringMessage(D, "hey"));
|
| + EXPECT_EQ(OK, node.node().ClosePort(D));
|
|
|
| // Now send B as new port E.
|
|
|
| - node0_delegate.set_read_messages(true);
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
|
| + EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
|
| + EXPECT_EQ(OK, node.node().ClosePort(X));
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(X));
|
| - EXPECT_EQ(OK, node0.ClosePort(Y));
|
| -
|
| - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
|
| + ASSERT_TRUE(node.ReadMessage(Y, &message));
|
| ASSERT_EQ(1u, message->num_ports());
|
| -
|
| PortRef E;
|
| - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
|
| + ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
|
| +
|
| + EXPECT_EQ(OK, node.node().ClosePort(Y));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| // E should receive the message originally sent to B, and it should also be
|
| // aware of D's closure.
|
|
|
| - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
|
| - EXPECT_EQ(0, strcmp("hey", ToString(message)));
|
| + ASSERT_TRUE(node.ReadMessage(E, &message));
|
| + EXPECT_TRUE(MessageEquals(message, "hey"));
|
|
|
| PortStatus status;
|
| - EXPECT_EQ(OK, node0.GetStatus(E, &status));
|
| + EXPECT_EQ(OK, node.node().GetStatus(E, &status));
|
| EXPECT_FALSE(status.receiving_messages);
|
| EXPECT_FALSE(status.has_messages);
|
| EXPECT_TRUE(status.peer_closed);
|
|
|
| - node0.ClosePort(E);
|
| + EXPECT_EQ(OK, node.node().ClosePort(E));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, MergePorts) {
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| + TestNode node0(0);
|
| + AddNode(&node0);
|
|
|
| - NodeName node1_name(1, 1);
|
| - TestNodeDelegate node1_delegate(node1_name);
|
| - Node node1(node1_name, &node1_delegate);
|
| - node_map[1] = &node1;
|
| + TestNode node1(1);
|
| + AddNode(&node1);
|
|
|
| // Setup two independent port pairs, A-B on node0 and C-D on node1.
|
| PortRef A, B, C, D;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
|
| - EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
|
| -
|
| - node0_delegate.set_read_messages(false);
|
| - node1_delegate.set_save_messages(true);
|
| + EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
|
| + EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
|
|
|
| // Write a message on A.
|
| - EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));
|
| -
|
| - PumpTasks();
|
| + EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
|
|
|
| // Initiate a merge between B and C.
|
| - EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
|
| + EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| - // Expect only two receiving ports to be left after pumping tasks.
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(true));
|
| - EXPECT_TRUE(node1.CanShutdownCleanly(true));
|
| + // Expect all proxies to be gone once idle.
|
| + EXPECT_TRUE(
|
| + node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
|
| + EXPECT_TRUE(
|
| + node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
|
|
|
| // Expect D to have received the message sent on A.
|
| ScopedMessage message;
|
| - ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
|
| - EXPECT_EQ(0, strcmp("hey", ToString(message)));
|
| + ASSERT_TRUE(node1.ReadMessage(D, &message));
|
| + EXPECT_TRUE(MessageEquals(message, "hey"));
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(A));
|
| - EXPECT_EQ(OK, node1.ClosePort(D));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(A));
|
| + EXPECT_EQ(OK, node1.node().ClosePort(D));
|
|
|
| // No more ports should be open.
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| - EXPECT_TRUE(node1.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node0.node().CanShutdownCleanly());
|
| + EXPECT_TRUE(node1.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, MergePortWithClosedPeer1) {
|
| // This tests that the right thing happens when initiating a merge on a port
|
| // whose peer has already been closed.
|
|
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| + TestNode node0(0);
|
| + AddNode(&node0);
|
|
|
| - NodeName node1_name(1, 1);
|
| - TestNodeDelegate node1_delegate(node1_name);
|
| - Node node1(node1_name, &node1_delegate);
|
| - node_map[1] = &node1;
|
| + TestNode node1(1);
|
| + AddNode(&node1);
|
|
|
| // Setup two independent port pairs, A-B on node0 and C-D on node1.
|
| PortRef A, B, C, D;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
|
| - EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
|
| -
|
| - node0_delegate.set_read_messages(false);
|
| - node1_delegate.set_save_messages(true);
|
| + EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
|
| + EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
|
|
|
| // Write a message on A.
|
| - EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));
|
| -
|
| - PumpTasks();
|
| + EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
|
|
|
| // Close A.
|
| - EXPECT_EQ(OK, node0.ClosePort(A));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(A));
|
|
|
| // Initiate a merge between B and C.
|
| - EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
|
| + EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| - // Expect only one receiving port to be left after pumping tasks.
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| - EXPECT_TRUE(node1.CanShutdownCleanly(true));
|
| + // Expect all proxies to be gone once idle. node0 should have no ports since
|
| + // A was explicitly closed.
|
| + EXPECT_TRUE(node0.node().CanShutdownCleanly());
|
| + EXPECT_TRUE(
|
| + node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
|
|
|
| // Expect D to have received the message sent on A.
|
| ScopedMessage message;
|
| - ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
|
| - EXPECT_EQ(0, strcmp("hey", ToString(message)));
|
| + ASSERT_TRUE(node1.ReadMessage(D, &message));
|
| + EXPECT_TRUE(MessageEquals(message, "hey"));
|
|
|
| - EXPECT_EQ(OK, node1.ClosePort(D));
|
| + EXPECT_EQ(OK, node1.node().ClosePort(D));
|
|
|
| // No more ports should be open.
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| - EXPECT_TRUE(node1.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node0.node().CanShutdownCleanly());
|
| + EXPECT_TRUE(node1.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, MergePortWithClosedPeer2) {
|
| // This tests that the right thing happens when merging into a port whose peer
|
| // has already been closed.
|
|
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| + TestNode node0(0);
|
| + AddNode(&node0);
|
|
|
| - NodeName node1_name(1, 1);
|
| - TestNodeDelegate node1_delegate(node1_name);
|
| - Node node1(node1_name, &node1_delegate);
|
| - node_map[1] = &node1;
|
| + TestNode node1(1);
|
| + AddNode(&node1);
|
|
|
| // Setup two independent port pairs, A-B on node0 and C-D on node1.
|
| PortRef A, B, C, D;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
|
| - EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
|
| -
|
| - node0_delegate.set_save_messages(true);
|
| - node1_delegate.set_read_messages(false);
|
| -
|
| - // Write a message on D.
|
| - EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey"));
|
| + EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
|
| + EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
|
|
|
| - PumpTasks();
|
| -
|
| - // Close D.
|
| - EXPECT_EQ(OK, node1.ClosePort(D));
|
| + // Write a message on D and close it.
|
| + EXPECT_EQ(OK, node0.SendStringMessage(D, "hey"));
|
| + EXPECT_EQ(OK, node1.node().ClosePort(D));
|
|
|
| // Initiate a merge between B and C.
|
| - EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
|
| + EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| - // Expect only one receiving port to be left after pumping tasks.
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(true));
|
| - EXPECT_TRUE(node1.CanShutdownCleanly(false));
|
| + // Expect all proxies to be gone once idle. node1 should have no ports since
|
| + // D was explicitly closed.
|
| + EXPECT_TRUE(
|
| + node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
|
| + EXPECT_TRUE(node1.node().CanShutdownCleanly());
|
|
|
| // Expect A to have received the message sent on D.
|
| ScopedMessage message;
|
| - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
|
| - EXPECT_EQ(0, strcmp("hey", ToString(message)));
|
| + ASSERT_TRUE(node0.ReadMessage(A, &message));
|
| + EXPECT_TRUE(MessageEquals(message, "hey"));
|
|
|
| - EXPECT_EQ(OK, node0.ClosePort(A));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(A));
|
|
|
| // No more ports should be open.
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| - EXPECT_TRUE(node1.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node0.node().CanShutdownCleanly());
|
| + EXPECT_TRUE(node1.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, MergePortsWithClosedPeers) {
|
| // This tests that no residual ports are left behind if two ports are merged
|
| // when both of their peers have been closed.
|
|
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| + TestNode node0(0);
|
| + AddNode(&node0);
|
|
|
| - NodeName node1_name(1, 1);
|
| - TestNodeDelegate node1_delegate(node1_name);
|
| - Node node1(node1_name, &node1_delegate);
|
| - node_map[1] = &node1;
|
| + TestNode node1(1);
|
| + AddNode(&node1);
|
|
|
| // Setup two independent port pairs, A-B on node0 and C-D on node1.
|
| PortRef A, B, C, D;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
|
| - EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
|
| -
|
| - node0_delegate.set_save_messages(true);
|
| - node1_delegate.set_read_messages(false);
|
| + EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
|
| + EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
|
|
|
| // Close A and D.
|
| - EXPECT_EQ(OK, node0.ClosePort(A));
|
| - EXPECT_EQ(OK, node1.ClosePort(D));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(A));
|
| + EXPECT_EQ(OK, node1.node().ClosePort(D));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| // Initiate a merge between B and C.
|
| - EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
|
| + EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| // Expect everything to have gone away.
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| - EXPECT_TRUE(node1.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node0.node().CanShutdownCleanly());
|
| + EXPECT_TRUE(node1.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, MergePortsWithMovedPeers) {
|
| - // This tests that no ports can be merged successfully even if their peers
|
| - // are moved around.
|
| + // This tests that ports can be merged successfully even if their peers are
|
| + // moved around.
|
|
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| + TestNode node0(0);
|
| + AddNode(&node0);
|
|
|
| - NodeName node1_name(1, 1);
|
| - TestNodeDelegate node1_delegate(node1_name);
|
| - Node node1(node1_name, &node1_delegate);
|
| - node_map[1] = &node1;
|
| -
|
| - node0_delegate.set_save_messages(true);
|
| - node1_delegate.set_read_messages(false);
|
| + TestNode node1(1);
|
| + AddNode(&node1);
|
|
|
| // Setup two independent port pairs, A-B on node0 and C-D on node1.
|
| PortRef A, B, C, D;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
|
| - EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
|
| + EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
|
| + EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
|
|
|
| // Set up another pair X-Y for moving ports on node0.
|
| PortRef X, Y;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
|
| + EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y));
|
|
|
| ScopedMessage message;
|
|
|
| // Move A to new port E.
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
|
| - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
|
| + EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A));
|
| + ASSERT_TRUE(node0.ReadMessage(Y, &message));
|
| ASSERT_EQ(1u, message->num_ports());
|
| PortRef E;
|
| - ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
|
| -
|
| - EXPECT_EQ(OK, node0.ClosePort(X));
|
| - EXPECT_EQ(OK, node0.ClosePort(Y));
|
| + ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));
|
|
|
| - node0_delegate.set_read_messages(false);
|
| + EXPECT_EQ(OK, node0.node().ClosePort(X));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(Y));
|
|
|
| // Write messages on E and D.
|
| - EXPECT_EQ(OK, SendStringMessage(&node0, E, "hey"));
|
| - EXPECT_EQ(OK, SendStringMessage(&node1, D, "hi"));
|
| + EXPECT_EQ(OK, node0.SendStringMessage(E, "hey"));
|
| + EXPECT_EQ(OK, node1.SendStringMessage(D, "hi"));
|
|
|
| // Initiate a merge between B and C.
|
| - EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
|
| -
|
| - node0_delegate.set_read_messages(true);
|
| - node1_delegate.set_read_messages(true);
|
| - node1_delegate.set_save_messages(true);
|
| + EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| // Expect to receive D's message on E and E's message on D.
|
| - ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
|
| - EXPECT_EQ(0, strcmp("hi", ToString(message)));
|
| - ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
|
| - EXPECT_EQ(0, strcmp("hey", ToString(message)));
|
| + ASSERT_TRUE(node0.ReadMessage(E, &message));
|
| + EXPECT_TRUE(MessageEquals(message, "hi"));
|
| + ASSERT_TRUE(node1.ReadMessage(D, &message));
|
| + EXPECT_TRUE(MessageEquals(message, "hey"));
|
|
|
| // Close E and D.
|
| - EXPECT_EQ(OK, node0.ClosePort(E));
|
| - EXPECT_EQ(OK, node1.ClosePort(D));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(E));
|
| + EXPECT_EQ(OK, node1.node().ClosePort(D));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| // Expect everything to have gone away.
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| - EXPECT_TRUE(node1.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node0.node().CanShutdownCleanly());
|
| + EXPECT_TRUE(node1.node().CanShutdownCleanly());
|
| }
|
|
|
| TEST_F(PortsTest, MergePortsFailsGracefully) {
|
| // This tests that the system remains in a well-defined state if something
|
| // goes wrong during port merge.
|
|
|
| - NodeName node0_name(0, 1);
|
| - TestNodeDelegate node0_delegate(node0_name);
|
| - Node node0(node0_name, &node0_delegate);
|
| - node_map[0] = &node0;
|
| + TestNode node0(0);
|
| + AddNode(&node0);
|
|
|
| - NodeName node1_name(1, 1);
|
| - TestNodeDelegate node1_delegate(node1_name);
|
| - Node node1(node1_name, &node1_delegate);
|
| - node_map[1] = &node1;
|
| + TestNode node1(1);
|
| + AddNode(&node1);
|
|
|
| // Setup two independent port pairs, A-B on node0 and C-D on node1.
|
| PortRef A, B, C, D;
|
| - EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
|
| - EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
|
| -
|
| - PumpTasks();
|
| -
|
| - // Initiate a merge between B and C.
|
| - EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
|
| + EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
|
| + EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
|
|
|
| - // Move C to a new port E. This is dumb and nobody should do it, but it's
|
| - // possible. MergePorts will fail as a result because C won't be in a
|
| - // receiving state when the event arrives at node1, so B should be closed.
|
| ScopedMessage message;
|
| PortRef X, Y;
|
| - EXPECT_EQ(OK, node1.CreatePortPair(&X, &Y));
|
| - node1_delegate.set_save_messages(true);
|
| - EXPECT_EQ(OK, SendStringMessageWithPort(&node1, X, "foo", C));
|
| - ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
|
| + EXPECT_EQ(OK, node1.node().CreatePortPair(&X, &Y));
|
| +
|
| + // Block the merge from proceeding until we can do something stupid with port
|
| + // C. This avoids the test logic racing with async merge logic.
|
| + node1.BlockOnEvent(EventType::kMergePort);
|
| +
|
| + // Initiate the merge between B and C.
|
| + EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
|
| +
|
| + // Move C to a new port E. This is not a sane use of Node's public API but
|
| + // is still hypothetically possible. It allows us to force a merge failure
|
| + // because C will be in an invalid state by the term the merge is processed.
|
| + // As a result, B should be closed.
|
| + EXPECT_EQ(OK, node1.SendStringMessageWithPort(X, "foo", C));
|
| +
|
| + node1.Unblock();
|
| +
|
| + ASSERT_TRUE(node1.ReadMessage(Y, &message));
|
| ASSERT_EQ(1u, message->num_ports());
|
| PortRef E;
|
| - ASSERT_EQ(OK, node1.GetPort(message->ports()[0], &E));
|
| - EXPECT_EQ(OK, node1.ClosePort(X));
|
| - EXPECT_EQ(OK, node1.ClosePort(Y));
|
| + ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));
|
|
|
| - // C goes away as a result of normal proxy removal.
|
| - PumpTasks();
|
| + EXPECT_EQ(OK, node1.node().ClosePort(X));
|
| + EXPECT_EQ(OK, node1.node().ClosePort(Y));
|
|
|
| - EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(C.name(), &C));
|
| + WaitForIdle();
|
|
|
| - // B should have been closed cleanly.
|
| - EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B));
|
| + // C goes away as a result of normal proxy removal. B should have been closed
|
| + // cleanly by the failed MergePorts.
|
| + EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C));
|
| + EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B));
|
|
|
| // Close A, D, and E.
|
| - EXPECT_EQ(OK, node0.ClosePort(A));
|
| - EXPECT_EQ(OK, node1.ClosePort(D));
|
| - EXPECT_EQ(OK, node1.ClosePort(E));
|
| + EXPECT_EQ(OK, node0.node().ClosePort(A));
|
| + EXPECT_EQ(OK, node1.node().ClosePort(D));
|
| + EXPECT_EQ(OK, node1.node().ClosePort(E));
|
|
|
| - PumpTasks();
|
| + WaitForIdle();
|
|
|
| // Expect everything to have gone away.
|
| - EXPECT_TRUE(node0.CanShutdownCleanly(false));
|
| - EXPECT_TRUE(node1.CanShutdownCleanly(false));
|
| + EXPECT_TRUE(node0.node().CanShutdownCleanly());
|
| + EXPECT_TRUE(node1.node().CanShutdownCleanly());
|
| }
|
|
|
| } // namespace test
|
|
|