Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(365)

Side by Side Diff: mojo/edk/system/ports/ports_unittest.cc

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <string.h>
8
9 #include <map>
10 #include <queue>
11 #include <sstream>
12
13 #include "base/logging.h"
14 #include "mojo/edk/system/ports/node.h"
15 #include "mojo/edk/system/ports/node_delegate.h"
16 #include "testing/gtest/include/gtest/gtest.h"
17
18 namespace mojo {
19 namespace edk {
20 namespace ports {
21 namespace test {
22
23 namespace {
24
25 void LogMessage(const Message* message) {
26 std::stringstream ports;
27 for (size_t i = 0; i < message->num_ports(); ++i) {
28 if (i > 0)
29 ports << ",";
30 ports << message->ports()[i];
31 }
32 DVLOG(1) << "message: \""
33 << static_cast<const char*>(message->payload_bytes())
34 << "\" ports=[" << ports.str() << "]";
35 }
36
37 void ClosePortsInMessage(Node* node, Message* message) {
38 for (size_t i = 0; i < message->num_ports(); ++i) {
39 PortRef port;
40 ASSERT_EQ(OK, node->GetPort(message->ports()[i], &port));
41 EXPECT_EQ(OK, node->ClosePort(port));
42 }
43 }
44
45 class TestMessage : public Message {
46 public:
47 static ScopedMessage NewUserMessage(size_t num_payload_bytes,
48 size_t num_ports) {
49 return ScopedMessage(new TestMessage(num_payload_bytes, num_ports));
50 }
51
52 TestMessage(size_t num_payload_bytes, size_t num_ports)
53 : Message(num_payload_bytes, num_ports) {
54 start_ = new char[num_header_bytes_ + num_ports_bytes_ + num_payload_bytes];
55 InitializeUserMessageHeader(start_);
56 }
57
58 TestMessage(size_t num_header_bytes,
59 size_t num_payload_bytes,
60 size_t num_ports_bytes)
61 : Message(num_header_bytes,
62 num_payload_bytes,
63 num_ports_bytes) {
64 start_ = new char[num_header_bytes + num_payload_bytes + num_ports_bytes];
65 }
66
67 ~TestMessage() override {
68 delete[] start_;
69 }
70 };
71
72 struct Task {
73 Task(NodeName node_name, ScopedMessage message)
74 : node_name(node_name),
75 message(std::move(message)),
76 priority(rand()) {
77 }
78
79 NodeName node_name;
80 ScopedMessage message;
81 int32_t priority;
82 };
83
84 struct TaskComparator {
85 bool operator()(const Task* a, const Task* b) {
86 return a->priority < b->priority;
87 }
88 };
89
90 std::priority_queue<Task*, std::vector<Task*>, TaskComparator> task_queue;
91 Node* node_map[2];
92
93 Node* GetNode(const NodeName& name) {
94 return node_map[name.v1];
95 }
96
97 void SetNode(const NodeName& name, Node* node) {
98 node_map[name.v1] = node;
99 }
100
101 void PumpTasks() {
102 while (!task_queue.empty()) {
103 Task* task = task_queue.top();
104 task_queue.pop();
105
106 Node* node = GetNode(task->node_name);
107 node->AcceptMessage(std::move(task->message));
108
109 delete task;
110 }
111 }
112
113 void DiscardPendingTasks() {
114 while (!task_queue.empty()) {
115 Task* task = task_queue.top();
116 task_queue.pop();
117 delete task;
118 }
119 }
120
121 int SendStringMessage(Node* node, const PortRef& port, const std::string& s) {
122 size_t size = s.size() + 1;
123 ScopedMessage message = TestMessage::NewUserMessage(size, 0);
124 memcpy(message->mutable_payload_bytes(), s.data(), size);
125 return node->SendMessage(port, &message);
126 }
127
128 int SendStringMessageWithPort(Node* node,
129 const PortRef& port,
130 const std::string& s,
131 const PortName& sent_port_name) {
132 size_t size = s.size() + 1;
133 ScopedMessage message = TestMessage::NewUserMessage(size, 1);
134 memcpy(message->mutable_payload_bytes(), s.data(), size);
135 message->mutable_ports()[0] = sent_port_name;
136 return node->SendMessage(port ,&message);
137 }
138
139 int SendStringMessageWithPort(Node* node,
140 const PortRef& port,
141 const std::string& s,
142 const PortRef& sent_port) {
143 return SendStringMessageWithPort(node, port, s, sent_port.name());
144 }
145
146 const char* ToString(const ScopedMessage& message) {
147 return static_cast<const char*>(message->payload_bytes());
148 }
149
150 class TestNodeDelegate : public NodeDelegate {
151 public:
152 explicit TestNodeDelegate(const NodeName& node_name)
153 : node_name_(node_name),
154 drop_messages_(false),
155 read_messages_(true),
156 save_messages_(false) {
157 }
158
159 void set_drop_messages(bool value) { drop_messages_ = value; }
160 void set_read_messages(bool value) { read_messages_ = value; }
161 void set_save_messages(bool value) { save_messages_ = value; }
162
163 bool GetSavedMessage(ScopedMessage* message) {
164 if (saved_messages_.empty()) {
165 message->reset();
166 return false;
167 }
168 *message = std::move(saved_messages_.front());
169 saved_messages_.pop();
170 return true;
171 }
172
173 void GenerateRandomPortName(PortName* port_name) override {
174 static uint64_t next_port_name = 1;
175 port_name->v1 = next_port_name++;
176 port_name->v2 = 0;
177 }
178
179 void AllocMessage(size_t num_header_bytes, ScopedMessage* message) override {
180 message->reset(new TestMessage(num_header_bytes, 0, 0));
181 }
182
183 void ForwardMessage(const NodeName& node_name,
184 ScopedMessage message) override {
185 if (drop_messages_) {
186 DVLOG(1) << "Dropping ForwardMessage from node "
187 << node_name_ << " to " << node_name;
188 ClosePortsInMessage(GetNode(node_name), message.get());
189 return;
190 }
191 DVLOG(1) << "ForwardMessage from node "
192 << node_name_ << " to " << node_name;
193 task_queue.push(new Task(node_name, std::move(message)));
194 }
195
196 void PortStatusChanged(const PortRef& port) override {
197 DVLOG(1) << "PortStatusChanged for " << port.name() << "@" << node_name_;
198 if (!read_messages_)
199 return;
200 Node* node = GetNode(node_name_);
201 for (;;) {
202 ScopedMessage message;
203 int rv = node->GetMessage(port, &message);
204 EXPECT_TRUE(rv == OK || rv == ERROR_PORT_PEER_CLOSED);
205 if (rv == ERROR_PORT_PEER_CLOSED || !message)
206 break;
207 if (save_messages_) {
208 SaveMessage(std::move(message));
209 } else {
210 LogMessage(message.get());
211 for (size_t i = 0; i < message->num_ports(); ++i) {
212 std::stringstream buf;
213 buf << "got port: " << message->ports()[i];
214
215 PortRef received_port;
216 node->GetPort(message->ports()[i], &received_port);
217
218 SendStringMessage(node, received_port, buf.str());
219
220 // Avoid leaking these ports.
221 node->ClosePort(received_port);
222 }
223 }
224 }
225 }
226
227 private:
228 void SaveMessage(ScopedMessage message) {
229 saved_messages_.emplace(std::move(message));
230 }
231
232 std::queue<ScopedMessage> saved_messages_;
233 NodeName node_name_;
234 bool drop_messages_;
235 bool read_messages_;
236 bool save_messages_;
237 };
238
239 class PortsTest : public testing::Test {
240 public:
241 PortsTest() {
242 SetNode(NodeName(0, 1), nullptr);
243 SetNode(NodeName(1, 1), nullptr);
244 }
245
246 ~PortsTest() override {
247 DiscardPendingTasks();
248 SetNode(NodeName(0, 1), nullptr);
249 SetNode(NodeName(1, 1), nullptr);
250 }
251 };
252
253 } // namespace
254
255 TEST_F(PortsTest, Basic1) {
256 NodeName node0_name(0, 1);
257 TestNodeDelegate node0_delegate(node0_name);
258 Node node0(node0_name, &node0_delegate);
259 SetNode(node0_name, &node0);
260
261 NodeName node1_name(1, 1);
262 TestNodeDelegate node1_delegate(node1_name);
263 Node node1(node1_name, &node1_delegate);
264 SetNode(node1_name, &node1);
265
266 // Setup pipe between node0 and node1.
267 PortRef x0, x1;
268 EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
269 EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
270 EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
271 EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
272
273 // Transfer a port from node0 to node1.
274 PortRef a0, a1;
275 EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
276 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1));
277
278 EXPECT_EQ(OK, node0.ClosePort(a0));
279
280 EXPECT_EQ(OK, node0.ClosePort(x0));
281 EXPECT_EQ(OK, node1.ClosePort(x1));
282
283 PumpTasks();
284
285 EXPECT_TRUE(node0.CanShutdownCleanly(false));
286 EXPECT_TRUE(node1.CanShutdownCleanly(false));
287 }
288
289 TEST_F(PortsTest, Basic2) {
290 NodeName node0_name(0, 1);
291 TestNodeDelegate node0_delegate(node0_name);
292 Node node0(node0_name, &node0_delegate);
293 SetNode(node0_name, &node0);
294
295 NodeName node1_name(1, 1);
296 TestNodeDelegate node1_delegate(node1_name);
297 Node node1(node1_name, &node1_delegate);
298 SetNode(node1_name, &node1);
299
300 // Setup pipe between node0 and node1.
301 PortRef x0, x1;
302 EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
303 EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
304 EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
305 EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
306
307 PortRef b0, b1;
308 EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1));
309 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", b1));
310 EXPECT_EQ(OK, SendStringMessage(&node0, b0, "hello again"));
311
312 // This may cause a SendMessage(b1) failure.
313 EXPECT_EQ(OK, node0.ClosePort(b0));
314
315 EXPECT_EQ(OK, node0.ClosePort(x0));
316 EXPECT_EQ(OK, node1.ClosePort(x1));
317
318 PumpTasks();
319
320 EXPECT_TRUE(node0.CanShutdownCleanly(false));
321 EXPECT_TRUE(node1.CanShutdownCleanly(false));
322 }
323
324 TEST_F(PortsTest, Basic3) {
325 NodeName node0_name(0, 1);
326 TestNodeDelegate node0_delegate(node0_name);
327 Node node0(node0_name, &node0_delegate);
328 SetNode(node0_name, &node0);
329
330 NodeName node1_name(1, 1);
331 TestNodeDelegate node1_delegate(node1_name);
332 Node node1(node1_name, &node1_delegate);
333 SetNode(node1_name, &node1);
334
335 // Setup pipe between node0 and node1.
336 PortRef x0, x1;
337 EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
338 EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
339 EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
340 EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
341
342 // Transfer a port from node0 to node1.
343 PortRef a0, a1;
344 EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
345 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "hello", a1));
346 EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello again"));
347
348 // Transfer a0 as well.
349 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a0));
350
351 PortRef b0, b1;
352 EXPECT_EQ(OK, node0.CreatePortPair(&b0, &b1));
353 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "bar", b1));
354 EXPECT_EQ(OK, SendStringMessage(&node0, b0, "baz"));
355
356 // This may cause a SendMessage(b1) failure.
357 EXPECT_EQ(OK, node0.ClosePort(b0));
358
359 EXPECT_EQ(OK, node0.ClosePort(x0));
360 EXPECT_EQ(OK, node1.ClosePort(x1));
361
362 PumpTasks();
363
364 EXPECT_TRUE(node0.CanShutdownCleanly(false));
365 EXPECT_TRUE(node1.CanShutdownCleanly(false));
366 }
367
368 TEST_F(PortsTest, LostConnectionToNode1) {
369 NodeName node0_name(0, 1);
370 TestNodeDelegate node0_delegate(node0_name);
371 Node node0(node0_name, &node0_delegate);
372 SetNode(node0_name, &node0);
373
374 NodeName node1_name(1, 1);
375 TestNodeDelegate node1_delegate(node1_name);
376 Node node1(node1_name, &node1_delegate);
377 SetNode(node1_name, &node1);
378
379 // Setup pipe between node0 and node1.
380 PortRef x0, x1;
381 EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
382 EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
383 EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
384 EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
385
386 // Transfer port to node1 and simulate a lost connection to node1. Dropping
387 // events from node1 is how we simulate the lost connection.
388
389 node1_delegate.set_drop_messages(true);
390
391 PortRef a0, a1;
392 EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
393 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "foo", a1));
394
395 PumpTasks();
396
397 EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));
398
399 PumpTasks();
400
401 EXPECT_EQ(OK, node0.ClosePort(a0));
402 EXPECT_EQ(OK, node0.ClosePort(x0));
403 EXPECT_EQ(OK, node1.ClosePort(x1));
404
405 PumpTasks();
406
407 EXPECT_TRUE(node0.CanShutdownCleanly(false));
408 EXPECT_TRUE(node1.CanShutdownCleanly(false));
409 }
410
411 TEST_F(PortsTest, LostConnectionToNode2) {
412 NodeName node0_name(0, 1);
413 TestNodeDelegate node0_delegate(node0_name);
414 Node node0(node0_name, &node0_delegate);
415 node_map[0] = &node0;
416
417 NodeName node1_name(1, 1);
418 TestNodeDelegate node1_delegate(node1_name);
419 Node node1(node1_name, &node1_delegate);
420 node_map[1] = &node1;
421
422 // Setup pipe between node0 and node1.
423 PortRef x0, x1;
424 EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
425 EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
426 EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
427 EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
428
429 node1_delegate.set_read_messages(false);
430
431 PortRef a0, a1;
432 EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
433 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "take a1", a1));
434
435 PumpTasks();
436
437 node1_delegate.set_drop_messages(true);
438
439 EXPECT_EQ(OK, node0.LostConnectionToNode(node1_name));
440
441 PumpTasks();
442
443 ScopedMessage message;
444 EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message));
445 EXPECT_FALSE(message);
446
447 EXPECT_EQ(OK, node0.ClosePort(a0));
448
449 EXPECT_EQ(OK, node0.ClosePort(x0));
450
451 EXPECT_EQ(OK, node1.GetMessage(x1, &message));
452 EXPECT_TRUE(message);
453 ClosePortsInMessage(&node1, message.get());
454
455 EXPECT_EQ(OK, node1.ClosePort(x1));
456
457 PumpTasks();
458
459 EXPECT_TRUE(node0.CanShutdownCleanly(false));
460 EXPECT_TRUE(node1.CanShutdownCleanly(false));
461 }
462
463 TEST_F(PortsTest, GetMessage1) {
464 NodeName node0_name(0, 1);
465 TestNodeDelegate node0_delegate(node0_name);
466 Node node0(node0_name, &node0_delegate);
467 node_map[0] = &node0;
468
469 PortRef a0, a1;
470 EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
471
472 ScopedMessage message;
473 EXPECT_EQ(OK, node0.GetMessage(a0, &message));
474 EXPECT_FALSE(message);
475
476 EXPECT_EQ(OK, node0.ClosePort(a1));
477
478 EXPECT_EQ(OK, node0.GetMessage(a0, &message));
479 EXPECT_FALSE(message);
480
481 PumpTasks();
482
483 EXPECT_EQ(ERROR_PORT_PEER_CLOSED, node0.GetMessage(a0, &message));
484 EXPECT_FALSE(message);
485
486 EXPECT_EQ(OK, node0.ClosePort(a0));
487
488 EXPECT_TRUE(node0.CanShutdownCleanly(false));
489 }
490
491 TEST_F(PortsTest, GetMessage2) {
492 NodeName node0_name(0, 1);
493 TestNodeDelegate node0_delegate(node0_name);
494 Node node0(node0_name, &node0_delegate);
495 node_map[0] = &node0;
496
497 node0_delegate.set_read_messages(false);
498
499 PortRef a0, a1;
500 EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
501
502 EXPECT_EQ(OK, SendStringMessage(&node0, a1, "1"));
503
504 ScopedMessage message;
505 EXPECT_EQ(OK, node0.GetMessage(a0, &message));
506
507 ASSERT_TRUE(message);
508 EXPECT_EQ(0, strcmp("1", ToString(message)));
509
510 EXPECT_EQ(OK, node0.ClosePort(a0));
511 EXPECT_EQ(OK, node0.ClosePort(a1));
512
513 EXPECT_TRUE(node0.CanShutdownCleanly(false));
514 }
515
516 TEST_F(PortsTest, GetMessage3) {
517 NodeName node0_name(0, 1);
518 TestNodeDelegate node0_delegate(node0_name);
519 Node node0(node0_name, &node0_delegate);
520 node_map[0] = &node0;
521
522 node0_delegate.set_read_messages(false);
523
524 PortRef a0, a1;
525 EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
526
527 const char* kStrings[] = {
528 "1",
529 "2",
530 "3"
531 };
532
533 for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i)
534 EXPECT_EQ(OK, SendStringMessage(&node0, a1, kStrings[i]));
535
536 ScopedMessage message;
537 for (size_t i = 0; i < sizeof(kStrings)/sizeof(kStrings[0]); ++i) {
538 EXPECT_EQ(OK, node0.GetMessage(a0, &message));
539 ASSERT_TRUE(message);
540 EXPECT_EQ(0, strcmp(kStrings[i], ToString(message)));
541 DVLOG(1) << "got " << kStrings[i];
542 }
543
544 EXPECT_EQ(OK, node0.ClosePort(a0));
545 EXPECT_EQ(OK, node0.ClosePort(a1));
546
547 EXPECT_TRUE(node0.CanShutdownCleanly(false));
548 }
549
550 TEST_F(PortsTest, Delegation1) {
551 NodeName node0_name(0, 1);
552 TestNodeDelegate node0_delegate(node0_name);
553 Node node0(node0_name, &node0_delegate);
554 SetNode(node0_name, &node0);
555
556 NodeName node1_name(1, 1);
557 TestNodeDelegate node1_delegate(node1_name);
558 Node node1(node1_name, &node1_delegate);
559 node_map[1] = &node1;
560
561 node0_delegate.set_save_messages(true);
562 node1_delegate.set_save_messages(true);
563
564 // Setup pipe between node0 and node1.
565 PortRef x0, x1;
566 EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
567 EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
568 EXPECT_EQ(OK, node0.InitializePort(x0, node1_name, x1.name()));
569 EXPECT_EQ(OK, node1.InitializePort(x1, node0_name, x0.name()));
570
571 // In this test, we send a message to a port that has been moved.
572
573 PortRef a0, a1;
574 EXPECT_EQ(OK, node0.CreatePortPair(&a0, &a1));
575
576 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, x0, "a1", a1));
577
578 PumpTasks();
579
580 ScopedMessage message;
581 ASSERT_TRUE(node1_delegate.GetSavedMessage(&message));
582
583 ASSERT_EQ(1u, message->num_ports());
584
585 // This is "a1" from the point of view of node1.
586 PortName a2_name = message->ports()[0];
587
588 EXPECT_EQ(OK, SendStringMessageWithPort(&node1, x1, "a2", a2_name));
589
590 PumpTasks();
591
592 EXPECT_EQ(OK, SendStringMessage(&node0, a0, "hello"));
593
594 PumpTasks();
595
596 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
597
598 ASSERT_EQ(1u, message->num_ports());
599
600 // This is "a2" from the point of view of node1.
601 PortName a3_name = message->ports()[0];
602
603 PortRef a3;
604 EXPECT_EQ(OK, node0.GetPort(a3_name, &a3));
605
606 EXPECT_EQ(0, strcmp("a2", ToString(message)));
607
608 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
609
610 EXPECT_EQ(0u, message->num_ports());
611 EXPECT_EQ(0, strcmp("hello", ToString(message)));
612
613 EXPECT_EQ(OK, node0.ClosePort(a0));
614 EXPECT_EQ(OK, node0.ClosePort(a3));
615
616 EXPECT_EQ(OK, node0.ClosePort(x0));
617 EXPECT_EQ(OK, node1.ClosePort(x1));
618
619 EXPECT_TRUE(node0.CanShutdownCleanly(false));
620 EXPECT_TRUE(node1.CanShutdownCleanly(false));
621 }
622
623 TEST_F(PortsTest, Delegation2) {
624 NodeName node0_name(0, 1);
625 TestNodeDelegate node0_delegate(node0_name);
626 Node node0(node0_name, &node0_delegate);
627 SetNode(node0_name, &node0);
628
629 NodeName node1_name(1, 1);
630 TestNodeDelegate node1_delegate(node1_name);
631 Node node1(node1_name, &node1_delegate);
632 node_map[1] = &node1;
633
634 node0_delegate.set_save_messages(true);
635 node1_delegate.set_save_messages(true);
636
637 for (int i = 0; i < 10; ++i) {
638 // Setup pipe a<->b between node0 and node1.
639 PortRef A, B;
640 EXPECT_EQ(OK, node0.CreateUninitializedPort(&A));
641 EXPECT_EQ(OK, node1.CreateUninitializedPort(&B));
642 EXPECT_EQ(OK, node0.InitializePort(A, node1_name, B.name()));
643 EXPECT_EQ(OK, node1.InitializePort(B, node0_name, A.name()));
644
645 PortRef C, D;
646 EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));
647
648 PortRef E, F;
649 EXPECT_EQ(OK, node0.CreatePortPair(&E, &F));
650
651 // Pass D over A to B.
652 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "1", D));
653
654 // Pass F over C to D.
655 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, C, "1", F));
656
657 // This message should find its way to node1.
658 EXPECT_EQ(OK, SendStringMessage(&node0, E, "hello"));
659
660 PumpTasks();
661
662 EXPECT_EQ(OK, node0.ClosePort(C));
663 EXPECT_EQ(OK, node0.ClosePort(E));
664
665 EXPECT_EQ(OK, node0.ClosePort(A));
666 EXPECT_EQ(OK, node1.ClosePort(B));
667
668 for (;;) {
669 ScopedMessage message;
670 if (node1_delegate.GetSavedMessage(&message)) {
671 ClosePortsInMessage(&node1, message.get());
672 if (strcmp("hello", ToString(message)) == 0)
673 break;
674 } else {
675 ASSERT_TRUE(false); // "hello" message not delivered!
676 break;
677 }
678 }
679
680 PumpTasks(); // Because ClosePort may have generated tasks.
681 }
682
683 EXPECT_TRUE(node0.CanShutdownCleanly(false));
684 EXPECT_TRUE(node1.CanShutdownCleanly(false));
685 }
686
687 TEST_F(PortsTest, SendUninitialized) {
688 NodeName node0_name(0, 1);
689 TestNodeDelegate node0_delegate(node0_name);
690 Node node0(node0_name, &node0_delegate);
691 node_map[0] = &node0;
692
693 NodeName node1_name(1, 1);
694 TestNodeDelegate node1_delegate(node1_name);
695 Node node1(node1_name, &node1_delegate);
696 node_map[1] = &node1;
697
698 // Begin to setup a pipe between node0 and node1, but don't initialize either
699 // endpoint.
700 PortRef x0, x1;
701 EXPECT_EQ(OK, node0.CreateUninitializedPort(&x0));
702 EXPECT_EQ(OK, node1.CreateUninitializedPort(&x1));
703
704 node0_delegate.set_save_messages(true);
705 node1_delegate.set_save_messages(true);
706
707 // Send a message on each port and expect neither to arrive yet.
708
709 EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED,
710 SendStringMessage(&node0, x0, "oops"));
711 EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED,
712 SendStringMessage(&node1, x1, "oh well"));
713
714 EXPECT_EQ(OK, node0.ClosePort(x0));
715 EXPECT_EQ(OK, node1.ClosePort(x1));
716
717 EXPECT_TRUE(node0.CanShutdownCleanly(false));
718 EXPECT_TRUE(node1.CanShutdownCleanly(false));
719 }
720
721 TEST_F(PortsTest, SendFailure) {
722 NodeName node0_name(0, 1);
723 TestNodeDelegate node0_delegate(node0_name);
724 Node node0(node0_name, &node0_delegate);
725 node_map[0] = &node0;
726
727 node0_delegate.set_save_messages(true);
728
729 PortRef A, B;
730 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
731
732 // Try to send A over itself.
733
734 EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
735 SendStringMessageWithPort(&node0, A, "oops", A));
736
737 // Try to send B over A.
738
739 EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
740 SendStringMessageWithPort(&node0, A, "nope", B));
741
742 PumpTasks();
743
744 // There should have been no messages accepted.
745 ScopedMessage message;
746 EXPECT_FALSE(node0_delegate.GetSavedMessage(&message));
747
748 // Both A and B should still work.
749
750 EXPECT_EQ(OK, SendStringMessage(&node0, A, "hi"));
751 EXPECT_EQ(OK, SendStringMessage(&node0, B, "hey"));
752
753 PumpTasks();
754
755 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
756 EXPECT_EQ(0, strcmp("hi", ToString(message)));
757 ClosePortsInMessage(&node0, message.get());
758
759 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
760 EXPECT_EQ(0, strcmp("hey", ToString(message)));
761 ClosePortsInMessage(&node0, message.get());
762
763 PumpTasks();
764
765 EXPECT_EQ(OK, node0.ClosePort(A));
766 EXPECT_EQ(OK, node0.ClosePort(B));
767
768 PumpTasks();
769
770 EXPECT_TRUE(node0.CanShutdownCleanly(false));
771 }
772
773 TEST_F(PortsTest, DontLeakUnreceivedPorts) {
774 NodeName node0_name(0, 1);
775 TestNodeDelegate node0_delegate(node0_name);
776 Node node0(node0_name, &node0_delegate);
777 node_map[0] = &node0;
778
779 node0_delegate.set_read_messages(false);
780
781 PortRef A, B;
782 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
783
784 PortRef C, D;
785 EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));
786
787 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D));
788
789 PumpTasks();
790
791 EXPECT_EQ(OK, node0.ClosePort(C));
792
793 EXPECT_EQ(OK, node0.ClosePort(A));
794 EXPECT_EQ(OK, node0.ClosePort(B));
795
796 PumpTasks();
797
798 EXPECT_TRUE(node0.CanShutdownCleanly(false));
799 }
800
801 TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
802 NodeName node0_name(0, 1);
803 TestNodeDelegate node0_delegate(node0_name);
804 Node node0(node0_name, &node0_delegate);
805 node_map[0] = &node0;
806
807 node0_delegate.set_save_messages(true);
808
809 PortRef A, B;
810 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
811
812 PortRef C, D;
813 EXPECT_EQ(OK, node0.CreatePortPair(&C, &D));
814
815 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, A, "foo", D));
816
817 ScopedMessage message;
818 EXPECT_TRUE(node0_delegate.GetSavedMessage(&message));
819 ASSERT_EQ(1u, message->num_ports());
820
821 PortRef E;
822 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
823
824 EXPECT_TRUE(node0.CanShutdownCleanly(true));
825
826 PumpTasks();
827
828 EXPECT_TRUE(node0.CanShutdownCleanly(true));
829 EXPECT_FALSE(node0.CanShutdownCleanly(false));
830
831 EXPECT_EQ(OK, node0.ClosePort(A));
832 EXPECT_EQ(OK, node0.ClosePort(B));
833 EXPECT_EQ(OK, node0.ClosePort(C));
834 EXPECT_EQ(OK, node0.ClosePort(E));
835
836 PumpTasks();
837
838 EXPECT_TRUE(node0.CanShutdownCleanly(false));
839 }
840
841 TEST_F(PortsTest, ProxyCollapse1) {
842 NodeName node0_name(0, 1);
843 TestNodeDelegate node0_delegate(node0_name);
844 Node node0(node0_name, &node0_delegate);
845 node_map[0] = &node0;
846
847 node0_delegate.set_save_messages(true);
848
849 PortRef A, B;
850 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
851
852 PortRef X, Y;
853 EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
854
855 ScopedMessage message;
856
857 // Send B and receive it as C.
858 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
859 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
860 ASSERT_EQ(1u, message->num_ports());
861 PortRef C;
862 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));
863
864 // Send C and receive it as D.
865 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C));
866 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
867 ASSERT_EQ(1u, message->num_ports());
868 PortRef D;
869 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));
870
871 // Send D and receive it as E.
872 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", D));
873 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
874 ASSERT_EQ(1u, message->num_ports());
875 PortRef E;
876 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
877
878 EXPECT_EQ(OK, node0.ClosePort(X));
879 EXPECT_EQ(OK, node0.ClosePort(Y));
880
881 EXPECT_EQ(OK, node0.ClosePort(A));
882 EXPECT_EQ(OK, node0.ClosePort(E));
883
884 PumpTasks();
885
886 EXPECT_TRUE(node0.CanShutdownCleanly(false));
887 }
888
889 TEST_F(PortsTest, ProxyCollapse2) {
890 NodeName node0_name(0, 1);
891 TestNodeDelegate node0_delegate(node0_name);
892 Node node0(node0_name, &node0_delegate);
893 node_map[0] = &node0;
894
895 node0_delegate.set_save_messages(true);
896
897 PortRef A, B;
898 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
899
900 PortRef X, Y;
901 EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
902
903 ScopedMessage message;
904
905 // Send B and receive it as C.
906 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
907 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
908 ASSERT_EQ(1u, message->num_ports());
909 PortRef C;
910 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));
911
912 // Send A and receive it as D.
913 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
914 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
915 ASSERT_EQ(1u, message->num_ports());
916 PortRef D;
917 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));
918
919 // At this point we have a scenario with:
920 //
921 // D -> [B] -> C -> [A]
922 //
923 // Ensure that the proxies can collapse.
924
925 EXPECT_EQ(OK, node0.ClosePort(X));
926 EXPECT_EQ(OK, node0.ClosePort(Y));
927
928 EXPECT_EQ(OK, node0.ClosePort(C));
929 EXPECT_EQ(OK, node0.ClosePort(D));
930
931 PumpTasks();
932
933 EXPECT_TRUE(node0.CanShutdownCleanly(false));
934 }
935
936 TEST_F(PortsTest, SendWithClosedPeer) {
937 // This tests that if a port is sent when its peer is already known to be
938 // closed, the newly created port will be aware of that peer closure, and the
939 // proxy will eventually collapse.
940
941 NodeName node0_name(0, 1);
942 TestNodeDelegate node0_delegate(node0_name);
943 Node node0(node0_name, &node0_delegate);
944 node_map[0] = &node0;
945
946 node0_delegate.set_read_messages(false);
947
948 // Send a message from A to B, then close A.
949 PortRef A, B;
950 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
951 EXPECT_EQ(OK, SendStringMessage(&node0, A, "hey"));
952 EXPECT_EQ(OK, node0.ClosePort(A));
953
954 PumpTasks();
955
956 // Now send B over X-Y as new port C.
957 PortRef X, Y;
958 EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
959
960 node0_delegate.set_read_messages(true);
961 node0_delegate.set_save_messages(true);
962 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
963
964 EXPECT_EQ(OK, node0.ClosePort(X));
965 EXPECT_EQ(OK, node0.ClosePort(Y));
966
967 ScopedMessage message;
968 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
969 ASSERT_EQ(1u, message->num_ports());
970
971 PortRef C;
972 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));
973
974 PumpTasks();
975
976 // C should receive the message originally sent to B, and it should also be
977 // aware of A's closure.
978
979 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
980 EXPECT_EQ(0, strcmp("hey", ToString(message)));
981
982 PortStatus status;
983 EXPECT_EQ(OK, node0.GetStatus(C, &status));
984 EXPECT_FALSE(status.receiving_messages);
985 EXPECT_FALSE(status.has_messages);
986 EXPECT_TRUE(status.peer_closed);
987
988 node0.ClosePort(C);
989
990 EXPECT_TRUE(node0.CanShutdownCleanly(false));
991 }
992
993 TEST_F(PortsTest, SendWithClosedPeerSent) {
994 // This tests that if a port is closed while some number of proxies are still
995 // routing messages (directly or indirectly) to it, that the peer port is
996 // eventually notified of the closure, and the dead-end proxies will
997 // eventually be removed.
998
999 NodeName node0_name(0, 1);
1000 TestNodeDelegate node0_delegate(node0_name);
1001 Node node0(node0_name, &node0_delegate);
1002 node_map[0] = &node0;
1003
1004 node0_delegate.set_save_messages(true);
1005
1006 PortRef X, Y;
1007 EXPECT_EQ(OK, node0.CreatePortPair(&X, &Y));
1008
1009 PortRef A, B;
1010 EXPECT_EQ(OK, node0.CreatePortPair(&A, &B));
1011
1012 ScopedMessage message;
1013
1014 // Send A as new port C.
1015 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", A));
1016 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1017 ASSERT_EQ(1u, message->num_ports());
1018 PortRef C;
1019 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &C));
1020
1021 // Send C as new port D.
1022 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", C));
1023 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1024 ASSERT_EQ(1u, message->num_ports());
1025 PortRef D;
1026 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &D));
1027
1028 node0_delegate.set_read_messages(false);
1029
1030 // Send a message to B through D, then close D.
1031 EXPECT_EQ(OK, SendStringMessage(&node0, D, "hey"));
1032 EXPECT_EQ(OK, node0.ClosePort(D));
1033
1034 PumpTasks();
1035
1036 // Now send B as new port E.
1037
1038 node0_delegate.set_read_messages(true);
1039 EXPECT_EQ(OK, SendStringMessageWithPort(&node0, X, "foo", B));
1040
1041 EXPECT_EQ(OK, node0.ClosePort(X));
1042 EXPECT_EQ(OK, node0.ClosePort(Y));
1043
1044 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1045 ASSERT_EQ(1u, message->num_ports());
1046
1047 PortRef E;
1048 ASSERT_EQ(OK, node0.GetPort(message->ports()[0], &E));
1049
1050 PumpTasks();
1051
1052 // E should receive the message originally sent to B, and it should also be
1053 // aware of D's closure.
1054
1055 ASSERT_TRUE(node0_delegate.GetSavedMessage(&message));
1056 EXPECT_EQ(0, strcmp("hey", ToString(message)));
1057
1058 PortStatus status;
1059 EXPECT_EQ(OK, node0.GetStatus(E, &status));
1060 EXPECT_FALSE(status.receiving_messages);
1061 EXPECT_FALSE(status.has_messages);
1062 EXPECT_TRUE(status.peer_closed);
1063
1064 node0.ClosePort(E);
1065
1066 PumpTasks();
1067
1068 EXPECT_TRUE(node0.CanShutdownCleanly(false));
1069 }
1070
1071 } // namespace test
1072 } // namespace ports
1073 } // namespace edk
1074 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698