OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include <stdio.h> | 5 #include <stdio.h> |
6 #include <stdlib.h> | 6 #include <stdlib.h> |
7 #include <string.h> | 7 #include <string.h> |
8 | 8 |
9 #include <map> | 9 #include <map> |
10 #include <queue> | 10 #include <queue> |
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
81 ScopedMessage message; | 81 ScopedMessage message; |
82 uint64_t priority; | 82 uint64_t priority; |
83 }; | 83 }; |
84 | 84 |
85 struct TaskComparator { | 85 struct TaskComparator { |
86 bool operator()(const Task* a, const Task* b) { | 86 bool operator()(const Task* a, const Task* b) { |
87 return a->priority < b->priority; | 87 return a->priority < b->priority; |
88 } | 88 } |
89 }; | 89 }; |
90 | 90 |
| 91 const size_t kMaxNodes = 3; |
| 92 |
91 std::priority_queue<Task*, std::vector<Task*>, TaskComparator> task_queue; | 93 std::priority_queue<Task*, std::vector<Task*>, TaskComparator> task_queue; |
92 Node* node_map[2]; | 94 Node* node_map[kMaxNodes]; |
93 | 95 |
94 Node* GetNode(const NodeName& name) { | 96 Node* GetNode(const NodeName& name) { |
95 return node_map[name.v1]; | 97 return node_map[name.v1]; |
96 } | 98 } |
97 | 99 |
98 void SetNode(const NodeName& name, Node* node) { | 100 void SetNode(const NodeName& name, Node* node) { |
99 node_map[name.v1] = node; | 101 node_map[name.v1] = node; |
100 } | 102 } |
101 | 103 |
102 void PumpTasks() { | 104 void PumpTasks() { |
103 while (!task_queue.empty()) { | 105 while (!task_queue.empty()) { |
104 Task* task = task_queue.top(); | 106 Task* task = task_queue.top(); |
105 task_queue.pop(); | 107 task_queue.pop(); |
106 | 108 |
107 Node* node = GetNode(task->node_name); | 109 Node* node = GetNode(task->node_name); |
108 node->AcceptMessage(std::move(task->message)); | 110 if (node) |
| 111 node->AcceptMessage(std::move(task->message)); |
109 | 112 |
110 delete task; | 113 delete task; |
111 } | 114 } |
112 } | 115 } |
113 | 116 |
114 void DiscardPendingTasks() { | 117 void DiscardPendingTasks() { |
115 while (!task_queue.empty()) { | 118 while (!task_queue.empty()) { |
116 Task* task = task_queue.top(); | 119 Task* task = task_queue.top(); |
117 task_queue.pop(); | 120 task_queue.pop(); |
118 delete task; | 121 delete task; |
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
187 DVLOG(1) << "Dropping ForwardMessage from node " | 190 DVLOG(1) << "Dropping ForwardMessage from node " |
188 << node_name_ << " to " << node_name; | 191 << node_name_ << " to " << node_name; |
189 ClosePortsInMessage(GetNode(node_name), message.get()); | 192 ClosePortsInMessage(GetNode(node_name), message.get()); |
190 return; | 193 return; |
191 } | 194 } |
192 DVLOG(1) << "ForwardMessage from node " | 195 DVLOG(1) << "ForwardMessage from node " |
193 << node_name_ << " to " << node_name; | 196 << node_name_ << " to " << node_name; |
194 task_queue.push(new Task(node_name, std::move(message))); | 197 task_queue.push(new Task(node_name, std::move(message))); |
195 } | 198 } |
196 | 199 |
| 200 void BroadcastMessage(ScopedMessage message) override { |
| 201 for (size_t i = 0; i < kMaxNodes; ++i) { |
| 202 Node* node = node_map[i]; |
| 203 if (node) { |
| 204 // NOTE: We only need to support broadcast of events, which have no |
| 205 // payload or ports bytes. |
| 206 ScopedMessage new_message( |
| 207 new TestMessage(message->num_header_bytes(), 0, 0)); |
| 208 memcpy(new_message->mutable_header_bytes(), message->header_bytes(), |
| 209 message->num_header_bytes()); |
| 210 node->AcceptMessage(std::move(new_message)); |
| 211 } |
| 212 } |
| 213 } |
| 214 |
197 void PortStatusChanged(const PortRef& port) override { | 215 void PortStatusChanged(const PortRef& port) override { |
198 DVLOG(1) << "PortStatusChanged for " << port.name() << "@" << node_name_; | 216 DVLOG(1) << "PortStatusChanged for " << port.name() << "@" << node_name_; |
199 if (!read_messages_) | 217 if (!read_messages_) |
200 return; | 218 return; |
201 Node* node = GetNode(node_name_); | 219 Node* node = GetNode(node_name_); |
202 for (;;) { | 220 for (;;) { |
203 ScopedMessage message; | 221 ScopedMessage message; |
204 int rv = node->GetMessage(port, &message); | 222 int rv = node->GetMessage(port, &message); |
205 EXPECT_TRUE(rv == OK || rv == ERROR_PORT_PEER_CLOSED); | 223 EXPECT_TRUE(rv == OK || rv == ERROR_PORT_PEER_CLOSED); |
206 if (rv == ERROR_PORT_PEER_CLOSED || !message) | 224 if (rv == ERROR_PORT_PEER_CLOSED || !message) |
(...skipping 25 matching lines...) Expand all Loading... |
232 | 250 |
233 std::queue<ScopedMessage> saved_messages_; | 251 std::queue<ScopedMessage> saved_messages_; |
234 NodeName node_name_; | 252 NodeName node_name_; |
235 bool drop_messages_; | 253 bool drop_messages_; |
236 bool read_messages_; | 254 bool read_messages_; |
237 bool save_messages_; | 255 bool save_messages_; |
238 }; | 256 }; |
239 | 257 |
240 class PortsTest : public testing::Test { | 258 class PortsTest : public testing::Test { |
241 public: | 259 public: |
242 PortsTest() { | 260 void SetUp() override { |
243 SetNode(NodeName(0, 1), nullptr); | |
244 SetNode(NodeName(1, 1), nullptr); | |
245 } | |
246 | |
247 ~PortsTest() override { | |
248 DiscardPendingTasks(); | 261 DiscardPendingTasks(); |
249 SetNode(NodeName(0, 1), nullptr); | 262 SetNode(NodeName(0, 1), nullptr); |
250 SetNode(NodeName(1, 1), nullptr); | 263 SetNode(NodeName(1, 1), nullptr); |
| 264 SetNode(NodeName(2, 1), nullptr); |
251 } | 265 } |
252 }; | 266 }; |
253 | 267 |
254 } // namespace | 268 } // namespace |
255 | 269 |
256 TEST_F(PortsTest, Basic1) { | 270 TEST_F(PortsTest, Basic1) { |
257 NodeName node0_name(0, 1); | 271 NodeName node0_name(0, 1); |
258 TestNodeDelegate node0_delegate(node0_name); | 272 TestNodeDelegate node0_delegate(node0_name); |
259 Node node0(node0_name, &node0_delegate); | 273 Node node0(node0_name, &node0_delegate); |
260 SetNode(node0_name, &node0); | 274 SetNode(node0_name, &node0); |
(...skipping 193 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
454 ClosePortsInMessage(&node1, message.get()); | 468 ClosePortsInMessage(&node1, message.get()); |
455 | 469 |
456 EXPECT_EQ(OK, node1.ClosePort(x1)); | 470 EXPECT_EQ(OK, node1.ClosePort(x1)); |
457 | 471 |
458 PumpTasks(); | 472 PumpTasks(); |
459 | 473 |
460 EXPECT_TRUE(node0.CanShutdownCleanly(false)); | 474 EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
461 EXPECT_TRUE(node1.CanShutdownCleanly(false)); | 475 EXPECT_TRUE(node1.CanShutdownCleanly(false)); |
462 } | 476 } |
463 | 477 |
| 478 TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) { |
| 479 // Tests that a proxy gets cleaned up when its indirect peer lives on a lost |
| 480 // node. |
| 481 |
| 482 NodeName node0_name(0, 1); |
| 483 TestNodeDelegate node0_delegate(node0_name); |
| 484 Node node0(node0_name, &node0_delegate); |
| 485 node_map[0] = &node0; |
| 486 |
| 487 NodeName node1_name(1, 1); |
| 488 TestNodeDelegate node1_delegate(node1_name); |
| 489 Node node1(node1_name, &node1_delegate); |
| 490 node_map[1] = &node1; |
| 491 |
| 492 NodeName node2_name(2, 1); |
| 493 TestNodeDelegate node2_delegate(node2_name); |
| 494 Node node2(node2_name, &node2_delegate); |
| 495 node_map[2] = &node2; |
| 496 |
| 497 node1_delegate.set_save_messages(true); |
| 498 |
| 499 // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2. |
| 500 PortRef A, B, C, D; |
| 501 EXPECT_EQ(OK, node0.CreateUninitializedPort(&A)); |
| 502 EXPECT_EQ(OK, node1.CreateUninitializedPort(&B)); |
| 503 EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name())); |
| 504 EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name())); |
| 505 EXPECT_EQ(OK, node1.CreateUninitializedPort(&C)); |
| 506 EXPECT_EQ(OK, node2.CreateUninitializedPort(&D)); |
| 507 EXPECT_EQ(OK, node1.InitializePort(C, node2_name, D.name())); |
| 508 EXPECT_EQ(OK, node2.InitializePort(D, node1_name, C.name())); |
| 509 |
| 510 // Create E-F and send F over A to node 1. |
| 511 PortRef E, F; |
| 512 EXPECT_EQ(OK, node0.CreatePortPair(&E, &F)); |
| 513 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, ".", F)); |
| 514 |
| 515 PumpTasks(); |
| 516 |
| 517 ScopedMessage message; |
| 518 ASSERT_TRUE(node1_delegate.GetSavedMessage(&message)); |
| 519 ASSERT_EQ(1u, message->num_ports()); |
| 520 |
| 521 EXPECT_EQ(OK, node1.GetPort(message->ports()[0], &F)); |
| 522 |
| 523 // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1 |
| 524 // will trivially become aware of the loss, and this test verifies that the |
| 525 // port A on node 0 will eventually also become aware of it. |
| 526 |
| 527 EXPECT_EQ(OK, SendStringMessageWithPort(&node1, C, ".", F)); |
| 528 |
| 529 node_map[2] = nullptr; |
| 530 EXPECT_EQ(OK, node1.LostConnectionToNode(node2_name)); |
| 531 |
| 532 PumpTasks(); |
| 533 |
| 534 // Port F should be gone. |
| 535 EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.GetPort(F.name(), &F)); |
| 536 |
| 537 // Port E should have detected peer closure despite the fact that there is |
| 538 // no longer a continuous route from F to E over which the event could travel. |
| 539 PortStatus status; |
| 540 EXPECT_EQ(OK, node0.GetStatus(E, &status)); |
| 541 EXPECT_TRUE(status.peer_closed); |
| 542 |
| 543 EXPECT_EQ(OK, node0.ClosePort(A)); |
| 544 EXPECT_EQ(OK, node1.ClosePort(B)); |
| 545 EXPECT_EQ(OK, node1.ClosePort(C)); |
| 546 EXPECT_EQ(OK, node0.ClosePort(E)); |
| 547 |
| 548 EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
| 549 EXPECT_TRUE(node1.CanShutdownCleanly(false)); |
| 550 } |
| 551 |
464 TEST_F(PortsTest, GetMessage1) { | 552 TEST_F(PortsTest, GetMessage1) { |
465 NodeName node0_name(0, 1); | 553 NodeName node0_name(0, 1); |
466 TestNodeDelegate node0_delegate(node0_name); | 554 TestNodeDelegate node0_delegate(node0_name); |
467 Node node0(node0_name, &node0_delegate); | 555 Node node0(node0_name, &node0_delegate); |
468 node_map[0] = &node0; | 556 node_map[0] = &node0; |
469 | 557 |
470 PortRef a0, a1; | 558 PortRef a0, a1; |
471 EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); | 559 EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1)); |
472 | 560 |
473 ScopedMessage message; | 561 ScopedMessage message; |
(...skipping 872 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1346 | 1434 |
1347 // Expect everything to have gone away. | 1435 // Expect everything to have gone away. |
1348 EXPECT_TRUE(node0.CanShutdownCleanly(false)); | 1436 EXPECT_TRUE(node0.CanShutdownCleanly(false)); |
1349 EXPECT_TRUE(node1.CanShutdownCleanly(false)); | 1437 EXPECT_TRUE(node1.CanShutdownCleanly(false)); |
1350 } | 1438 } |
1351 | 1439 |
1352 } // namespace test | 1440 } // namespace test |
1353 } // namespace ports | 1441 } // namespace ports |
1354 } // namespace edk | 1442 } // namespace edk |
1355 } // namespace mojo | 1443 } // namespace mojo |
OLD | NEW |