Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(941)

Side by Side Diff: mojo/edk/system/ports/ports_unittest.cc

Issue 2236473003: [mojo-edk] Revert ObserveProxy retransmission behavior (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@2785
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/ports/node.cc ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include <inttypes.h>
5 #include <stdio.h> 6 #include <stdio.h>
6 #include <stdlib.h> 7 #include <stdlib.h>
7 #include <string.h> 8 #include <string.h>
8 9
9 #include <map> 10 #include <map>
10 #include <queue> 11 #include <queue>
11 #include <sstream> 12 #include <sstream>
13 #include <utility>
12 14
15 #include "base/bind.h"
16 #include "base/callback.h"
13 #include "base/logging.h" 17 #include "base/logging.h"
18 #include "base/memory/ref_counted.h"
14 #include "base/rand_util.h" 19 #include "base/rand_util.h"
20 #include "base/strings/string_piece.h"
21 #include "base/strings/stringprintf.h"
22 #include "base/synchronization/lock.h"
23 #include "base/synchronization/waitable_event.h"
24 #include "base/threading/thread.h"
15 #include "mojo/edk/system/ports/event.h" 25 #include "mojo/edk/system/ports/event.h"
16 #include "mojo/edk/system/ports/node.h" 26 #include "mojo/edk/system/ports/node.h"
17 #include "mojo/edk/system/ports/node_delegate.h" 27 #include "mojo/edk/system/ports/node_delegate.h"
18 #include "testing/gtest/include/gtest/gtest.h" 28 #include "testing/gtest/include/gtest/gtest.h"
19 29
20 namespace mojo { 30 namespace mojo {
21 namespace edk { 31 namespace edk {
22 namespace ports { 32 namespace ports {
23 namespace test { 33 namespace test {
24 34
25 namespace { 35 namespace {
26 36
27 void LogMessage(const Message* message) { 37 bool MessageEquals(const ScopedMessage& message, const base::StringPiece& s) {
28 std::stringstream ports; 38 return !strcmp(static_cast<const char*>(message->payload_bytes()), s.data());
29 for (size_t i = 0; i < message->num_ports(); ++i) {
30 if (i > 0)
31 ports << ",";
32 ports << message->ports()[i];
33 }
34 DVLOG(1) << "message: \""
35 << static_cast<const char*>(message->payload_bytes())
36 << "\" ports=[" << ports.str() << "]";
37 }
38
39 void ClosePortsInMessage(Node* node, Message* message) {
40 for (size_t i = 0; i < message->num_ports(); ++i) {
41 PortRef port;
42 ASSERT_EQ(OK, node->GetPort(message->ports()[i], &port));
43 EXPECT_EQ(OK, node->ClosePort(port));
44 }
45 } 39 }
46 40
47 class TestMessage : public Message { 41 class TestMessage : public Message {
48 public: 42 public:
49 static ScopedMessage NewUserMessage(size_t num_payload_bytes, 43 static ScopedMessage NewUserMessage(size_t num_payload_bytes,
50 size_t num_ports) { 44 size_t num_ports) {
51 return ScopedMessage(new TestMessage(num_payload_bytes, num_ports)); 45 return ScopedMessage(new TestMessage(num_payload_bytes, num_ports));
52 } 46 }
53 47
54 TestMessage(size_t num_payload_bytes, size_t num_ports) 48 TestMessage(size_t num_payload_bytes, size_t num_ports)
55 : Message(num_payload_bytes, num_ports) { 49 : Message(num_payload_bytes, num_ports) {
56 start_ = new char[num_header_bytes_ + num_ports_bytes_ + num_payload_bytes]; 50 start_ = new char[num_header_bytes_ + num_ports_bytes_ + num_payload_bytes];
57 InitializeUserMessageHeader(start_); 51 InitializeUserMessageHeader(start_);
58 } 52 }
59 53
60 TestMessage(size_t num_header_bytes, 54 TestMessage(size_t num_header_bytes,
61 size_t num_payload_bytes, 55 size_t num_payload_bytes,
62 size_t num_ports_bytes) 56 size_t num_ports_bytes)
63 : Message(num_header_bytes, 57 : Message(num_header_bytes,
64 num_payload_bytes, 58 num_payload_bytes,
65 num_ports_bytes) { 59 num_ports_bytes) {
66 start_ = new char[num_header_bytes + num_payload_bytes + num_ports_bytes]; 60 start_ = new char[num_header_bytes + num_payload_bytes + num_ports_bytes];
67 } 61 }
68 62
69 ~TestMessage() override { 63 ~TestMessage() override {
70 delete[] start_; 64 delete[] start_;
71 } 65 }
72 }; 66 };
73 67
74 struct Task { 68 class TestNode;
75 Task(NodeName node_name, ScopedMessage message) 69
76 : node_name(node_name), 70 class MessageRouter {
77 message(std::move(message)), 71 public:
78 priority(base::RandUint64()) { 72 virtual ~MessageRouter() {}
73
74 virtual void GeneratePortName(PortName* name) = 0;
75 virtual void ForwardMessage(TestNode* from_node,
76 const NodeName& node_name,
77 ScopedMessage message) = 0;
78 virtual void BroadcastMessage(TestNode* from_node, ScopedMessage message) = 0;
79 };
80
81 class TestNode : public NodeDelegate {
82 public:
83 explicit TestNode(uint64_t id)
84 : node_name_(id, 1),
85 node_(node_name_, this),
86 node_thread_(base::StringPrintf("Node %" PRIu64 " thread", id)),
87 messages_available_event_(
88 base::WaitableEvent::ResetPolicy::AUTOMATIC,
89 base::WaitableEvent::InitialState::NOT_SIGNALED),
90 idle_event_(
91 base::WaitableEvent::ResetPolicy::MANUAL,
92 base::WaitableEvent::InitialState::SIGNALED) {
79 } 93 }
80 94
81 NodeName node_name; 95 ~TestNode() override {
82 ScopedMessage message; 96 StopWhenIdle();
83 uint64_t priority; 97 node_thread_.Stop();
84 };
85
86 struct TaskComparator {
87 bool operator()(const Task* a, const Task* b) {
88 return a->priority < b->priority;
89 }
90 };
91
92 const size_t kMaxNodes = 3;
93
94 std::priority_queue<Task*, std::vector<Task*>, TaskComparator> task_queue;
95 Node* node_map[kMaxNodes];
96
97 Node* GetNode(const NodeName& name) {
98 return node_map[name.v1];
99 }
100
101 void SetNode(const NodeName& name, Node* node) {
102 node_map[name.v1] = node;
103 }
104
105 void PumpTasks() {
106 while (!task_queue.empty()) {
107 Task* task = task_queue.top();
108 task_queue.pop();
109
110 Node* node = GetNode(task->node_name);
111 if (node)
112 node->AcceptMessage(std::move(task->message));
113
114 delete task;
115 }
116 }
117
118 void PumpUntilTask(EventType type) {
119 while (!task_queue.empty()) {
120 Task* task = task_queue.top();
121
122 const EventHeader* header = GetEventHeader(*task->message);
123 if (header->type == type)
124 return;
125
126 task_queue.pop();
127
128 Node* node = GetNode(task->node_name);
129 if (node)
130 node->AcceptMessage(std::move(task->message));
131
132 delete task;
133 }
134 }
135
136 void DiscardPendingTasks() {
137 while (!task_queue.empty()) {
138 Task* task = task_queue.top();
139 task_queue.pop();
140 delete task;
141 }
142 }
143
144 int SendStringMessage(Node* node, const PortRef& port, const std::string& s) {
145 size_t size = s.size() + 1;
146 ScopedMessage message = TestMessage::NewUserMessage(size, 0);
147 memcpy(message->mutable_payload_bytes(), s.data(), size);
148 return node->SendMessage(port, std::move(message));
149 }
150
151 int SendStringMessageWithPort(Node* node,
152 const PortRef& port,
153 const std::string& s,
154 const PortName& sent_port_name) {
155 size_t size = s.size() + 1;
156 ScopedMessage message = TestMessage::NewUserMessage(size, 1);
157 memcpy(message->mutable_payload_bytes(), s.data(), size);
158 message->mutable_ports()[0] = sent_port_name;
159 return node->SendMessage(port, std::move(message));
160 }
161
162 int SendStringMessageWithPort(Node* node,
163 const PortRef& port,
164 const std::string& s,
165 const PortRef& sent_port) {
166 return SendStringMessageWithPort(node, port, s, sent_port.name());
167 }
168
169 const char* ToString(const ScopedMessage& message) {
170 return static_cast<const char*>(message->payload_bytes());
171 }
172
173 class TestNodeDelegate : public NodeDelegate {
174 public:
175 explicit TestNodeDelegate(const NodeName& node_name)
176 : node_name_(node_name),
177 drop_messages_(false),
178 read_messages_(true),
179 save_messages_(false) {
180 } 98 }
181 99
182 void set_drop_messages(bool value) { drop_messages_ = value; } 100 const NodeName& name() const { return node_name_; }
183 void set_read_messages(bool value) { read_messages_ = value; } 101
184 void set_save_messages(bool value) { save_messages_ = value; } 102 // NOTE: Node is thread-safe.
103 Node& node() { return node_; }
104
105 base::WaitableEvent& idle_event() { return idle_event_; }
106
107 bool IsIdle() {
108 base::AutoLock lock(lock_);
109 return started_ && !dispatching_ &&
110 (incoming_messages_.empty() || (block_on_event_ && blocked_));
111 }
112
113 void BlockOnEvent(EventType type) {
114 base::AutoLock lock(lock_);
115 blocked_event_type_ = type;
116 block_on_event_ = true;
117 }
118
119 void Unblock() {
120 base::AutoLock lock(lock_);
121 block_on_event_ = false;
122 messages_available_event_.Signal();
123 }
124
125 void Start(MessageRouter* router) {
126 router_ = router;
127 node_thread_.Start();
128 node_thread_.task_runner()->PostTask(
129 FROM_HERE,
130 base::Bind(&TestNode::ProcessMessages, base::Unretained(this)));
131 }
132
133 void StopWhenIdle() {
134 base::AutoLock lock(lock_);
135 should_quit_ = true;
136 messages_available_event_.Signal();
137 }
138
139 void WakeUp() { messages_available_event_.Signal(); }
140
141 int SendStringMessage(const PortRef& port, const std::string& s) {
142 size_t size = s.size() + 1;
143 ScopedMessage message = TestMessage::NewUserMessage(size, 0);
144 memcpy(message->mutable_payload_bytes(), s.data(), size);
145 return node_.SendMessage(port, std::move(message));
146 }
147
148 int SendStringMessageWithPort(const PortRef& port,
149 const std::string& s,
150 const PortName& sent_port_name) {
151 size_t size = s.size() + 1;
152 ScopedMessage message = TestMessage::NewUserMessage(size, 1);
153 memcpy(message->mutable_payload_bytes(), s.data(), size);
154 message->mutable_ports()[0] = sent_port_name;
155 return node_.SendMessage(port, std::move(message));
156 }
157
158 int SendStringMessageWithPort(const PortRef& port,
159 const std::string& s,
160 const PortRef& sent_port) {
161 return SendStringMessageWithPort(port, s, sent_port.name());
162 }
163
164 void set_drop_messages(bool value) {
165 base::AutoLock lock(lock_);
166 drop_messages_ = value;
167 }
168
169 void set_save_messages(bool value) {
170 base::AutoLock lock(lock_);
171 save_messages_ = value;
172 }
173
174 bool ReadMessage(const PortRef& port, ScopedMessage* message) {
175 return node_.GetMessage(port, message) == OK && *message;
176 }
185 177
186 bool GetSavedMessage(ScopedMessage* message) { 178 bool GetSavedMessage(ScopedMessage* message) {
179 base::AutoLock lock(lock_);
187 if (saved_messages_.empty()) { 180 if (saved_messages_.empty()) {
188 message->reset(); 181 message->reset();
189 return false; 182 return false;
190 } 183 }
191 *message = std::move(saved_messages_.front()); 184 std::swap(*message, saved_messages_.front());
192 saved_messages_.pop(); 185 saved_messages_.pop();
193 return true; 186 return true;
194 } 187 }
195 188
189 void EnqueueMessage(ScopedMessage message) {
190 idle_event_.Reset();
191
192 // NOTE: This may be called from ForwardMessage and thus must not reenter
193 // |node_|.
194 base::AutoLock lock(lock_);
195 incoming_messages_.emplace(std::move(message));
196 messages_available_event_.Signal();
197 }
198
196 void GenerateRandomPortName(PortName* port_name) override { 199 void GenerateRandomPortName(PortName* port_name) override {
197 static uint64_t next_port_name = 1; 200 DCHECK(router_);
198 port_name->v1 = next_port_name++; 201 router_->GeneratePortName(port_name);
199 port_name->v2 = 0;
200 } 202 }
201 203
202 void AllocMessage(size_t num_header_bytes, ScopedMessage* message) override { 204 void AllocMessage(size_t num_header_bytes, ScopedMessage* message) override {
203 message->reset(new TestMessage(num_header_bytes, 0, 0)); 205 message->reset(new TestMessage(num_header_bytes, 0, 0));
204 } 206 }
205 207
206 void ForwardMessage(const NodeName& node_name, 208 void ForwardMessage(const NodeName& node_name,
207 ScopedMessage message) override { 209 ScopedMessage message) override {
208 if (drop_messages_) { 210 {
209 DVLOG(1) << "Dropping ForwardMessage from node " 211 base::AutoLock lock(lock_);
210 << node_name_ << " to " << node_name; 212 if (drop_messages_) {
211 ClosePortsInMessage(GetNode(node_name), message.get()); 213 DVLOG(1) << "Dropping ForwardMessage from node "
212 return; 214 << node_name_ << " to " << node_name;
213 } 215
216 base::AutoUnlock unlock(lock_);
217 ClosePortsInMessage(message.get());
218 return;
219 }
220 }
221
222 DCHECK(router_);
214 DVLOG(1) << "ForwardMessage from node " 223 DVLOG(1) << "ForwardMessage from node "
215 << node_name_ << " to " << node_name; 224 << node_name_ << " to " << node_name;
216 task_queue.push(new Task(node_name, std::move(message))); 225 router_->ForwardMessage(this, node_name, std::move(message));
217 } 226 }
218 227
219 void BroadcastMessage(ScopedMessage message) override { 228 void BroadcastMessage(ScopedMessage message) override {
220 for (size_t i = 0; i < kMaxNodes; ++i) { 229 router_->BroadcastMessage(this, std::move(message));
221 Node* node = node_map[i];
222 // Broadcast doesn't deliver to the local node.
223 if (node && node != GetNode(node_name_)) {
224 // NOTE: We only need to support broadcast of events, which have no
225 // payload or ports bytes.
226 ScopedMessage new_message(
227 new TestMessage(message->num_header_bytes(), 0, 0));
228 memcpy(new_message->mutable_header_bytes(), message->header_bytes(),
229 message->num_header_bytes());
230 node->AcceptMessage(std::move(new_message));
231 }
232 }
233 } 230 }
234 231
235 void PortStatusChanged(const PortRef& port) override { 232 void PortStatusChanged(const PortRef& port) override {
236 DVLOG(1) << "PortStatusChanged for " << port.name() << "@" << node_name_; 233 // The port may be closed, in which case we ignore the notification.
237 if (!read_messages_) 234 base::AutoLock lock(lock_);
235 if (!save_messages_)
238 return; 236 return;
239 Node* node = GetNode(node_name_); 237
240 for (;;) { 238 for (;;) {
241 ScopedMessage message; 239 ScopedMessage message;
242 int rv = node->GetMessage(port, &message); 240 {
243 EXPECT_TRUE(rv == OK || rv == ERROR_PORT_PEER_CLOSED); 241 base::AutoUnlock unlock(lock_);
244 if (rv == ERROR_PORT_PEER_CLOSED || !message) 242 if (!ReadMessage(port, &message))
245 break; 243 break;
246 if (save_messages_) { 244 }
247 SaveMessage(std::move(message)); 245
248 } else { 246 saved_messages_.emplace(std::move(message));
249 LogMessage(message.get()); 247 }
250 for (size_t i = 0; i < message->num_ports(); ++i) { 248 }
251 std::stringstream buf; 249
252 buf << "got port: " << message->ports()[i]; 250 void ClosePortsInMessage(Message* message) {
253 251 for (size_t i = 0; i < message->num_ports(); ++i) {
254 PortRef received_port; 252 PortRef port;
255 node->GetPort(message->ports()[i], &received_port); 253 ASSERT_EQ(OK, node_.GetPort(message->ports()[i], &port));
256 254 EXPECT_EQ(OK, node_.ClosePort(port));
257 SendStringMessage(node, received_port, buf.str()); 255 }
258 256 }
259 // Avoid leaking these ports. 257
260 node->ClosePort(received_port); 258 private:
259 void ProcessMessages() {
260 for (;;) {
261 messages_available_event_.Wait();
262
263 base::AutoLock lock(lock_);
264
265 if (should_quit_)
266 return;
267
268 dispatching_ = true;
269 while (!incoming_messages_.empty()) {
270 if (block_on_event_ &&
271 GetEventHeader(*incoming_messages_.front())->type ==
272 blocked_event_type_) {
273 blocked_ = true;
274 // Go idle if we hit a blocked event type.
275 break;
276 } else {
277 blocked_ = false;
261 } 278 }
279 ScopedMessage message = std::move(incoming_messages_.front());
280 incoming_messages_.pop();
281
282 // NOTE: AcceptMessage() can re-enter this object to call any of the
283 // NodeDelegate interface methods.
284 base::AutoUnlock unlock(lock_);
285 node_.AcceptMessage(std::move(message));
262 } 286 }
287
288 dispatching_ = false;
289 started_ = true;
290 idle_event_.Signal();
291 };
292 }
293
294 const NodeName node_name_;
295 Node node_;
296 MessageRouter* router_ = nullptr;
297
298 base::Thread node_thread_;
299 base::WaitableEvent messages_available_event_;
300 base::WaitableEvent idle_event_;
301
302 // Guards fields below.
303 base::Lock lock_;
304 bool started_ = false;
305 bool dispatching_ = false;
306 bool should_quit_ = false;
307 bool drop_messages_ = false;
308 bool save_messages_ = false;
309 bool blocked_ = false;
310 bool block_on_event_ = false;
311 EventType blocked_event_type_;
312 std::queue<ScopedMessage> incoming_messages_;
313 std::queue<ScopedMessage> saved_messages_;
314 };
315
316 class PortsTest : public testing::Test, public MessageRouter {
317 public:
318 void AddNode(TestNode* node) {
319 {
320 base::AutoLock lock(lock_);
321 nodes_[node->name()] = node;
322 }
323 node->Start(this);
324 }
325
326 void RemoveNode(TestNode* node) {
327 {
328 base::AutoLock lock(lock_);
329 nodes_.erase(node->name());
330 }
331
332 for (const auto& entry : nodes_)
333 entry.second->node().LostConnectionToNode(node->name());
334 }
335
336 // Waits until all known Nodes are idle. Message forwarding and processing
337 // is handled in such a way that idleness is a stable state: once all nodes in
338 // the system are idle, they will remain idle until the test explicitly
339 // initiates some further event (e.g. sending a message, closing a port, or
340 // removing a Node).
341 void WaitForIdle() {
342 for (;;) {
343 base::AutoLock global_lock(global_lock_);
344 bool all_nodes_idle = true;
345 for (const auto& entry : nodes_) {
346 if (!entry.second->IsIdle())
347 all_nodes_idle = false;
348 entry.second->WakeUp();
349 }
350 if (all_nodes_idle)
351 return;
352
353 // Wait for any Node to signal that it's idle.
354 base::AutoUnlock global_unlock(global_lock_);
355 std::vector<base::WaitableEvent*> events;
356 for (const auto& entry : nodes_)
357 events.push_back(&entry.second->idle_event());
358 base::WaitableEvent::WaitMany(events.data(), events.size());
359 }
360 }
361
362 void CreatePortPair(TestNode* node0,
363 PortRef* port0,
364 TestNode* node1,
365 PortRef* port1) {
366 if (node0 == node1) {
367 EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1));
368 } else {
369 EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0));
370 EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1));
371 EXPECT_EQ(OK, node0->node().InitializePort(*port0, node1->name(),
372 port1->name()));
373 EXPECT_EQ(OK, node1->node().InitializePort(*port1, node0->name(),
374 port0->name()));
263 } 375 }
264 } 376 }
265 377
266 private: 378 private:
267 void SaveMessage(ScopedMessage message) { 379 // MessageRouter:
268 saved_messages_.emplace(std::move(message)); 380 void GeneratePortName(PortName* name) override {
269 } 381 base::AutoLock lock(lock_);
270 382 name->v1 = next_port_id_++;
271 std::queue<ScopedMessage> saved_messages_; 383 name->v2 = 0;
272 NodeName node_name_; 384 }
273 bool drop_messages_; 385
274 bool read_messages_; 386 void ForwardMessage(TestNode* from_node,
275 bool save_messages_; 387 const NodeName& node_name,
388 ScopedMessage message) override {
389 base::AutoLock global_lock(global_lock_);
390 base::AutoLock lock(lock_);
391 // Drop messages from nodes that have been removed.
392 if (nodes_.find(from_node->name()) == nodes_.end()) {
393 from_node->ClosePortsInMessage(message.get());
394 return;
395 }
396
397 auto it = nodes_.find(node_name);
398 if (it == nodes_.end()) {
399 DVLOG(1) << "Node not found: " << node_name;
400 return;
401 }
402
403 it->second->EnqueueMessage(std::move(message));
404 }
405
406 void BroadcastMessage(TestNode* from_node, ScopedMessage message) override {
407 base::AutoLock global_lock(global_lock_);
408 base::AutoLock lock(lock_);
409
410 // Drop messages from nodes that have been removed.
411 if (nodes_.find(from_node->name()) == nodes_.end())
412 return;
413
414 for (const auto& entry : nodes_) {
415 TestNode* node = entry.second;
416 // Broadcast doesn't deliver to the local node.
417 if (node == from_node)
418 continue;
419
420 // NOTE: We only need to support broadcast of events. Events have no
421 // payload or ports bytes.
422 ScopedMessage new_message(
423 new TestMessage(message->num_header_bytes(), 0, 0));
424 memcpy(new_message->mutable_header_bytes(), message->header_bytes(),
425 message->num_header_bytes());
426 node->EnqueueMessage(std::move(new_message));
427 }
428 }
429
430 base::MessageLoop message_loop_;
431
432 // Acquired before any operation which makes a Node busy, and before testing
433 // if all nodes are idle.
434 base::Lock global_lock_;
435
436 base::Lock lock_;
437 uint64_t next_port_id_ = 1;
438 std::map<NodeName, TestNode*> nodes_;
276 }; 439 };
277 440
278 class PortsTest : public testing::Test {
279 public:
280 void SetUp() override {
281 DiscardPendingTasks();
282 SetNode(NodeName(0, 1), nullptr);
283 SetNode(NodeName(1, 1), nullptr);
284 SetNode(NodeName(2, 1), nullptr);
285 }
286 };
287
288 } // namespace 441 } // namespace
289 442
290 TEST_F(PortsTest, Basic1) { 443 TEST_F(PortsTest, Basic1) {
291 NodeName node0_name(0, 1); 444 TestNode node0(0);
292 TestNodeDelegate node0_delegate(node0_name); 445 AddNode(&node0);
293 Node node0(node0_name, &node0_delegate); 446
294 SetNode(node0_name, &node0); 447 TestNode node1(1);
295 448 AddNode(&node1);
296 NodeName node1_name(1, 1); 449
297 TestNodeDelegate node1_delegate(node1_name);
298 Node node1(node1_name, &node1_delegate);
299 SetNode(node1_name, &node1);
300
301 // Setup pipe between node0 and node1.
302 PortRef x0, x1; 450 PortRef x0, x1;
303 EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); 451 CreatePortPair(&node0, &x0, &node1, &x1);
304 EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); 452
305 EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); 453 PortRef a0, a1;
306 EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); 454 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
307 455 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
308 // Transfer a port from node0 to node1. 456 EXPECT_EQ(OK, node0.node().ClosePort(a0));
309 PortRef a0, a1; 457
310 EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); 458 EXPECT_EQ(OK, node0.node().ClosePort(x0));
311 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1)); 459 EXPECT_EQ(OK, node1.node().ClosePort(x1));
312 460
313 EXPECT_EQ(OK, node0.ClosePort(a0)); 461 WaitForIdle();
314 462
315 EXPECT_EQ(OK, node0.ClosePort(x0)); 463 EXPECT_TRUE(node0.node().CanShutdownCleanly());
316 EXPECT_EQ(OK, node1.ClosePort(x1)); 464 EXPECT_TRUE(node1.node().CanShutdownCleanly());
317
318 PumpTasks();
319
320 EXPECT_TRUE(node0.CanShutdownCleanly(false));
321 EXPECT_TRUE(node1.CanShutdownCleanly(false));
322 } 465 }
323 466
324 TEST_F(PortsTest, Basic2) { 467 TEST_F(PortsTest, Basic2) {
325 NodeName node0_name(0, 1); 468 TestNode node0(0);
326 TestNodeDelegate node0_delegate(node0_name); 469 AddNode(&node0);
327 Node node0(node0_name, &node0_delegate); 470
328 SetNode(node0_name, &node0); 471 TestNode node1(1);
329 472 AddNode(&node1);
330 NodeName node1_name(1, 1); 473
331 TestNodeDelegate node1_delegate(node1_name);
332 Node node1(node1_name, &node1_delegate);
333 SetNode(node1_name, &node1);
334
335 // Setup pipe between node0 and node1.
336 PortRef x0, x1; 474 PortRef x0, x1;
337 EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); 475 CreatePortPair(&node0, &x0, &node1, &x1);
338 EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
339 EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
340 EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
341 476
342 PortRef b0, b1; 477 PortRef b0, b1;
343 EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1)); 478 EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
344 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", b1)); 479 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1));
345 EXPECT_EQ(OK, SendStringMessage(&node0, b0, "hello again")); 480 EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again"));
346 481
347 // This may cause a SendMessage(b1) failure. 482 EXPECT_EQ(OK, node0.node().ClosePort(b0));
348 EXPECT_EQ(OK, node0.ClosePort(b0)); 483
349 484 EXPECT_EQ(OK, node0.node().ClosePort(x0));
350 EXPECT_EQ(OK, node0.ClosePort(x0)); 485 EXPECT_EQ(OK, node1.node().ClosePort(x1));
351 EXPECT_EQ(OK, node1.ClosePort(x1)); 486
352 487 WaitForIdle();
353 PumpTasks(); 488
354 489 EXPECT_TRUE(node0.node().CanShutdownCleanly());
355 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 490 EXPECT_TRUE(node1.node().CanShutdownCleanly());
356 EXPECT_TRUE(node1.CanShutdownCleanly(false));
357 } 491 }
358 492
359 TEST_F(PortsTest, Basic3) { 493 TEST_F(PortsTest, Basic3) {
360 NodeName node0_name(0, 1); 494 TestNode node0(0);
361 TestNodeDelegate node0_delegate(node0_name); 495 AddNode(&node0);
362 Node node0(node0_name, &node0_delegate); 496
363 SetNode(node0_name, &node0); 497 TestNode node1(1);
364 498 AddNode(&node1);
365 NodeName node1_name(1, 1); 499
366 TestNodeDelegate node1_delegate(node1_name);
367 Node node1(node1_name, &node1_delegate);
368 SetNode(node1_name, &node1);
369
370 // Setup pipe between node0 and node1.
371 PortRef x0, x1; 500 PortRef x0, x1;
372 EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); 501 CreatePortPair(&node0, &x0, &node1, &x1);
373 EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); 502
374 EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); 503 PortRef a0, a1;
375 EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); 504 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
376 505
377 // Transfer a port from node0 to node1. 506 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
378 PortRef a0, a1; 507 EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again"));
379 EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); 508
380 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1)); 509 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0));
381 EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello again"));
382
383 // Transfer a0 as well.
384 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a0));
385 510
386 PortRef b0, b1; 511 PortRef b0, b1;
387 EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1)); 512 EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
388 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "bar", b1)); 513 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1));
389 EXPECT_EQ(OK, SendStringMessage(&node0, b0, "baz")); 514 EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz"));
390 515
391 // This may cause a SendMessage(b1) failure. 516 EXPECT_EQ(OK, node0.node().ClosePort(b0));
392 EXPECT_EQ(OK, node0.ClosePort(b0)); 517
393 518 EXPECT_EQ(OK, node0.node().ClosePort(x0));
394 EXPECT_EQ(OK, node0.ClosePort(x0)); 519 EXPECT_EQ(OK, node1.node().ClosePort(x1));
395 EXPECT_EQ(OK, node1.ClosePort(x1)); 520
396 521 WaitForIdle();
397 PumpTasks(); 522
398 523 EXPECT_TRUE(node0.node().CanShutdownCleanly());
399 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 524 EXPECT_TRUE(node1.node().CanShutdownCleanly());
400 EXPECT_TRUE(node1.CanShutdownCleanly(false));
401 } 525 }
402 526
403 TEST_F(PortsTest, LostConnectionToNode1) { 527 TEST_F(PortsTest, LostConnectionToNode1) {
404 NodeName node0_name(0, 1); 528 TestNode node0(0);
405 TestNodeDelegate node0_delegate(node0_name); 529 AddNode(&node0);
406 Node node0(node0_name, &node0_delegate); 530
407 SetNode(node0_name, &node0); 531 TestNode node1(1);
408 532 AddNode(&node1);
409 NodeName node1_name(1, 1); 533 node1.set_drop_messages(true);
410 TestNodeDelegate node1_delegate(node1_name); 534
411 Node node1(node1_name, &node1_delegate);
412 SetNode(node1_name, &node1);
413
414 // Setup pipe between node0 and node1.
415 PortRef x0, x1; 535 PortRef x0, x1;
416 EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); 536 CreatePortPair(&node0, &x0, &node1, &x1);
417 EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); 537
418 EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); 538 // Transfer a port to node1 and simulate a lost connection to node1.
419 EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); 539
420 540 PortRef a0, a1;
421 // Transfer port to node1 and simulate a lost connection to node1. Dropping 541 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
422 // events from node1 is how we simulate the lost connection. 542 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));
423 543
424 node1_delegate.set_drop_messages(true); 544 WaitForIdle();
425 545
426 PortRef a0, a1; 546 RemoveNode(&node1);
427 EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); 547
428 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a1)); 548 WaitForIdle();
429 549
430 PumpTasks(); 550 EXPECT_EQ(OK, node0.node().ClosePort(a0));
431 551 EXPECT_EQ(OK, node0.node().ClosePort(x0));
432 EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name)); 552 EXPECT_EQ(OK, node1.node().ClosePort(x1));
433 553
434 PumpTasks(); 554 WaitForIdle();
435 555
436 EXPECT_EQ(OK, node0.ClosePort(a0)); 556 EXPECT_TRUE(node0.node().CanShutdownCleanly());
437 EXPECT_EQ(OK, node0.ClosePort(x0)); 557 EXPECT_TRUE(node1.node().CanShutdownCleanly());
438 EXPECT_EQ(OK, node1.ClosePort(x1));
439
440 PumpTasks();
441
442 EXPECT_TRUE(node0.CanShutdownCleanly(false));
443 EXPECT_TRUE(node1.CanShutdownCleanly(false));
444 } 558 }
445 559
446 TEST_F(PortsTest, LostConnectionToNode2) { 560 TEST_F(PortsTest, LostConnectionToNode2) {
447 NodeName node0_name(0, 1); 561 TestNode node0(0);
448 TestNodeDelegate node0_delegate(node0_name); 562 AddNode(&node0);
449 Node node0(node0_name, &node0_delegate); 563
450 node_map[0] = &node0; 564 TestNode node1(1);
451 565 AddNode(&node1);
452 NodeName node1_name(1, 1); 566
453 TestNodeDelegate node1_delegate(node1_name);
454 Node node1(node1_name, &node1_delegate);
455 node_map[1] = &node1;
456
457 // Setup pipe between node0 and node1.
458 PortRef x0, x1; 567 PortRef x0, x1;
459 EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); 568 CreatePortPair(&node0, &x0, &node1, &x1);
460 EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1)); 569
461 EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name())); 570 PortRef a0, a1;
462 EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name())); 571 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
463 572 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1));
464 node1_delegate.set_read_messages(false); 573
465 574 WaitForIdle();
466 PortRef a0, a1; 575
467 EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); 576 node1.set_drop_messages(true);
468 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "take a1", a1)); 577
469 578 RemoveNode(&node1);
470 PumpTasks(); 579
471 580 WaitForIdle();
472 node1_delegate.set_drop_messages(true); 581
473 582 // a0 should have eventually detected peer closure after node loss.
474 EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));
475
476 PumpTasks();
477
478 ScopedMessage message; 583 ScopedMessage message;
479 EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message)); 584 EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.node().GetMessage(a0, &message));
480 EXPECT_FALSE(message); 585 EXPECT_FALSE(message);
481 586
482 EXPECT_EQ(OK, node0.ClosePort(a0)); 587 EXPECT_EQ(OK, node0.node().ClosePort(a0));
483 588
484 EXPECT_EQ(OK, node0.ClosePort(x0)); 589 EXPECT_EQ(OK, node0.node().ClosePort(x0));
485 590
486 EXPECT_EQ(OK, node1.GetMessage(x1, &message)); 591 EXPECT_EQ(OK, node1.node().GetMessage(x1, &message));
487 EXPECT_TRUE(message); 592 EXPECT_TRUE(message);
488 ClosePortsInMessage(&node1, message.get()); 593 node1.ClosePortsInMessage(message.get());
489 594
490 EXPECT_EQ(OK, node1.ClosePort(x1)); 595 EXPECT_EQ(OK, node1.node().ClosePort(x1));
491 596
492 PumpTasks(); 597 WaitForIdle();
493 598
494 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 599 EXPECT_TRUE(node0.node().CanShutdownCleanly());
495 EXPECT_TRUE(node1.CanShutdownCleanly(false)); 600 EXPECT_TRUE(node1.node().CanShutdownCleanly());
496 } 601 }
497 602
498 TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) { 603 TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
499 // Tests that a proxy gets cleaned up when its indirect peer lives on a lost 604 // Tests that a proxy gets cleaned up when its indirect peer lives on a lost
500 // node. 605 // node.
501 606
502 NodeName node0_name(0, 1); 607 TestNode node0(0);
503 TestNodeDelegate node0_delegate(node0_name); 608 AddNode(&node0);
504 Node node0(node0_name, &node0_delegate); 609
505 node_map[0] = &node0; 610 TestNode node1(1);
506 611 AddNode(&node1);
507 NodeName node1_name(1, 1); 612
508 TestNodeDelegate node1_delegate(node1_name); 613 TestNode node2(2);
509 Node node1(node1_name, &node1_delegate); 614 AddNode(&node2);
510 node_map[1] = &node1;
511
512 NodeName node2_name(2, 1);
513 TestNodeDelegate node2_delegate(node2_name);
514 Node node2(node2_name, &node2_delegate);
515 node_map[2] = &node2;
516
517 node1_delegate.set_save_messages(true);
518 615
519 // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2. 616 // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
520 PortRef A, B, C, D; 617 PortRef A, B, C, D;
521 EXPECT_EQ(OK, node0.CreateUninitializedPort(&A)); 618 CreatePortPair(&node0, &A, &node1, &B);
522 EXPECT_EQ(OK, node1.CreateUninitializedPort(&B)); 619 CreatePortPair(&node1, &C, &node2, &D);
523 EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
524 EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));
525 EXPECT_EQ(OK, node1.CreateUninitializedPort(&C));
526 EXPECT_EQ(OK, node2.CreateUninitializedPort(&D));
527 EXPECT_EQ(OK, node1.InitializePort(C, node2_name, D.name()));
528 EXPECT_EQ(OK, node2.InitializePort(D, node1_name, C.name()));
529 620
530 // Create E-F and send F over A to node 1. 621 // Create E-F and send F over A to node 1.
531 PortRef E, F; 622 PortRef E, F;
532 EXPECT_EQ(OK, node0.CreatePortPair(&E, &F)); 623 EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
533 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, ".", F)); 624 EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));
534 625
535 PumpTasks(); 626 WaitForIdle();
536 627
537 ScopedMessage message; 628 ScopedMessage message;
538 ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); 629 ASSERT_TRUE(node1.ReadMessage(B, &message));
539 ASSERT_EQ(1u, message->num_ports()); 630 ASSERT_EQ(1u, message->num_ports());
540 631
541 EXPECT_EQ(OK, node1.GetPort(message->ports()[0], &F)); 632 EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F));
542 633
543 // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1 634 // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1
544 // will trivially become aware of the loss, and this test verifies that the 635 // will trivially become aware of the loss, and this test verifies that the
545 // port A on node 0 will eventually also become aware of it. 636 // port A on node 0 will eventually also become aware of it.
546 637
547 EXPECT_EQ(OK, SendStringMessageWithPort(&node1, C, ".", F)); 638 // Make sure node2 stops processing events when it encounters an ObserveProxy.
548 639 node2.BlockOnEvent(EventType::kObserveProxy);
549 node_map[2] = nullptr; 640
550 EXPECT_EQ(OK, node1.LostConnectionToNode(node2_name)); 641 EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F));
551 642 WaitForIdle();
552 PumpTasks(); 643
644 // Simulate node 1 and 2 disconnecting.
645 EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));
646
647 // Let node2 continue processing events and wait for everyone to go idle.
648 node2.Unblock();
649 WaitForIdle();
553 650
554 // Port F should be gone. 651 // Port F should be gone.
555 EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(F.name(), &F)); 652 EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));
556 653
557 // Port E should have detected peer closure despite the fact that there is 654 // Port E should have detected peer closure despite the fact that there is
558 // no longer a continuous route from F to E over which the event could travel. 655 // no longer a continuous route from F to E over which the event could travel.
559 PortStatus status; 656 PortStatus status;
560 EXPECT_EQ(OK, node0.GetStatus(E, &status)); 657 EXPECT_EQ(OK, node0.node().GetStatus(E, &status));
561 EXPECT_TRUE(status.peer_closed); 658 EXPECT_TRUE(status.peer_closed);
562 659
563 EXPECT_EQ(OK, node0.ClosePort(A)); 660 EXPECT_EQ(OK, node0.node().ClosePort(A));
564 EXPECT_EQ(OK, node1.ClosePort(B)); 661 EXPECT_EQ(OK, node1.node().ClosePort(B));
565 EXPECT_EQ(OK, node1.ClosePort(C)); 662 EXPECT_EQ(OK, node1.node().ClosePort(C));
566 EXPECT_EQ(OK, node0.ClosePort(E)); 663 EXPECT_EQ(OK, node0.node().ClosePort(E));
567 664
568 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 665 WaitForIdle();
569 EXPECT_TRUE(node1.CanShutdownCleanly(false)); 666
667 EXPECT_TRUE(node0.node().CanShutdownCleanly());
668 EXPECT_TRUE(node1.node().CanShutdownCleanly());
570 } 669 }
571 670
572 TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) { 671 TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
573 // Tests that a proxy gets cleaned up when its direct peer lives on a lost 672 // Tests that a proxy gets cleaned up when its direct peer lives on a lost
574 // node and it's predecessor lives on the same node. 673 // node and it's predecessor lives on the same node.
575 674
576 NodeName node0_name(0, 1); 675 TestNode node0(0);
577 TestNodeDelegate node0_delegate(node0_name); 676 AddNode(&node0);
578 Node node0(node0_name, &node0_delegate); 677
579 node_map[0] = &node0; 678 TestNode node1(1);
580 679 AddNode(&node1);
581 NodeName node1_name(1, 1); 680
582 TestNodeDelegate node1_delegate(node1_name);
583 Node node1(node1_name, &node1_delegate);
584 node_map[1] = &node1;
585
586 node1_delegate.set_save_messages(true);
587
588 // Create A-B spanning nodes 0 and 1.
589 PortRef A, B; 681 PortRef A, B;
590 EXPECT_EQ(OK, node0.CreateUninitializedPort(&A)); 682 CreatePortPair(&node0, &A, &node1, &B);
591 EXPECT_EQ(OK, node1.CreateUninitializedPort(&B)); 683
592 EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
593 EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));
594
595 // Create C-D and send D over A to node 1.
596 PortRef C, D; 684 PortRef C, D;
597 EXPECT_EQ(OK, node0.CreatePortPair(&C, &D)); 685 EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
598 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, ".", D)); 686
599 687 // Send D but block node0 on an ObserveProxy event.
600 // Pump tasks until the start of port collapse for port D, which should become 688 node0.BlockOnEvent(EventType::kObserveProxy);
601 // a proxy. 689 EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D));
602 PumpUntilTask(EventType::kObserveProxy); 690
691 // node0 won't collapse the proxy but node1 will receive the message before
692 // going idle.
693 WaitForIdle();
603 694
604 ScopedMessage message; 695 ScopedMessage message;
605 ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); 696 ASSERT_TRUE(node1.ReadMessage(B, &message));
606 ASSERT_EQ(1u, message->num_ports()); 697 ASSERT_EQ(1u, message->num_ports());
607
608 PortRef E; 698 PortRef E;
609 EXPECT_EQ(OK, node1.GetPort(message->ports()[0], &E)); 699 EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));
610 700
611 EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name)); 701 RemoveNode(&node1);
612 PumpTasks(); 702
703 node0.Unblock();
704 WaitForIdle();
613 705
614 // Port C should have detected peer closure. 706 // Port C should have detected peer closure.
615 PortStatus status; 707 PortStatus status;
616 EXPECT_EQ(OK, node0.GetStatus(C, &status)); 708 EXPECT_EQ(OK, node0.node().GetStatus(C, &status));
617 EXPECT_TRUE(status.peer_closed); 709 EXPECT_TRUE(status.peer_closed);
618 710
619 EXPECT_EQ(OK, node0.ClosePort(A)); 711 EXPECT_EQ(OK, node0.node().ClosePort(A));
620 EXPECT_EQ(OK, node1.ClosePort(B)); 712 EXPECT_EQ(OK, node1.node().ClosePort(B));
621 EXPECT_EQ(OK, node0.ClosePort(C)); 713 EXPECT_EQ(OK, node0.node().ClosePort(C));
622 EXPECT_EQ(OK, node1.ClosePort(E)); 714 EXPECT_EQ(OK, node1.node().ClosePort(E));
623 715
624 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 716 EXPECT_TRUE(node0.node().CanShutdownCleanly());
625 EXPECT_TRUE(node1.CanShutdownCleanly(false)); 717 EXPECT_TRUE(node1.node().CanShutdownCleanly());
626 } 718 }
627 719
628 TEST_F(PortsTest, GetMessage1) { 720 TEST_F(PortsTest, GetMessage1) {
629 NodeName node0_name(0, 1); 721 TestNode node(0);
630 TestNodeDelegate node0_delegate(node0_name); 722 AddNode(&node);
631 Node node0(node0_name, &node0_delegate); 723
632 node_map[0] = &node0; 724 PortRef a0, a1;
633 725 EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
634 PortRef a0, a1;
635 EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
636 726
637 ScopedMessage message; 727 ScopedMessage message;
638 EXPECT_EQ(OK, node0.GetMessage(a0, &message)); 728 EXPECT_EQ(OK, node.node().GetMessage(a0, &message));
639 EXPECT_FALSE(message); 729 EXPECT_FALSE(message);
640 730
641 EXPECT_EQ(OK, node0.ClosePort(a1)); 731 EXPECT_EQ(OK, node.node().ClosePort(a1));
642 732
643 EXPECT_EQ(OK, node0.GetMessage(a0, &message)); 733 WaitForIdle();
734
735 EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node.node().GetMessage(a0, &message));
644 EXPECT_FALSE(message); 736 EXPECT_FALSE(message);
645 737
646 PumpTasks(); 738 EXPECT_EQ(OK, node.node().ClosePort(a0));
647 739
648 EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message)); 740 WaitForIdle();
649 EXPECT_FALSE(message); 741
650 742 EXPECT_TRUE(node.node().CanShutdownCleanly());
651 EXPECT_EQ(OK, node0.ClosePort(a0));
652
653 EXPECT_TRUE(node0.CanShutdownCleanly(false));
654 } 743 }
655 744
656 TEST_F(PortsTest, GetMessage2) { 745 TEST_F(PortsTest, GetMessage2) {
657 NodeName node0_name(0, 1); 746 TestNode node(0);
658 TestNodeDelegate node0_delegate(node0_name); 747 AddNode(&node);
659 Node node0(node0_name, &node0_delegate); 748
660 node_map[0] = &node0; 749 PortRef a0, a1;
661 750 EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
662 node0_delegate.set_read_messages(false); 751
663 752 EXPECT_EQ(OK, node.SendStringMessage(a1, "1"));
664 PortRef a0, a1;
665 EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
666
667 EXPECT_EQ(OK, SendStringMessage(&node0, a1, "1"));
668 753
669 ScopedMessage message; 754 ScopedMessage message;
670 EXPECT_EQ(OK, node0.GetMessage(a0, &message)); 755 EXPECT_EQ(OK, node.node().GetMessage(a0, &message));
671 756
672 ASSERT_TRUE(message); 757 ASSERT_TRUE(message);
673 EXPECT_EQ(0, strcmp("1", ToString(message))); 758 EXPECT_TRUE(MessageEquals(message, "1"));
674 759
675 EXPECT_EQ(OK, node0.ClosePort(a0)); 760 EXPECT_EQ(OK, node.node().ClosePort(a0));
676 EXPECT_EQ(OK, node0.ClosePort(a1)); 761 EXPECT_EQ(OK, node.node().ClosePort(a1));
677 762
678 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 763 EXPECT_TRUE(node.node().CanShutdownCleanly());
679 } 764 }
680 765
681 TEST_F(PortsTest, GetMessage3) { 766 TEST_F(PortsTest, GetMessage3) {
682 NodeName node0_name(0, 1); 767 TestNode node(0);
683 TestNodeDelegate node0_delegate(node0_name); 768 AddNode(&node);
684 Node node0(node0_name, &node0_delegate); 769
685 node_map[0] = &node0; 770 PortRef a0, a1;
686 771 EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));
687 node0_delegate.set_read_messages(false);
688
689 PortRef a0, a1;
690 EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
691 772
692 const char* kStrings[] = { 773 const char* kStrings[] = {
693 "1", 774 "1",
694 "2", 775 "2",
695 "3" 776 "3"
696 }; 777 };
697 778
698 for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i) 779 for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i)
699 EXPECT_EQ(OK, SendStringMessage(&node0, a1, kStrings[i])); 780 EXPECT_EQ(OK, node.SendStringMessage(a1, kStrings[i]));
700 781
701 ScopedMessage message; 782 ScopedMessage message;
702 for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i) { 783 for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i) {
703 EXPECT_EQ(OK, node0.GetMessage(a0, &message)); 784 EXPECT_EQ(OK, node.node().GetMessage(a0, &message));
704 ASSERT_TRUE(message); 785 ASSERT_TRUE(message);
705 EXPECT_EQ(0, strcmp(kStrings[i], ToString(message))); 786 EXPECT_TRUE(MessageEquals(message, kStrings[i]));
706 DVLOG(1) << "got " << kStrings[i];
707 } 787 }
708 788
709 EXPECT_EQ(OK, node0.ClosePort(a0)); 789 EXPECT_EQ(OK, node.node().ClosePort(a0));
710 EXPECT_EQ(OK, node0.ClosePort(a1)); 790 EXPECT_EQ(OK, node.node().ClosePort(a1));
711 791
712 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 792 EXPECT_TRUE(node.node().CanShutdownCleanly());
713 } 793 }
714 794
715 TEST_F(PortsTest, Delegation1) { 795 TEST_F(PortsTest, Delegation1) {
716 NodeName node0_name(0, 1); 796 TestNode node0(0);
717 TestNodeDelegate node0_delegate(node0_name); 797 AddNode(&node0);
718 Node node0(node0_name, &node0_delegate); 798
719 SetNode(node0_name, &node0); 799 TestNode node1(1);
720 800 AddNode(&node1);
721 NodeName node1_name(1, 1); 801
722 TestNodeDelegate node1_delegate(node1_name);
723 Node node1(node1_name, &node1_delegate);
724 node_map[1] = &node1;
725
726 node0_delegate.set_save_messages(true);
727 node1_delegate.set_save_messages(true);
728
729 // Setup pipe between node0 and node1.
730 PortRef x0, x1; 802 PortRef x0, x1;
731 EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); 803 CreatePortPair(&node0, &x0, &node1, &x1);
732 EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
733 EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
734 EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
735 804
736 // In this test, we send a message to a port that has been moved. 805 // In this test, we send a message to a port that has been moved.
737 806
738 PortRef a0, a1; 807 PortRef a0, a1;
739 EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); 808 EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
740 809 EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1));
741 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "a1", a1)); 810 WaitForIdle();
742 811
743 PumpTasks(); 812 ScopedMessage message;
744 813 ASSERT_TRUE(node1.ReadMessage(x1, &message));
745 ScopedMessage message; 814 ASSERT_EQ(1u, message->num_ports());
746 ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); 815 EXPECT_TRUE(MessageEquals(message, "a1"));
747
748 ASSERT_EQ(1u, message->num_ports());
749 816
750 // This is "a1" from the point of view of node1. 817 // This is "a1" from the point of view of node1.
751 PortName a2_name = message->ports()[0]; 818 PortName a2_name = message->ports()[0];
752 819 EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name));
753 EXPECT_EQ(OK, SendStringMessageWithPort(&node1, x1, "a2", a2_name)); 820 EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello"));
754 821
755 PumpTasks(); 822 WaitForIdle();
756 823
757 EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello")); 824 ASSERT_TRUE(node0.ReadMessage(x0, &message));
758 825 ASSERT_EQ(1u, message->num_ports());
759 PumpTasks(); 826 EXPECT_TRUE(MessageEquals(message, "a2"));
760
761 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
762
763 ASSERT_EQ(1u, message->num_ports());
764 827
765 // This is "a2" from the point of view of node1. 828 // This is "a2" from the point of view of node1.
766 PortName a3_name = message->ports()[0]; 829 PortName a3_name = message->ports()[0];
767 830
768 PortRef a3; 831 PortRef a3;
769 EXPECT_EQ(OK, node0.GetPort(a3_name, &a3)); 832 EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3));
770 833
771 EXPECT_EQ(0, strcmp("a2", ToString(message))); 834 ASSERT_TRUE(node0.ReadMessage(a3, &message));
772
773 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
774
775 EXPECT_EQ(0u, message->num_ports()); 835 EXPECT_EQ(0u, message->num_ports());
776 EXPECT_EQ(0, strcmp("hello", ToString(message))); 836 EXPECT_TRUE(MessageEquals(message, "hello"));
777 837
778 EXPECT_EQ(OK, node0.ClosePort(a0)); 838 EXPECT_EQ(OK, node0.node().ClosePort(a0));
779 EXPECT_EQ(OK, node0.ClosePort(a3)); 839 EXPECT_EQ(OK, node0.node().ClosePort(a3));
780 840
781 EXPECT_EQ(OK, node0.ClosePort(x0)); 841 EXPECT_EQ(OK, node0.node().ClosePort(x0));
782 EXPECT_EQ(OK, node1.ClosePort(x1)); 842 EXPECT_EQ(OK, node1.node().ClosePort(x1));
783 843
784 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 844 EXPECT_TRUE(node0.node().CanShutdownCleanly());
785 EXPECT_TRUE(node1.CanShutdownCleanly(false)); 845 EXPECT_TRUE(node1.node().CanShutdownCleanly());
786 } 846 }
787 847
788 TEST_F(PortsTest, Delegation2) { 848 TEST_F(PortsTest, Delegation2) {
789 NodeName node0_name(0, 1); 849 TestNode node0(0);
790 TestNodeDelegate node0_delegate(node0_name); 850 AddNode(&node0);
791 Node node0(node0_name, &node0_delegate); 851
792 SetNode(node0_name, &node0); 852 TestNode node1(1);
793 853 AddNode(&node1);
794 NodeName node1_name(1, 1); 854
795 TestNodeDelegate node1_delegate(node1_name); 855 for (int i = 0; i < 100; ++i) {
796 Node node1(node1_name, &node1_delegate);
797 node_map[1] = &node1;
798
799 node0_delegate.set_save_messages(true);
800 node1_delegate.set_save_messages(true);
801
802 for (int i = 0; i < 10; ++i) {
803 // Setup pipe a<->b between node0 and node1. 856 // Setup pipe a<->b between node0 and node1.
804 PortRef A, B; 857 PortRef A, B;
805 EXPECT_EQ(OK, node0.CreateUninitializedPort(&A)); 858 CreatePortPair(&node0, &A, &node1, &B);
806 EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
807 EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
808 EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));
809 859
810 PortRef C, D; 860 PortRef C, D;
811 EXPECT_EQ(OK, node0.CreatePortPair(&C, &D)); 861 EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));
812 862
813 PortRef E, F; 863 PortRef E, F;
814 EXPECT_EQ(OK, node0.CreatePortPair(&E, &F)); 864 EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
865
866 node1.set_save_messages(true);
815 867
816 // Pass D over A to B. 868 // Pass D over A to B.
817 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "1", D)); 869 EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D));
818 870
819 // Pass F over C to D. 871 // Pass F over C to D.
820 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, C, "1", F)); 872 EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F));
821 873
822 // This message should find its way to node1. 874 // This message should find its way to node1.
823 EXPECT_EQ(OK, SendStringMessage(&node0, E, "hello")); 875 EXPECT_EQ(OK, node0.SendStringMessage(E, "hello"));
824 876
825 PumpTasks(); 877 WaitForIdle();
826 878
827 EXPECT_EQ(OK, node0.ClosePort(C)); 879 EXPECT_EQ(OK, node0.node().ClosePort(C));
828 EXPECT_EQ(OK, node0.ClosePort(E)); 880 EXPECT_EQ(OK, node0.node().ClosePort(E));
829 881
830 EXPECT_EQ(OK, node0.ClosePort(A)); 882 EXPECT_EQ(OK, node0.node().ClosePort(A));
831 EXPECT_EQ(OK, node1.ClosePort(B)); 883 EXPECT_EQ(OK, node1.node().ClosePort(B));
832 884
833 for (;;) { 885 bool got_hello = false;
834 ScopedMessage message; 886 ScopedMessage message;
835 if (node1_delegate.GetSavedMessage(&message)) { 887 while (node1.GetSavedMessage(&message)) {
836 ClosePortsInMessage(&node1, message.get()); 888 node1.ClosePortsInMessage(message.get());
837 if (strcmp("hello", ToString(message)) == 0) 889 if (MessageEquals(message, "hello")) {
838 break; 890 got_hello = true;
839 } else {
840 ASSERT_TRUE(false); // "hello" message not delivered!
841 break; 891 break;
842 } 892 }
843 } 893 }
844 894
845 PumpTasks(); // Because ClosePort may have generated tasks. 895 EXPECT_TRUE(got_hello);
896
897 WaitForIdle(); // Because closing ports may have generated tasks.
846 } 898 }
847 899
848 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 900 EXPECT_TRUE(node0.node().CanShutdownCleanly());
849 EXPECT_TRUE(node1.CanShutdownCleanly(false)); 901 EXPECT_TRUE(node1.node().CanShutdownCleanly());
850 } 902 }
851 903
852 TEST_F(PortsTest, SendUninitialized) { 904 TEST_F(PortsTest, SendUninitialized) {
853 NodeName node0_name(0, 1); 905 TestNode node(0);
854 TestNodeDelegate node0_delegate(node0_name); 906 AddNode(&node);
855 Node node0(node0_name, &node0_delegate);
856 node_map[0] = &node0;
857 907
858 PortRef x0; 908 PortRef x0;
859 EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0)); 909 EXPECT_EQ(OK, node.node().CreateUninitializedPort(&x0));
860 EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, 910 EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, node.SendStringMessage(x0, "oops"));
861 SendStringMessage(&node0, x0, "oops")); 911 EXPECT_EQ(OK, node.node().ClosePort(x0));
862 EXPECT_EQ(OK, node0.ClosePort(x0)); 912 EXPECT_TRUE(node.node().CanShutdownCleanly());
863 EXPECT_TRUE(node0.CanShutdownCleanly(false));
864 } 913 }
865 914
866 TEST_F(PortsTest, SendFailure) { 915 TEST_F(PortsTest, SendFailure) {
867 NodeName node0_name(0, 1); 916 TestNode node(0);
868 TestNodeDelegate node0_delegate(node0_name); 917 AddNode(&node);
869 Node node0(node0_name, &node0_delegate); 918
870 node_map[0] = &node0; 919 node.set_save_messages(true);
871
872 node0_delegate.set_save_messages(true);
873 920
874 PortRef A, B; 921 PortRef A, B;
875 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); 922 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
876 923
877 // Try to send A over itself. 924 // Try to send A over itself.
878 925
879 EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF, 926 EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
880 SendStringMessageWithPort(&node0, A, "oops", A)); 927 node.SendStringMessageWithPort(A, "oops", A));
881 928
882 // Try to send B over A. 929 // Try to send B over A.
883 930
884 EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER, 931 EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
885 SendStringMessageWithPort(&node0, A, "nope", B)); 932 node.SendStringMessageWithPort(A, "nope", B));
886 933
887 // B should be closed immediately. 934 // B should be closed immediately.
888 EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B)); 935 EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B));
889 936
890 PumpTasks(); 937 WaitForIdle();
891 938
892 // There should have been no messages accepted. 939 // There should have been no messages accepted.
893 ScopedMessage message; 940 ScopedMessage message;
894 EXPECT_FALSE(node0_delegate.GetSavedMessage(&message)); 941 EXPECT_FALSE(node.GetSavedMessage(&message));
895 942
896 EXPECT_EQ(OK, node0.ClosePort(A)); 943 EXPECT_EQ(OK, node.node().ClosePort(A));
897 944
898 PumpTasks(); 945 WaitForIdle();
899 946
900 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 947 EXPECT_TRUE(node.node().CanShutdownCleanly());
901 } 948 }
902 949
903 TEST_F(PortsTest, DontLeakUnreceivedPorts) { 950 TEST_F(PortsTest, DontLeakUnreceivedPorts) {
904 NodeName node0_name(0, 1); 951 TestNode node(0);
905 TestNodeDelegate node0_delegate(node0_name); 952 AddNode(&node);
906 Node node0(node0_name, &node0_delegate); 953
907 node_map[0] = &node0; 954 PortRef A, B, C, D;
908 955 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
909 node0_delegate.set_read_messages(false); 956 EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
957
958 EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
959
960 EXPECT_EQ(OK, node.node().ClosePort(C));
961 EXPECT_EQ(OK, node.node().ClosePort(A));
962 EXPECT_EQ(OK, node.node().ClosePort(B));
963
964 WaitForIdle();
965
966 EXPECT_TRUE(node.node().CanShutdownCleanly());
967 }
968
969 TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
970 TestNode node(0);
971 AddNode(&node);
972
973 PortRef A, B, C, D;
974 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
975 EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));
976
977 EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));
978
979 ScopedMessage message;
980 EXPECT_TRUE(node.ReadMessage(B, &message));
981 ASSERT_EQ(1u, message->num_ports());
982 EXPECT_TRUE(MessageEquals(message, "foo"));
983 PortRef E;
984 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
985
986 EXPECT_TRUE(
987 node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
988
989 WaitForIdle();
990
991 EXPECT_TRUE(
992 node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
993 EXPECT_FALSE(node.node().CanShutdownCleanly());
994
995 EXPECT_EQ(OK, node.node().ClosePort(A));
996 EXPECT_EQ(OK, node.node().ClosePort(B));
997 EXPECT_EQ(OK, node.node().ClosePort(C));
998 EXPECT_EQ(OK, node.node().ClosePort(E));
999
1000 WaitForIdle();
1001
1002 EXPECT_TRUE(node.node().CanShutdownCleanly());
1003 }
1004
1005 TEST_F(PortsTest, ProxyCollapse1) {
1006 TestNode node(0);
1007 AddNode(&node);
910 1008
911 PortRef A, B; 1009 PortRef A, B;
912 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); 1010 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
913 1011
914 PortRef C, D; 1012 PortRef X, Y;
915 EXPECT_EQ(OK, node0.CreatePortPair(&C, &D)); 1013 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
916 1014
917 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D)); 1015 ScopedMessage message;
918 1016
919 PumpTasks(); 1017 // Send B and receive it as C.
920 1018 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
921 EXPECT_EQ(OK, node0.ClosePort(C)); 1019 ASSERT_TRUE(node.ReadMessage(Y, &message));
922 1020 ASSERT_EQ(1u, message->num_ports());
923 EXPECT_EQ(OK, node0.ClosePort(A)); 1021 PortRef C;
924 EXPECT_EQ(OK, node0.ClosePort(B)); 1022 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
925 1023
926 PumpTasks(); 1024 // Send C and receive it as D.
927 1025 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
928 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 1026 ASSERT_TRUE(node.ReadMessage(Y, &message));
929 } 1027 ASSERT_EQ(1u, message->num_ports());
930 1028 PortRef D;
931 TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) { 1029 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
932 NodeName node0_name(0, 1); 1030
933 TestNodeDelegate node0_delegate(node0_name); 1031 // Send D and receive it as E.
934 Node node0(node0_name, &node0_delegate); 1032 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D));
935 node_map[0] = &node0; 1033 ASSERT_TRUE(node.ReadMessage(Y, &message));
936 1034 ASSERT_EQ(1u, message->num_ports());
937 node0_delegate.set_save_messages(true); 1035 PortRef E;
1036 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
1037
1038 EXPECT_EQ(OK, node.node().ClosePort(X));
1039 EXPECT_EQ(OK, node.node().ClosePort(Y));
1040
1041 EXPECT_EQ(OK, node.node().ClosePort(A));
1042 EXPECT_EQ(OK, node.node().ClosePort(E));
1043
1044 // The node should not idle until all proxies are collapsed.
1045 WaitForIdle();
1046
1047 EXPECT_TRUE(node.node().CanShutdownCleanly());
1048 }
1049
1050 TEST_F(PortsTest, ProxyCollapse2) {
1051 TestNode node(0);
1052 AddNode(&node);
938 1053
939 PortRef A, B; 1054 PortRef A, B;
940 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); 1055 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
941
942 PortRef C, D;
943 EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));
944
945 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D));
946
947 ScopedMessage message;
948 EXPECT_TRUE(node0_delegate.GetSavedMessage(&message));
949 ASSERT_EQ(1u, message->num_ports());
950
951 PortRef E;
952 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
953
954 EXPECT_TRUE(node0.CanShutdownCleanly(true));
955
956 PumpTasks();
957
958 EXPECT_TRUE(node0.CanShutdownCleanly(true));
959 EXPECT_FALSE(node0.CanShutdownCleanly(false));
960
961 EXPECT_EQ(OK, node0.ClosePort(A));
962 EXPECT_EQ(OK, node0.ClosePort(B));
963 EXPECT_EQ(OK, node0.ClosePort(C));
964 EXPECT_EQ(OK, node0.ClosePort(E));
965
966 PumpTasks();
967
968 EXPECT_TRUE(node0.CanShutdownCleanly(false));
969 }
970
971 TEST_F(PortsTest, ProxyCollapse1) {
972 NodeName node0_name(0, 1);
973 TestNodeDelegate node0_delegate(node0_name);
974 Node node0(node0_name, &node0_delegate);
975 node_map[0] = &node0;
976
977 node0_delegate.set_save_messages(true);
978
979 PortRef A, B;
980 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
981 1056
982 PortRef X, Y; 1057 PortRef X, Y;
983 EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); 1058 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
984 1059
985 ScopedMessage message; 1060 ScopedMessage message;
986 1061
987 // Send B and receive it as C. 1062 // Send B and A to create proxies in each direction.
988 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B)); 1063 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
989 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); 1064 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
990 ASSERT_EQ(1u, message->num_ports()); 1065
991 PortRef C; 1066 EXPECT_EQ(OK, node.node().ClosePort(X));
992 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C)); 1067 EXPECT_EQ(OK, node.node().ClosePort(Y));
993
994 // Send C and receive it as D.
995 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C));
996 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
997 ASSERT_EQ(1u, message->num_ports());
998 PortRef D;
999 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));
1000
1001 // Send D and receive it as E.
1002 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", D));
1003 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1004 ASSERT_EQ(1u, message->num_ports());
1005 PortRef E;
1006 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
1007
1008 EXPECT_EQ(OK, node0.ClosePort(X));
1009 EXPECT_EQ(OK, node0.ClosePort(Y));
1010
1011 EXPECT_EQ(OK, node0.ClosePort(A));
1012 EXPECT_EQ(OK, node0.ClosePort(E));
1013
1014 PumpTasks();
1015
1016 EXPECT_TRUE(node0.CanShutdownCleanly(false));
1017 }
1018
1019 TEST_F(PortsTest, ProxyCollapse2) {
1020 NodeName node0_name(0, 1);
1021 TestNodeDelegate node0_delegate(node0_name);
1022 Node node0(node0_name, &node0_delegate);
1023 node_map[0] = &node0;
1024
1025 node0_delegate.set_save_messages(true);
1026
1027 PortRef A, B;
1028 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
1029
1030 PortRef X, Y;
1031 EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
1032
1033 ScopedMessage message;
1034
1035 // Send B and receive it as C.
1036 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
1037 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1038 ASSERT_EQ(1u, message->num_ports());
1039 PortRef C;
1040 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));
1041
1042 // Send A and receive it as D.
1043 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
1044 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1045 ASSERT_EQ(1u, message->num_ports());
1046 PortRef D;
1047 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));
1048 1068
1049 // At this point we have a scenario with: 1069 // At this point we have a scenario with:
1050 // 1070 //
1051 // D -> [B] -> C -> [A] 1071 // D -> [B] -> C -> [A]
1052 // 1072 //
1053 // Ensure that the proxies can collapse. 1073 // Ensure that the proxies can collapse. The sent ports will be closed
1054 1074 // eventually as a result of Y's closure.
1055 EXPECT_EQ(OK, node0.ClosePort(X)); 1075
1056 EXPECT_EQ(OK, node0.ClosePort(Y)); 1076 WaitForIdle();
1057 1077
1058 EXPECT_EQ(OK, node0.ClosePort(C)); 1078 EXPECT_TRUE(node.node().CanShutdownCleanly());
1059 EXPECT_EQ(OK, node0.ClosePort(D));
1060
1061 PumpTasks();
1062
1063 EXPECT_TRUE(node0.CanShutdownCleanly(false));
1064 } 1079 }
1065 1080
1066 TEST_F(PortsTest, SendWithClosedPeer) { 1081 TEST_F(PortsTest, SendWithClosedPeer) {
1067 // This tests that if a port is sent when its peer is already known to be 1082 // This tests that if a port is sent when its peer is already known to be
1068 // closed, the newly created port will be aware of that peer closure, and the 1083 // closed, the newly created port will be aware of that peer closure, and the
1069 // proxy will eventually collapse. 1084 // proxy will eventually collapse.
1070 1085
1071 NodeName node0_name(0, 1); 1086 TestNode node(0);
1072 TestNodeDelegate node0_delegate(node0_name); 1087 AddNode(&node);
1073 Node node0(node0_name, &node0_delegate);
1074 node_map[0] = &node0;
1075
1076 node0_delegate.set_read_messages(false);
1077 1088
1078 // Send a message from A to B, then close A. 1089 // Send a message from A to B, then close A.
1079 PortRef A, B; 1090 PortRef A, B;
1080 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); 1091 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
1081 EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey")); 1092 EXPECT_EQ(OK, node.SendStringMessage(A, "hey"));
1082 EXPECT_EQ(OK, node0.ClosePort(A)); 1093 EXPECT_EQ(OK, node.node().ClosePort(A));
1083
1084 PumpTasks();
1085 1094
1086 // Now send B over X-Y as new port C. 1095 // Now send B over X-Y as new port C.
1087 PortRef X, Y; 1096 PortRef X, Y;
1088 EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); 1097 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
1098 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
1099 ScopedMessage message;
1100 ASSERT_TRUE(node.ReadMessage(Y, &message));
1101 ASSERT_EQ(1u, message->num_ports());
1102 PortRef C;
1103 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
1089 1104
1090 node0_delegate.set_read_messages(true); 1105 EXPECT_EQ(OK, node.node().ClosePort(X));
1091 node0_delegate.set_save_messages(true); 1106 EXPECT_EQ(OK, node.node().ClosePort(Y));
1092 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
1093 1107
1094 EXPECT_EQ(OK, node0.ClosePort(X)); 1108 WaitForIdle();
1095 EXPECT_EQ(OK, node0.ClosePort(Y));
1096 1109
1097 ScopedMessage message; 1110 // C should have received the message originally sent to B, and it should also
1098 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); 1111 // be aware of A's closure.
1099 ASSERT_EQ(1u, message->num_ports());
1100 1112
1101 PortRef C; 1113 ASSERT_TRUE(node.ReadMessage(C, &message));
1102 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C)); 1114 EXPECT_TRUE(MessageEquals(message, "hey"));
1103
1104 PumpTasks();
1105
1106 // C should receive the message originally sent to B, and it should also be
1107 // aware of A's closure.
1108
1109 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1110 EXPECT_EQ(0, strcmp("hey", ToString(message)));
1111 1115
1112 PortStatus status; 1116 PortStatus status;
1113 EXPECT_EQ(OK, node0.GetStatus(C, &status)); 1117 EXPECT_EQ(OK, node.node().GetStatus(C, &status));
1114 EXPECT_FALSE(status.receiving_messages); 1118 EXPECT_FALSE(status.receiving_messages);
1115 EXPECT_FALSE(status.has_messages); 1119 EXPECT_FALSE(status.has_messages);
1116 EXPECT_TRUE(status.peer_closed); 1120 EXPECT_TRUE(status.peer_closed);
1117 1121
1118 node0.ClosePort(C); 1122 node.node().ClosePort(C);
1119 1123
1120 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 1124 WaitForIdle();
1125
1126 EXPECT_TRUE(node.node().CanShutdownCleanly());
1121 } 1127 }
1122 1128
1123 TEST_F(PortsTest, SendWithClosedPeerSent) { 1129 TEST_F(PortsTest, SendWithClosedPeerSent) {
1124 // This tests that if a port is closed while some number of proxies are still 1130 // This tests that if a port is closed while some number of proxies are still
1125 // routing messages (directly or indirectly) to it, that the peer port is 1131 // routing messages (directly or indirectly) to it, that the peer port is
1126 // eventually notified of the closure, and the dead-end proxies will 1132 // eventually notified of the closure, and the dead-end proxies will
1127 // eventually be removed. 1133 // eventually be removed.
1128 1134
1129 NodeName node0_name(0, 1); 1135 TestNode node(0);
1130 TestNodeDelegate node0_delegate(node0_name); 1136 AddNode(&node);
1131 Node node0(node0_name, &node0_delegate);
1132 node_map[0] = &node0;
1133
1134 node0_delegate.set_save_messages(true);
1135 1137
1136 PortRef X, Y; 1138 PortRef X, Y;
1137 EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); 1139 EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
1138 1140
1139 PortRef A, B; 1141 PortRef A, B;
1140 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); 1142 EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
1141 1143
1142 ScopedMessage message; 1144 ScopedMessage message;
1143 1145
1144 // Send A as new port C. 1146 // Send A as new port C.
1145 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A)); 1147 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));
1146 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); 1148
1149 ASSERT_TRUE(node.ReadMessage(Y, &message));
1147 ASSERT_EQ(1u, message->num_ports()); 1150 ASSERT_EQ(1u, message->num_ports());
1148 PortRef C; 1151 PortRef C;
1149 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C)); 1152 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));
1150 1153
1151 // Send C as new port D. 1154 // Send C as new port D.
1152 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C)); 1155 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
1153 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); 1156
1157 ASSERT_TRUE(node.ReadMessage(Y, &message));
1154 ASSERT_EQ(1u, message->num_ports()); 1158 ASSERT_EQ(1u, message->num_ports());
1155 PortRef D; 1159 PortRef D;
1156 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D)); 1160 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));
1157
1158 node0_delegate.set_read_messages(false);
1159 1161
1160 // Send a message to B through D, then close D. 1162 // Send a message to B through D, then close D.
1161 EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey")); 1163 EXPECT_EQ(OK, node.SendStringMessage(D, "hey"));
1162 EXPECT_EQ(OK, node0.ClosePort(D)); 1164 EXPECT_EQ(OK, node.node().ClosePort(D));
1163
1164 PumpTasks();
1165 1165
1166 // Now send B as new port E. 1166 // Now send B as new port E.
1167 1167
1168 node0_delegate.set_read_messages(true); 1168 EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
1169 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B)); 1169 EXPECT_EQ(OK, node.node().ClosePort(X));
1170 1170
1171 EXPECT_EQ(OK, node0.ClosePort(X)); 1171 ASSERT_TRUE(node.ReadMessage(Y, &message));
1172 EXPECT_EQ(OK, node0.ClosePort(Y)); 1172 ASSERT_EQ(1u, message->num_ports());
1173
1174 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1175 ASSERT_EQ(1u, message->num_ports());
1176
1177 PortRef E; 1173 PortRef E;
1178 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E)); 1174 ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));
1179 1175
1180 PumpTasks(); 1176 EXPECT_EQ(OK, node.node().ClosePort(Y));
1177
1178 WaitForIdle();
1181 1179
1182 // E should receive the message originally sent to B, and it should also be 1180 // E should receive the message originally sent to B, and it should also be
1183 // aware of D's closure. 1181 // aware of D's closure.
1184 1182
1185 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); 1183 ASSERT_TRUE(node.ReadMessage(E, &message));
1186 EXPECT_EQ(0, strcmp("hey", ToString(message))); 1184 EXPECT_TRUE(MessageEquals(message, "hey"));
1187 1185
1188 PortStatus status; 1186 PortStatus status;
1189 EXPECT_EQ(OK, node0.GetStatus(E, &status)); 1187 EXPECT_EQ(OK, node.node().GetStatus(E, &status));
1190 EXPECT_FALSE(status.receiving_messages); 1188 EXPECT_FALSE(status.receiving_messages);
1191 EXPECT_FALSE(status.has_messages); 1189 EXPECT_FALSE(status.has_messages);
1192 EXPECT_TRUE(status.peer_closed); 1190 EXPECT_TRUE(status.peer_closed);
1193 1191
1194 node0.ClosePort(E); 1192 EXPECT_EQ(OK, node.node().ClosePort(E));
1195 1193
1196 PumpTasks(); 1194 WaitForIdle();
1197 1195
1198 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 1196 EXPECT_TRUE(node.node().CanShutdownCleanly());
1199 } 1197 }
1200 1198
1201 TEST_F(PortsTest, MergePorts) { 1199 TEST_F(PortsTest, MergePorts) {
1202 NodeName node0_name(0, 1); 1200 TestNode node0(0);
1203 TestNodeDelegate node0_delegate(node0_name); 1201 AddNode(&node0);
1204 Node node0(node0_name, &node0_delegate); 1202
1205 node_map[0] = &node0; 1203 TestNode node1(1);
1206 1204 AddNode(&node1);
1207 NodeName node1_name(1, 1); 1205
1208 TestNodeDelegate node1_delegate(node1_name); 1206 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1209 Node node1(node1_name, &node1_delegate); 1207 PortRef A, B, C, D;
1210 node_map[1] = &node1; 1208 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1211 1209 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1212 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1213 PortRef A, B, C, D;
1214 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
1215 EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
1216
1217 node0_delegate.set_read_messages(false);
1218 node1_delegate.set_save_messages(true);
1219 1210
1220 // Write a message on A. 1211 // Write a message on A.
1221 EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey")); 1212 EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
1222 1213
1223 PumpTasks(); 1214 // Initiate a merge between B and C.
1224 1215 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1225 // Initiate a merge between B and C. 1216
1226 EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); 1217 WaitForIdle();
1227 1218
1228 PumpTasks(); 1219 // Expect all proxies to be gone once idle.
1229 1220 EXPECT_TRUE(
1230 // Expect only two receiving ports to be left after pumping tasks. 1221 node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1231 EXPECT_TRUE(node0.CanShutdownCleanly(true)); 1222 EXPECT_TRUE(
1232 EXPECT_TRUE(node1.CanShutdownCleanly(true)); 1223 node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1233 1224
1234 // Expect D to have received the message sent on A. 1225 // Expect D to have received the message sent on A.
1235 ScopedMessage message; 1226 ScopedMessage message;
1236 ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); 1227 ASSERT_TRUE(node1.ReadMessage(D, &message));
1237 EXPECT_EQ(0, strcmp("hey", ToString(message))); 1228 EXPECT_TRUE(MessageEquals(message, "hey"));
1238 1229
1239 EXPECT_EQ(OK, node0.ClosePort(A)); 1230 EXPECT_EQ(OK, node0.node().ClosePort(A));
1240 EXPECT_EQ(OK, node1.ClosePort(D)); 1231 EXPECT_EQ(OK, node1.node().ClosePort(D));
1241 1232
1242 // No more ports should be open. 1233 // No more ports should be open.
1243 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 1234 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1244 EXPECT_TRUE(node1.CanShutdownCleanly(false)); 1235 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1245 } 1236 }
1246 1237
1247 TEST_F(PortsTest, MergePortWithClosedPeer1) { 1238 TEST_F(PortsTest, MergePortWithClosedPeer1) {
1248 // This tests that the right thing happens when initiating a merge on a port 1239 // This tests that the right thing happens when initiating a merge on a port
1249 // whose peer has already been closed. 1240 // whose peer has already been closed.
1250 1241
1251 NodeName node0_name(0, 1); 1242 TestNode node0(0);
1252 TestNodeDelegate node0_delegate(node0_name); 1243 AddNode(&node0);
1253 Node node0(node0_name, &node0_delegate); 1244
1254 node_map[0] = &node0; 1245 TestNode node1(1);
1255 1246 AddNode(&node1);
1256 NodeName node1_name(1, 1); 1247
1257 TestNodeDelegate node1_delegate(node1_name); 1248 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1258 Node node1(node1_name, &node1_delegate); 1249 PortRef A, B, C, D;
1259 node_map[1] = &node1; 1250 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1260 1251 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1261 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1262 PortRef A, B, C, D;
1263 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
1264 EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
1265
1266 node0_delegate.set_read_messages(false);
1267 node1_delegate.set_save_messages(true);
1268 1252
1269 // Write a message on A. 1253 // Write a message on A.
1270 EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey")); 1254 EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));
1271
1272 PumpTasks();
1273 1255
1274 // Close A. 1256 // Close A.
1275 EXPECT_EQ(OK, node0.ClosePort(A)); 1257 EXPECT_EQ(OK, node0.node().ClosePort(A));
1276 1258
1277 // Initiate a merge between B and C. 1259 // Initiate a merge between B and C.
1278 EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); 1260 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1279 1261
1280 PumpTasks(); 1262 WaitForIdle();
1281 1263
1282 // Expect only one receiving port to be left after pumping tasks. 1264 // Expect all proxies to be gone once idle. node0 should have no ports since
1283 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 1265 // A was explicitly closed.
1284 EXPECT_TRUE(node1.CanShutdownCleanly(true)); 1266 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1267 EXPECT_TRUE(
1268 node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1285 1269
1286 // Expect D to have received the message sent on A. 1270 // Expect D to have received the message sent on A.
1287 ScopedMessage message; 1271 ScopedMessage message;
1288 ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); 1272 ASSERT_TRUE(node1.ReadMessage(D, &message));
1289 EXPECT_EQ(0, strcmp("hey", ToString(message))); 1273 EXPECT_TRUE(MessageEquals(message, "hey"));
1290 1274
1291 EXPECT_EQ(OK, node1.ClosePort(D)); 1275 EXPECT_EQ(OK, node1.node().ClosePort(D));
1292 1276
1293 // No more ports should be open. 1277 // No more ports should be open.
1294 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 1278 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1295 EXPECT_TRUE(node1.CanShutdownCleanly(false)); 1279 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1296 } 1280 }
1297 1281
1298 TEST_F(PortsTest, MergePortWithClosedPeer2) { 1282 TEST_F(PortsTest, MergePortWithClosedPeer2) {
1299 // This tests that the right thing happens when merging into a port whose peer 1283 // This tests that the right thing happens when merging into a port whose peer
1300 // has already been closed. 1284 // has already been closed.
1301 1285
1302 NodeName node0_name(0, 1); 1286 TestNode node0(0);
1303 TestNodeDelegate node0_delegate(node0_name); 1287 AddNode(&node0);
1304 Node node0(node0_name, &node0_delegate); 1288
1305 node_map[0] = &node0; 1289 TestNode node1(1);
1306 1290 AddNode(&node1);
1307 NodeName node1_name(1, 1); 1291
1308 TestNodeDelegate node1_delegate(node1_name); 1292 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1309 Node node1(node1_name, &node1_delegate); 1293 PortRef A, B, C, D;
1310 node_map[1] = &node1; 1294 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1311 1295 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1312 // Setup two independent port pairs, A-B on node0 and C-D on node1. 1296
1313 PortRef A, B, C, D; 1297 // Write a message on D and close it.
1314 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B)); 1298 EXPECT_EQ(OK, node0.SendStringMessage(D, "hey"));
1315 EXPECT_EQ(OK, node1.CreatePortPair(&C, &D)); 1299 EXPECT_EQ(OK, node1.node().ClosePort(D));
1316 1300
1317 node0_delegate.set_save_messages(true); 1301 // Initiate a merge between B and C.
1318 node1_delegate.set_read_messages(false); 1302 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1319 1303
1320 // Write a message on D. 1304 WaitForIdle();
1321 EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey")); 1305
1322 1306 // Expect all proxies to be gone once idle. node1 should have no ports since
1323 PumpTasks(); 1307 // D was explicitly closed.
1324 1308 EXPECT_TRUE(
1325 // Close D. 1309 node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
1326 EXPECT_EQ(OK, node1.ClosePort(D)); 1310 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1327
1328 // Initiate a merge between B and C.
1329 EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
1330
1331 PumpTasks();
1332
1333 // Expect only one receiving port to be left after pumping tasks.
1334 EXPECT_TRUE(node0.CanShutdownCleanly(true));
1335 EXPECT_TRUE(node1.CanShutdownCleanly(false));
1336 1311
1337 // Expect A to have received the message sent on D. 1312 // Expect A to have received the message sent on D.
1338 ScopedMessage message; 1313 ScopedMessage message;
1339 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); 1314 ASSERT_TRUE(node0.ReadMessage(A, &message));
1340 EXPECT_EQ(0, strcmp("hey", ToString(message))); 1315 EXPECT_TRUE(MessageEquals(message, "hey"));
1341 1316
1342 EXPECT_EQ(OK, node0.ClosePort(A)); 1317 EXPECT_EQ(OK, node0.node().ClosePort(A));
1343 1318
1344 // No more ports should be open. 1319 // No more ports should be open.
1345 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 1320 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1346 EXPECT_TRUE(node1.CanShutdownCleanly(false)); 1321 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1347 } 1322 }
1348 1323
1349 TEST_F(PortsTest, MergePortsWithClosedPeers) { 1324 TEST_F(PortsTest, MergePortsWithClosedPeers) {
1350 // This tests that no residual ports are left behind if two ports are merged 1325 // This tests that no residual ports are left behind if two ports are merged
1351 // when both of their peers have been closed. 1326 // when both of their peers have been closed.
1352 1327
1353 NodeName node0_name(0, 1); 1328 TestNode node0(0);
1354 TestNodeDelegate node0_delegate(node0_name); 1329 AddNode(&node0);
1355 Node node0(node0_name, &node0_delegate); 1330
1356 node_map[0] = &node0; 1331 TestNode node1(1);
1357 1332 AddNode(&node1);
1358 NodeName node1_name(1, 1); 1333
1359 TestNodeDelegate node1_delegate(node1_name); 1334 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1360 Node node1(node1_name, &node1_delegate); 1335 PortRef A, B, C, D;
1361 node_map[1] = &node1; 1336 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1362 1337 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1363 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1364 PortRef A, B, C, D;
1365 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
1366 EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
1367
1368 node0_delegate.set_save_messages(true);
1369 node1_delegate.set_read_messages(false);
1370 1338
1371 // Close A and D. 1339 // Close A and D.
1372 EXPECT_EQ(OK, node0.ClosePort(A)); 1340 EXPECT_EQ(OK, node0.node().ClosePort(A));
1373 EXPECT_EQ(OK, node1.ClosePort(D)); 1341 EXPECT_EQ(OK, node1.node().ClosePort(D));
1374 1342
1375 PumpTasks(); 1343 WaitForIdle();
1376 1344
1377 // Initiate a merge between B and C. 1345 // Initiate a merge between B and C.
1378 EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); 1346 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1379 1347
1380 PumpTasks(); 1348 WaitForIdle();
1381 1349
1382 // Expect everything to have gone away. 1350 // Expect everything to have gone away.
1383 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 1351 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1384 EXPECT_TRUE(node1.CanShutdownCleanly(false)); 1352 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1385 } 1353 }
1386 1354
1387 TEST_F(PortsTest, MergePortsWithMovedPeers) { 1355 TEST_F(PortsTest, MergePortsWithMovedPeers) {
1388 // This tests that no ports can be merged successfully even if their peers 1356 // This tests that ports can be merged successfully even if their peers are
1389 // are moved around. 1357 // moved around.
1390 1358
1391 NodeName node0_name(0, 1); 1359 TestNode node0(0);
1392 TestNodeDelegate node0_delegate(node0_name); 1360 AddNode(&node0);
1393 Node node0(node0_name, &node0_delegate); 1361
1394 node_map[0] = &node0; 1362 TestNode node1(1);
1395 1363 AddNode(&node1);
1396 NodeName node1_name(1, 1); 1364
1397 TestNodeDelegate node1_delegate(node1_name); 1365 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1398 Node node1(node1_name, &node1_delegate); 1366 PortRef A, B, C, D;
1399 node_map[1] = &node1; 1367 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1400 1368 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1401 node0_delegate.set_save_messages(true);
1402 node1_delegate.set_read_messages(false);
1403
1404 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1405 PortRef A, B, C, D;
1406 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
1407 EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
1408 1369
1409 // Set up another pair X-Y for moving ports on node0. 1370 // Set up another pair X-Y for moving ports on node0.
1410 PortRef X, Y; 1371 PortRef X, Y;
1411 EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y)); 1372 EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y));
1412 1373
1413 ScopedMessage message; 1374 ScopedMessage message;
1414 1375
1415 // Move A to new port E. 1376 // Move A to new port E.
1416 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A)); 1377 EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A));
1417 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); 1378 ASSERT_TRUE(node0.ReadMessage(Y, &message));
1418 ASSERT_EQ(1u, message->num_ports()); 1379 ASSERT_EQ(1u, message->num_ports());
1419 PortRef E; 1380 PortRef E;
1420 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E)); 1381 ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));
1421 1382
1422 EXPECT_EQ(OK, node0.ClosePort(X)); 1383 EXPECT_EQ(OK, node0.node().ClosePort(X));
1423 EXPECT_EQ(OK, node0.ClosePort(Y)); 1384 EXPECT_EQ(OK, node0.node().ClosePort(Y));
1424
1425 node0_delegate.set_read_messages(false);
1426 1385
1427 // Write messages on E and D. 1386 // Write messages on E and D.
1428 EXPECT_EQ(OK, SendStringMessage(&node0, E, "hey")); 1387 EXPECT_EQ(OK, node0.SendStringMessage(E, "hey"));
1429 EXPECT_EQ(OK, SendStringMessage(&node1, D, "hi")); 1388 EXPECT_EQ(OK, node1.SendStringMessage(D, "hi"));
1430 1389
1431 // Initiate a merge between B and C. 1390 // Initiate a merge between B and C.
1432 EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name())); 1391 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1433 1392
1434 node0_delegate.set_read_messages(true); 1393 WaitForIdle();
1435 node1_delegate.set_read_messages(true);
1436 node1_delegate.set_save_messages(true);
1437
1438 PumpTasks();
1439 1394
1440 // Expect to receive D's message on E and E's message on D. 1395 // Expect to receive D's message on E and E's message on D.
1441 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message)); 1396 ASSERT_TRUE(node0.ReadMessage(E, &message));
1442 EXPECT_EQ(0, strcmp("hi", ToString(message))); 1397 EXPECT_TRUE(MessageEquals(message, "hi"));
1443 ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); 1398 ASSERT_TRUE(node1.ReadMessage(D, &message));
1444 EXPECT_EQ(0, strcmp("hey", ToString(message))); 1399 EXPECT_TRUE(MessageEquals(message, "hey"));
1445 1400
1446 // Close E and D. 1401 // Close E and D.
1447 EXPECT_EQ(OK, node0.ClosePort(E)); 1402 EXPECT_EQ(OK, node0.node().ClosePort(E));
1448 EXPECT_EQ(OK, node1.ClosePort(D)); 1403 EXPECT_EQ(OK, node1.node().ClosePort(D));
1449 1404
1450 PumpTasks(); 1405 WaitForIdle();
1451 1406
1452 // Expect everything to have gone away. 1407 // Expect everything to have gone away.
1453 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 1408 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1454 EXPECT_TRUE(node1.CanShutdownCleanly(false)); 1409 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1455 } 1410 }
1456 1411
1457 TEST_F(PortsTest, MergePortsFailsGracefully) { 1412 TEST_F(PortsTest, MergePortsFailsGracefully) {
1458 // This tests that the system remains in a well-defined state if something 1413 // This tests that the system remains in a well-defined state if something
1459 // goes wrong during port merge. 1414 // goes wrong during port merge.
1460 1415
1461 NodeName node0_name(0, 1); 1416 TestNode node0(0);
1462 TestNodeDelegate node0_delegate(node0_name); 1417 AddNode(&node0);
1463 Node node0(node0_name, &node0_delegate); 1418
1464 node_map[0] = &node0; 1419 TestNode node1(1);
1465 1420 AddNode(&node1);
1466 NodeName node1_name(1, 1); 1421
1467 TestNodeDelegate node1_delegate(node1_name); 1422 // Setup two independent port pairs, A-B on node0 and C-D on node1.
1468 Node node1(node1_name, &node1_delegate); 1423 PortRef A, B, C, D;
1469 node_map[1] = &node1; 1424 EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
1470 1425 EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));
1471 // Setup two independent port pairs, A-B on node0 and C-D on node1. 1426
1472 PortRef A, B, C, D;
1473 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
1474 EXPECT_EQ(OK, node1.CreatePortPair(&C, &D));
1475
1476 PumpTasks();
1477
1478 // Initiate a merge between B and C.
1479 EXPECT_EQ(OK, node0.MergePorts(B, node1_name, C.name()));
1480
1481 // Move C to a new port E. This is dumb and nobody should do it, but it's
1482 // possible. MergePorts will fail as a result because C won't be in a
1483 // receiving state when the event arrives at node1, so B should be closed.
1484 ScopedMessage message; 1427 ScopedMessage message;
1485 PortRef X, Y; 1428 PortRef X, Y;
1486 EXPECT_EQ(OK, node1.CreatePortPair(&X, &Y)); 1429 EXPECT_EQ(OK, node1.node().CreatePortPair(&X, &Y));
1487 node1_delegate.set_save_messages(true); 1430
1488 EXPECT_EQ(OK, SendStringMessageWithPort(&node1, X, "foo", C)); 1431 // Block the merge from proceeding until we can do something stupid with port
1489 ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); 1432 // C. This avoids the test logic racing with async merge logic.
1433 node1.BlockOnEvent(EventType::kMergePort);
1434
1435 // Initiate the merge between B and C.
1436 EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));
1437
1438 // Move C to a new port E. This is not a sane use of Node's public API but
1439 // is still hypothetically possible. It allows us to force a merge failure
1440 // because C will be in an invalid state by the term the merge is processed.
1441 // As a result, B should be closed.
1442 EXPECT_EQ(OK, node1.SendStringMessageWithPort(X, "foo", C));
1443
1444 node1.Unblock();
1445
1446 ASSERT_TRUE(node1.ReadMessage(Y, &message));
1490 ASSERT_EQ(1u, message->num_ports()); 1447 ASSERT_EQ(1u, message->num_ports());
1491 PortRef E; 1448 PortRef E;
1492 ASSERT_EQ(OK, node1.GetPort(message->ports()[0], &E)); 1449 ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));
1493 EXPECT_EQ(OK, node1.ClosePort(X)); 1450
1494 EXPECT_EQ(OK, node1.ClosePort(Y)); 1451 EXPECT_EQ(OK, node1.node().ClosePort(X));
1495 1452 EXPECT_EQ(OK, node1.node().ClosePort(Y));
1496 // C goes away as a result of normal proxy removal. 1453
1497 PumpTasks(); 1454 WaitForIdle();
1498 1455
1499 EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(C.name(), &C)); 1456 // C goes away as a result of normal proxy removal. B should have been closed
1500 1457 // cleanly by the failed MergePorts.
1501 // B should have been closed cleanly. 1458 EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C));
1502 EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.GetPort(B.name(), &B)); 1459 EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B));
1503 1460
1504 // Close A, D, and E. 1461 // Close A, D, and E.
1505 EXPECT_EQ(OK, node0.ClosePort(A)); 1462 EXPECT_EQ(OK, node0.node().ClosePort(A));
1506 EXPECT_EQ(OK, node1.ClosePort(D)); 1463 EXPECT_EQ(OK, node1.node().ClosePort(D));
1507 EXPECT_EQ(OK, node1.ClosePort(E)); 1464 EXPECT_EQ(OK, node1.node().ClosePort(E));
1508 1465
1509 PumpTasks(); 1466 WaitForIdle();
1510 1467
1511 // Expect everything to have gone away. 1468 // Expect everything to have gone away.
1512 EXPECT_TRUE(node0.CanShutdownCleanly(false)); 1469 EXPECT_TRUE(node0.node().CanShutdownCleanly());
1513 EXPECT_TRUE(node1.CanShutdownCleanly(false)); 1470 EXPECT_TRUE(node1.node().CanShutdownCleanly());
1514 } 1471 }
1515 1472
1516 } // namespace test 1473 } // namespace test
1517 } // namespace ports 1474 } // namespace ports
1518 } // namespace edk 1475 } // namespace edk
1519 } // namespace mojo 1476 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/ports/node.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698