Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(31)

Side by Side Diff: mojo/system/remote_message_pipe_unittest.cc

Issue 621153003: Move mojo edk into mojo/edk (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: fix checkdeps Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/system/raw_channel_win.cc ('k') | mojo/system/run_all_unittests.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <stdint.h>
6 #include <stdio.h>
7 #include <string.h>
8
9 #include <vector>
10
11 #include "base/bind.h"
12 #include "base/files/file_path.h"
13 #include "base/files/file_util.h"
14 #include "base/files/scoped_file.h"
15 #include "base/files/scoped_temp_dir.h"
16 #include "base/location.h"
17 #include "base/logging.h"
18 #include "base/macros.h"
19 #include "base/message_loop/message_loop.h"
20 #include "base/test/test_io_thread.h"
21 #include "base/threading/platform_thread.h" // For |Sleep()|.
22 #include "build/build_config.h" // TODO(vtl): Remove this.
23 #include "mojo/common/test/test_utils.h"
24 #include "mojo/embedder/platform_channel_pair.h"
25 #include "mojo/embedder/platform_shared_buffer.h"
26 #include "mojo/embedder/scoped_platform_handle.h"
27 #include "mojo/embedder/simple_platform_support.h"
28 #include "mojo/system/channel.h"
29 #include "mojo/system/channel_endpoint.h"
30 #include "mojo/system/message_pipe.h"
31 #include "mojo/system/message_pipe_dispatcher.h"
32 #include "mojo/system/platform_handle_dispatcher.h"
33 #include "mojo/system/raw_channel.h"
34 #include "mojo/system/shared_buffer_dispatcher.h"
35 #include "mojo/system/test_utils.h"
36 #include "mojo/system/waiter.h"
37 #include "testing/gtest/include/gtest/gtest.h"
38
39 namespace mojo {
40 namespace system {
41 namespace {
42
43 class RemoteMessagePipeTest : public testing::Test {
44 public:
45 RemoteMessagePipeTest() : io_thread_(base::TestIOThread::kAutoStart) {}
46 virtual ~RemoteMessagePipeTest() {}
47
48 virtual void SetUp() override {
49 io_thread_.PostTaskAndWait(
50 FROM_HERE,
51 base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread,
52 base::Unretained(this)));
53 }
54
55 virtual void TearDown() override {
56 io_thread_.PostTaskAndWait(
57 FROM_HERE,
58 base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread,
59 base::Unretained(this)));
60 }
61
62 protected:
63 // This connects the two given |ChannelEndpoint|s.
64 void ConnectChannelEndpoints(scoped_refptr<ChannelEndpoint> ep0,
65 scoped_refptr<ChannelEndpoint> ep1) {
66 io_thread_.PostTaskAndWait(
67 FROM_HERE,
68 base::Bind(&RemoteMessagePipeTest::ConnectChannelEndpointsOnIOThread,
69 base::Unretained(this),
70 ep0,
71 ep1));
72 }
73
74 // This bootstraps |ep| on |channels_[channel_index]|. It assumes/requires
75 // that this is the bootstrap case, i.e., that the endpoint IDs are both/will
76 // both be |Channel::kBootstrapEndpointId|. This returns *without* waiting for
77 // it to finish connecting.
78 void BootstrapChannelEndpointNoWait(unsigned channel_index,
79 scoped_refptr<ChannelEndpoint> ep) {
80 io_thread_.PostTask(
81 FROM_HERE,
82 base::Bind(&RemoteMessagePipeTest::BootstrapChannelEndpointOnIOThread,
83 base::Unretained(this),
84 channel_index,
85 ep));
86 }
87
88 void RestoreInitialState() {
89 io_thread_.PostTaskAndWait(
90 FROM_HERE,
91 base::Bind(&RemoteMessagePipeTest::RestoreInitialStateOnIOThread,
92 base::Unretained(this)));
93 }
94
95 embedder::PlatformSupport* platform_support() { return &platform_support_; }
96 base::TestIOThread* io_thread() { return &io_thread_; }
97
98 private:
99 void SetUpOnIOThread() {
100 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
101
102 embedder::PlatformChannelPair channel_pair;
103 platform_handles_[0] = channel_pair.PassServerHandle();
104 platform_handles_[1] = channel_pair.PassClientHandle();
105 }
106
107 void TearDownOnIOThread() {
108 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
109
110 if (channels_[0].get()) {
111 channels_[0]->Shutdown();
112 channels_[0] = nullptr;
113 }
114 if (channels_[1].get()) {
115 channels_[1]->Shutdown();
116 channels_[1] = nullptr;
117 }
118 }
119
120 void CreateAndInitChannel(unsigned channel_index) {
121 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
122 CHECK(channel_index == 0 || channel_index == 1);
123 CHECK(!channels_[channel_index].get());
124
125 channels_[channel_index] = new Channel(&platform_support_);
126 CHECK(channels_[channel_index]->Init(
127 RawChannel::Create(platform_handles_[channel_index].Pass())));
128 }
129
130 void ConnectChannelEndpointsOnIOThread(scoped_refptr<ChannelEndpoint> ep0,
131 scoped_refptr<ChannelEndpoint> ep1) {
132 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
133
134 if (!channels_[0].get())
135 CreateAndInitChannel(0);
136 if (!channels_[1].get())
137 CreateAndInitChannel(1);
138
139 MessageInTransit::EndpointId local_id0 = channels_[0]->AttachEndpoint(ep0);
140 MessageInTransit::EndpointId local_id1 = channels_[1]->AttachEndpoint(ep1);
141
142 CHECK(channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1));
143 CHECK(channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0));
144 }
145
146 void BootstrapChannelEndpointOnIOThread(unsigned channel_index,
147 scoped_refptr<ChannelEndpoint> ep) {
148 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
149 CHECK(channel_index == 0 || channel_index == 1);
150
151 CreateAndInitChannel(channel_index);
152 MessageInTransit::EndpointId endpoint_id =
153 channels_[channel_index]->AttachEndpoint(ep);
154 if (endpoint_id == MessageInTransit::kInvalidEndpointId)
155 return;
156
157 CHECK_EQ(endpoint_id, Channel::kBootstrapEndpointId);
158 CHECK(channels_[channel_index]->RunMessagePipeEndpoint(
159 Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId));
160 }
161
162 void RestoreInitialStateOnIOThread() {
163 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
164
165 TearDownOnIOThread();
166 SetUpOnIOThread();
167 }
168
169 embedder::SimplePlatformSupport platform_support_;
170 base::TestIOThread io_thread_;
171 embedder::ScopedPlatformHandle platform_handles_[2];
172 scoped_refptr<Channel> channels_[2];
173
174 DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
175 };
176
177 TEST_F(RemoteMessagePipeTest, Basic) {
178 static const char kHello[] = "hello";
179 static const char kWorld[] = "world!!!1!!!1!";
180 char buffer[100] = {0};
181 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
182 Waiter waiter;
183 HandleSignalsState hss;
184 uint32_t context = 0;
185
186 // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
187 // connected to MP 1, port 0, which will be attached to channel 1. This leaves
188 // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
189
190 scoped_refptr<ChannelEndpoint> ep0;
191 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
192 scoped_refptr<ChannelEndpoint> ep1;
193 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
194 ConnectChannelEndpoints(ep0, ep1);
195
196 // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
197
198 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
199 // it later, it might already be readable.)
200 waiter.Init();
201 ASSERT_EQ(
202 MOJO_RESULT_OK,
203 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
204
205 // Write to MP 0, port 0.
206 EXPECT_EQ(MOJO_RESULT_OK,
207 mp0->WriteMessage(0,
208 UserPointer<const void>(kHello),
209 sizeof(kHello),
210 nullptr,
211 MOJO_WRITE_MESSAGE_FLAG_NONE));
212
213 // Wait.
214 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
215 EXPECT_EQ(123u, context);
216 hss = HandleSignalsState();
217 mp1->RemoveWaiter(1, &waiter, &hss);
218 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
219 hss.satisfied_signals);
220 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
221 hss.satisfiable_signals);
222
223 // Read from MP 1, port 1.
224 EXPECT_EQ(MOJO_RESULT_OK,
225 mp1->ReadMessage(1,
226 UserPointer<void>(buffer),
227 MakeUserPointer(&buffer_size),
228 nullptr,
229 nullptr,
230 MOJO_READ_MESSAGE_FLAG_NONE));
231 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
232 EXPECT_STREQ(kHello, buffer);
233
234 // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0.
235
236 waiter.Init();
237 ASSERT_EQ(
238 MOJO_RESULT_OK,
239 mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr));
240
241 EXPECT_EQ(MOJO_RESULT_OK,
242 mp1->WriteMessage(1,
243 UserPointer<const void>(kWorld),
244 sizeof(kWorld),
245 nullptr,
246 MOJO_WRITE_MESSAGE_FLAG_NONE));
247
248 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
249 EXPECT_EQ(456u, context);
250 hss = HandleSignalsState();
251 mp0->RemoveWaiter(0, &waiter, &hss);
252 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
253 hss.satisfied_signals);
254 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
255 hss.satisfiable_signals);
256
257 buffer_size = static_cast<uint32_t>(sizeof(buffer));
258 EXPECT_EQ(MOJO_RESULT_OK,
259 mp0->ReadMessage(0,
260 UserPointer<void>(buffer),
261 MakeUserPointer(&buffer_size),
262 nullptr,
263 nullptr,
264 MOJO_READ_MESSAGE_FLAG_NONE));
265 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
266 EXPECT_STREQ(kWorld, buffer);
267
268 // Close MP 0, port 0.
269 mp0->Close(0);
270
271 // Try to wait for MP 1, port 1 to become readable. This will eventually fail
272 // when it realizes that MP 0, port 0 has been closed. (It may also fail
273 // immediately.)
274 waiter.Init();
275 hss = HandleSignalsState();
276 MojoResult result =
277 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, &hss);
278 if (result == MOJO_RESULT_OK) {
279 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
280 waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
281 EXPECT_EQ(789u, context);
282 hss = HandleSignalsState();
283 mp1->RemoveWaiter(1, &waiter, &hss);
284 }
285 EXPECT_EQ(0u, hss.satisfied_signals);
286 EXPECT_EQ(0u, hss.satisfiable_signals);
287
288 // And MP 1, port 1.
289 mp1->Close(1);
290 }
291
292 TEST_F(RemoteMessagePipeTest, Multiplex) {
293 static const char kHello[] = "hello";
294 static const char kWorld[] = "world!!!1!!!1!";
295 char buffer[100] = {0};
296 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
297 Waiter waiter;
298 HandleSignalsState hss;
299 uint32_t context = 0;
300
301 // Connect message pipes as in the |Basic| test.
302
303 scoped_refptr<ChannelEndpoint> ep0;
304 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
305 scoped_refptr<ChannelEndpoint> ep1;
306 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
307 ConnectChannelEndpoints(ep0, ep1);
308
309 // Now put another message pipe on the channel.
310
311 scoped_refptr<ChannelEndpoint> ep2;
312 scoped_refptr<MessagePipe> mp2(MessagePipe::CreateLocalProxy(&ep2));
313 scoped_refptr<ChannelEndpoint> ep3;
314 scoped_refptr<MessagePipe> mp3(MessagePipe::CreateProxyLocal(&ep3));
315 ConnectChannelEndpoints(ep2, ep3);
316
317 // Write: MP 2, port 0 -> MP 3, port 1.
318
319 waiter.Init();
320 ASSERT_EQ(
321 MOJO_RESULT_OK,
322 mp3->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));
323
324 EXPECT_EQ(MOJO_RESULT_OK,
325 mp2->WriteMessage(0,
326 UserPointer<const void>(kHello),
327 sizeof(kHello),
328 nullptr,
329 MOJO_WRITE_MESSAGE_FLAG_NONE));
330
331 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
332 EXPECT_EQ(789u, context);
333 hss = HandleSignalsState();
334 mp3->RemoveWaiter(1, &waiter, &hss);
335 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
336 hss.satisfied_signals);
337 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
338 hss.satisfiable_signals);
339
340 // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0.
341 buffer_size = static_cast<uint32_t>(sizeof(buffer));
342 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
343 mp0->ReadMessage(0,
344 UserPointer<void>(buffer),
345 MakeUserPointer(&buffer_size),
346 nullptr,
347 nullptr,
348 MOJO_READ_MESSAGE_FLAG_NONE));
349 buffer_size = static_cast<uint32_t>(sizeof(buffer));
350 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
351 mp1->ReadMessage(1,
352 UserPointer<void>(buffer),
353 MakeUserPointer(&buffer_size),
354 nullptr,
355 nullptr,
356 MOJO_READ_MESSAGE_FLAG_NONE));
357 buffer_size = static_cast<uint32_t>(sizeof(buffer));
358 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
359 mp2->ReadMessage(0,
360 UserPointer<void>(buffer),
361 MakeUserPointer(&buffer_size),
362 nullptr,
363 nullptr,
364 MOJO_READ_MESSAGE_FLAG_NONE));
365
366 // Read from MP 3, port 1.
367 buffer_size = static_cast<uint32_t>(sizeof(buffer));
368 EXPECT_EQ(MOJO_RESULT_OK,
369 mp3->ReadMessage(1,
370 UserPointer<void>(buffer),
371 MakeUserPointer(&buffer_size),
372 nullptr,
373 nullptr,
374 MOJO_READ_MESSAGE_FLAG_NONE));
375 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
376 EXPECT_STREQ(kHello, buffer);
377
378 // Write: MP 0, port 0 -> MP 1, port 1 again.
379
380 waiter.Init();
381 ASSERT_EQ(
382 MOJO_RESULT_OK,
383 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
384
385 EXPECT_EQ(MOJO_RESULT_OK,
386 mp0->WriteMessage(0,
387 UserPointer<const void>(kWorld),
388 sizeof(kWorld),
389 nullptr,
390 MOJO_WRITE_MESSAGE_FLAG_NONE));
391
392 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
393 EXPECT_EQ(123u, context);
394 hss = HandleSignalsState();
395 mp1->RemoveWaiter(1, &waiter, &hss);
396 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
397 hss.satisfied_signals);
398 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
399 hss.satisfiable_signals);
400
401 // Make sure there's nothing on the other ports.
402 buffer_size = static_cast<uint32_t>(sizeof(buffer));
403 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
404 mp0->ReadMessage(0,
405 UserPointer<void>(buffer),
406 MakeUserPointer(&buffer_size),
407 nullptr,
408 nullptr,
409 MOJO_READ_MESSAGE_FLAG_NONE));
410 buffer_size = static_cast<uint32_t>(sizeof(buffer));
411 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
412 mp2->ReadMessage(0,
413 UserPointer<void>(buffer),
414 MakeUserPointer(&buffer_size),
415 nullptr,
416 nullptr,
417 MOJO_READ_MESSAGE_FLAG_NONE));
418 buffer_size = static_cast<uint32_t>(sizeof(buffer));
419 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
420 mp3->ReadMessage(1,
421 UserPointer<void>(buffer),
422 MakeUserPointer(&buffer_size),
423 nullptr,
424 nullptr,
425 MOJO_READ_MESSAGE_FLAG_NONE));
426
427 buffer_size = static_cast<uint32_t>(sizeof(buffer));
428 EXPECT_EQ(MOJO_RESULT_OK,
429 mp1->ReadMessage(1,
430 UserPointer<void>(buffer),
431 MakeUserPointer(&buffer_size),
432 nullptr,
433 nullptr,
434 MOJO_READ_MESSAGE_FLAG_NONE));
435 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
436 EXPECT_STREQ(kWorld, buffer);
437
438 mp0->Close(0);
439 mp1->Close(1);
440 mp2->Close(0);
441 mp3->Close(1);
442 }
443
444 TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
445 static const char kHello[] = "hello";
446 char buffer[100] = {0};
447 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
448 Waiter waiter;
449 HandleSignalsState hss;
450 uint32_t context = 0;
451
452 // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
453 // connected to MP 1, port 0, which will be attached to channel 1. This leaves
454 // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
455
456 scoped_refptr<ChannelEndpoint> ep0;
457 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
458
459 // Write to MP 0, port 0.
460 EXPECT_EQ(MOJO_RESULT_OK,
461 mp0->WriteMessage(0,
462 UserPointer<const void>(kHello),
463 sizeof(kHello),
464 nullptr,
465 MOJO_WRITE_MESSAGE_FLAG_NONE));
466
467 BootstrapChannelEndpointNoWait(0, ep0);
468
469 // Close MP 0, port 0 before channel 1 is even connected.
470 mp0->Close(0);
471
472 scoped_refptr<ChannelEndpoint> ep1;
473 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
474
475 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
476 // it later, it might already be readable.)
477 waiter.Init();
478 ASSERT_EQ(
479 MOJO_RESULT_OK,
480 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
481
482 BootstrapChannelEndpointNoWait(1, ep1);
483
484 // Wait.
485 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
486 EXPECT_EQ(123u, context);
487 hss = HandleSignalsState();
488 // Note: MP 1, port 1 should definitely should be readable, but it may or may
489 // not appear as writable (there's a race, and it may not have noticed that
490 // the other side was closed yet -- e.g., inserting a sleep here would make it
491 // much more likely to notice that it's no longer writable).
492 mp1->RemoveWaiter(1, &waiter, &hss);
493 EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
494 EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));
495
496 // Read from MP 1, port 1.
497 EXPECT_EQ(MOJO_RESULT_OK,
498 mp1->ReadMessage(1,
499 UserPointer<void>(buffer),
500 MakeUserPointer(&buffer_size),
501 nullptr,
502 nullptr,
503 MOJO_READ_MESSAGE_FLAG_NONE));
504 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
505 EXPECT_STREQ(kHello, buffer);
506
507 // And MP 1, port 1.
508 mp1->Close(1);
509 }
510
511 TEST_F(RemoteMessagePipeTest, HandlePassing) {
512 static const char kHello[] = "hello";
513 Waiter waiter;
514 HandleSignalsState hss;
515 uint32_t context = 0;
516
517 scoped_refptr<ChannelEndpoint> ep0;
518 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
519 scoped_refptr<ChannelEndpoint> ep1;
520 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
521 ConnectChannelEndpoints(ep0, ep1);
522
523 // We'll try to pass this dispatcher.
524 scoped_refptr<MessagePipeDispatcher> dispatcher(
525 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
526 scoped_refptr<MessagePipe> local_mp(MessagePipe::CreateLocalLocal());
527 dispatcher->Init(local_mp, 0);
528
529 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
530 // it later, it might already be readable.)
531 waiter.Init();
532 ASSERT_EQ(
533 MOJO_RESULT_OK,
534 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
535
536 // Write to MP 0, port 0.
537 {
538 DispatcherTransport transport(
539 test::DispatcherTryStartTransport(dispatcher.get()));
540 EXPECT_TRUE(transport.is_valid());
541
542 std::vector<DispatcherTransport> transports;
543 transports.push_back(transport);
544 EXPECT_EQ(MOJO_RESULT_OK,
545 mp0->WriteMessage(0,
546 UserPointer<const void>(kHello),
547 sizeof(kHello),
548 &transports,
549 MOJO_WRITE_MESSAGE_FLAG_NONE));
550 transport.End();
551
552 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
553 // |dispatcher| is destroyed.
554 EXPECT_TRUE(dispatcher->HasOneRef());
555 dispatcher = nullptr;
556 }
557
558 // Wait.
559 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
560 EXPECT_EQ(123u, context);
561 hss = HandleSignalsState();
562 mp1->RemoveWaiter(1, &waiter, &hss);
563 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
564 hss.satisfied_signals);
565 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
566 hss.satisfiable_signals);
567
568 // Read from MP 1, port 1.
569 char read_buffer[100] = {0};
570 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
571 DispatcherVector read_dispatchers;
572 uint32_t read_num_dispatchers = 10; // Maximum to get.
573 EXPECT_EQ(MOJO_RESULT_OK,
574 mp1->ReadMessage(1,
575 UserPointer<void>(read_buffer),
576 MakeUserPointer(&read_buffer_size),
577 &read_dispatchers,
578 &read_num_dispatchers,
579 MOJO_READ_MESSAGE_FLAG_NONE));
580 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
581 EXPECT_STREQ(kHello, read_buffer);
582 EXPECT_EQ(1u, read_dispatchers.size());
583 EXPECT_EQ(1u, read_num_dispatchers);
584 ASSERT_TRUE(read_dispatchers[0].get());
585 EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
586
587 EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
588 dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
589
590 // Add the waiter now, before it becomes readable to avoid a race.
591 waiter.Init();
592 ASSERT_EQ(MOJO_RESULT_OK,
593 dispatcher->AddWaiter(
594 &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr));
595
596 // Write to "local_mp", port 1.
597 EXPECT_EQ(MOJO_RESULT_OK,
598 local_mp->WriteMessage(1,
599 UserPointer<const void>(kHello),
600 sizeof(kHello),
601 nullptr,
602 MOJO_WRITE_MESSAGE_FLAG_NONE));
603
604 // TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately
605 // here. (We don't crash if I sleep and then close.)
606
607 // Wait for the dispatcher to become readable.
608 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
609 EXPECT_EQ(456u, context);
610 hss = HandleSignalsState();
611 dispatcher->RemoveWaiter(&waiter, &hss);
612 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
613 hss.satisfied_signals);
614 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
615 hss.satisfiable_signals);
616
617 // Read from the dispatcher.
618 memset(read_buffer, 0, sizeof(read_buffer));
619 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
620 EXPECT_EQ(MOJO_RESULT_OK,
621 dispatcher->ReadMessage(UserPointer<void>(read_buffer),
622 MakeUserPointer(&read_buffer_size),
623 0,
624 nullptr,
625 MOJO_READ_MESSAGE_FLAG_NONE));
626 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
627 EXPECT_STREQ(kHello, read_buffer);
628
629 // Prepare to wait on "local_mp", port 1.
630 waiter.Init();
631 ASSERT_EQ(MOJO_RESULT_OK,
632 local_mp->AddWaiter(
633 1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));
634
635 // Write to the dispatcher.
636 EXPECT_EQ(MOJO_RESULT_OK,
637 dispatcher->WriteMessage(UserPointer<const void>(kHello),
638 sizeof(kHello),
639 nullptr,
640 MOJO_WRITE_MESSAGE_FLAG_NONE));
641
642 // Wait.
643 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
644 EXPECT_EQ(789u, context);
645 hss = HandleSignalsState();
646 local_mp->RemoveWaiter(1, &waiter, &hss);
647 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
648 hss.satisfied_signals);
649 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
650 hss.satisfiable_signals);
651
652 // Read from "local_mp", port 1.
653 memset(read_buffer, 0, sizeof(read_buffer));
654 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
655 EXPECT_EQ(MOJO_RESULT_OK,
656 local_mp->ReadMessage(1,
657 UserPointer<void>(read_buffer),
658 MakeUserPointer(&read_buffer_size),
659 nullptr,
660 nullptr,
661 MOJO_READ_MESSAGE_FLAG_NONE));
662 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
663 EXPECT_STREQ(kHello, read_buffer);
664
665 // TODO(vtl): Also test that messages queued up before the handle was sent are
666 // delivered properly.
667
668 // Close everything that belongs to us.
669 mp0->Close(0);
670 mp1->Close(1);
671 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
672 // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
673 local_mp->Close(1);
674 }
675
676 #if defined(OS_POSIX)
677 #define MAYBE_SharedBufferPassing SharedBufferPassing
678 #else
679 // Not yet implemented (on Windows).
680 #define MAYBE_SharedBufferPassing DISABLED_SharedBufferPassing
681 #endif
682 TEST_F(RemoteMessagePipeTest, MAYBE_SharedBufferPassing) {
683 static const char kHello[] = "hello";
684 Waiter waiter;
685 HandleSignalsState hss;
686 uint32_t context = 0;
687
688 scoped_refptr<ChannelEndpoint> ep0;
689 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
690 scoped_refptr<ChannelEndpoint> ep1;
691 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
692 ConnectChannelEndpoints(ep0, ep1);
693
694 // We'll try to pass this dispatcher.
695 scoped_refptr<SharedBufferDispatcher> dispatcher;
696 EXPECT_EQ(MOJO_RESULT_OK,
697 SharedBufferDispatcher::Create(
698 platform_support(),
699 SharedBufferDispatcher::kDefaultCreateOptions,
700 100,
701 &dispatcher));
702 ASSERT_TRUE(dispatcher.get());
703
704 // Make a mapping.
705 scoped_ptr<embedder::PlatformSharedBufferMapping> mapping0;
706 EXPECT_EQ(
707 MOJO_RESULT_OK,
708 dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping0));
709 ASSERT_TRUE(mapping0);
710 ASSERT_TRUE(mapping0->GetBase());
711 ASSERT_EQ(100u, mapping0->GetLength());
712 static_cast<char*>(mapping0->GetBase())[0] = 'A';
713 static_cast<char*>(mapping0->GetBase())[50] = 'B';
714 static_cast<char*>(mapping0->GetBase())[99] = 'C';
715
716 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
717 // it later, it might already be readable.)
718 waiter.Init();
719 ASSERT_EQ(
720 MOJO_RESULT_OK,
721 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
722
723 // Write to MP 0, port 0.
724 {
725 DispatcherTransport transport(
726 test::DispatcherTryStartTransport(dispatcher.get()));
727 EXPECT_TRUE(transport.is_valid());
728
729 std::vector<DispatcherTransport> transports;
730 transports.push_back(transport);
731 EXPECT_EQ(MOJO_RESULT_OK,
732 mp0->WriteMessage(0,
733 UserPointer<const void>(kHello),
734 sizeof(kHello),
735 &transports,
736 MOJO_WRITE_MESSAGE_FLAG_NONE));
737 transport.End();
738
739 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
740 // |dispatcher| is destroyed.
741 EXPECT_TRUE(dispatcher->HasOneRef());
742 dispatcher = nullptr;
743 }
744
745 // Wait.
746 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
747 EXPECT_EQ(123u, context);
748 hss = HandleSignalsState();
749 mp1->RemoveWaiter(1, &waiter, &hss);
750 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
751 hss.satisfied_signals);
752 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
753 hss.satisfiable_signals);
754
755 // Read from MP 1, port 1.
756 char read_buffer[100] = {0};
757 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
758 DispatcherVector read_dispatchers;
759 uint32_t read_num_dispatchers = 10; // Maximum to get.
760 EXPECT_EQ(MOJO_RESULT_OK,
761 mp1->ReadMessage(1,
762 UserPointer<void>(read_buffer),
763 MakeUserPointer(&read_buffer_size),
764 &read_dispatchers,
765 &read_num_dispatchers,
766 MOJO_READ_MESSAGE_FLAG_NONE));
767 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
768 EXPECT_STREQ(kHello, read_buffer);
769 EXPECT_EQ(1u, read_dispatchers.size());
770 EXPECT_EQ(1u, read_num_dispatchers);
771 ASSERT_TRUE(read_dispatchers[0].get());
772 EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
773
774 EXPECT_EQ(Dispatcher::kTypeSharedBuffer, read_dispatchers[0]->GetType());
775 dispatcher = static_cast<SharedBufferDispatcher*>(read_dispatchers[0].get());
776
777 // Make another mapping.
778 scoped_ptr<embedder::PlatformSharedBufferMapping> mapping1;
779 EXPECT_EQ(
780 MOJO_RESULT_OK,
781 dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping1));
782 ASSERT_TRUE(mapping1);
783 ASSERT_TRUE(mapping1->GetBase());
784 ASSERT_EQ(100u, mapping1->GetLength());
785 EXPECT_NE(mapping1->GetBase(), mapping0->GetBase());
786 EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]);
787 EXPECT_EQ('B', static_cast<char*>(mapping1->GetBase())[50]);
788 EXPECT_EQ('C', static_cast<char*>(mapping1->GetBase())[99]);
789
790 // Write stuff either way.
791 static_cast<char*>(mapping1->GetBase())[1] = 'x';
792 EXPECT_EQ('x', static_cast<char*>(mapping0->GetBase())[1]);
793 static_cast<char*>(mapping0->GetBase())[2] = 'y';
794 EXPECT_EQ('y', static_cast<char*>(mapping1->GetBase())[2]);
795
796 // Kill the first mapping; the second should still be valid.
797 mapping0.reset();
798 EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]);
799
800 // Close everything that belongs to us.
801 mp0->Close(0);
802 mp1->Close(1);
803 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
804
805 // The second mapping should still be good.
806 EXPECT_EQ('x', static_cast<char*>(mapping1->GetBase())[1]);
807 }
808
809 #if defined(OS_POSIX)
810 #define MAYBE_PlatformHandlePassing PlatformHandlePassing
811 #else
812 // Not yet implemented (on Windows).
813 #define MAYBE_PlatformHandlePassing DISABLED_PlatformHandlePassing
814 #endif
815 TEST_F(RemoteMessagePipeTest, MAYBE_PlatformHandlePassing) {
816 base::ScopedTempDir temp_dir;
817 ASSERT_TRUE(temp_dir.CreateUniqueTempDir());
818
819 static const char kHello[] = "hello";
820 static const char kWorld[] = "world";
821 Waiter waiter;
822 uint32_t context = 0;
823 HandleSignalsState hss;
824
825 scoped_refptr<ChannelEndpoint> ep0;
826 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
827 scoped_refptr<ChannelEndpoint> ep1;
828 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
829 ConnectChannelEndpoints(ep0, ep1);
830
831 base::FilePath unused;
832 base::ScopedFILE fp(
833 CreateAndOpenTemporaryFileInDir(temp_dir.path(), &unused));
834 EXPECT_EQ(sizeof(kHello), fwrite(kHello, 1, sizeof(kHello), fp.get()));
835 // We'll try to pass this dispatcher, which will cause a |PlatformHandle| to
836 // be passed.
837 scoped_refptr<PlatformHandleDispatcher> dispatcher(
838 new PlatformHandleDispatcher(
839 mojo::test::PlatformHandleFromFILE(fp.Pass())));
840
841 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
842 // it later, it might already be readable.)
843 waiter.Init();
844 ASSERT_EQ(
845 MOJO_RESULT_OK,
846 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
847
848 // Write to MP 0, port 0.
849 {
850 DispatcherTransport transport(
851 test::DispatcherTryStartTransport(dispatcher.get()));
852 EXPECT_TRUE(transport.is_valid());
853
854 std::vector<DispatcherTransport> transports;
855 transports.push_back(transport);
856 EXPECT_EQ(MOJO_RESULT_OK,
857 mp0->WriteMessage(0,
858 UserPointer<const void>(kWorld),
859 sizeof(kWorld),
860 &transports,
861 MOJO_WRITE_MESSAGE_FLAG_NONE));
862 transport.End();
863
864 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
865 // |dispatcher| is destroyed.
866 EXPECT_TRUE(dispatcher->HasOneRef());
867 dispatcher = nullptr;
868 }
869
870 // Wait.
871 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
872 EXPECT_EQ(123u, context);
873 hss = HandleSignalsState();
874 mp1->RemoveWaiter(1, &waiter, &hss);
875 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
876 hss.satisfied_signals);
877 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
878 hss.satisfiable_signals);
879
880 // Read from MP 1, port 1.
881 char read_buffer[100] = {0};
882 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
883 DispatcherVector read_dispatchers;
884 uint32_t read_num_dispatchers = 10; // Maximum to get.
885 EXPECT_EQ(MOJO_RESULT_OK,
886 mp1->ReadMessage(1,
887 UserPointer<void>(read_buffer),
888 MakeUserPointer(&read_buffer_size),
889 &read_dispatchers,
890 &read_num_dispatchers,
891 MOJO_READ_MESSAGE_FLAG_NONE));
892 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
893 EXPECT_STREQ(kWorld, read_buffer);
894 EXPECT_EQ(1u, read_dispatchers.size());
895 EXPECT_EQ(1u, read_num_dispatchers);
896 ASSERT_TRUE(read_dispatchers[0].get());
897 EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
898
899 EXPECT_EQ(Dispatcher::kTypePlatformHandle, read_dispatchers[0]->GetType());
900 dispatcher =
901 static_cast<PlatformHandleDispatcher*>(read_dispatchers[0].get());
902
903 embedder::ScopedPlatformHandle h = dispatcher->PassPlatformHandle().Pass();
904 EXPECT_TRUE(h.is_valid());
905
906 fp = mojo::test::FILEFromPlatformHandle(h.Pass(), "rb").Pass();
907 EXPECT_FALSE(h.is_valid());
908 EXPECT_TRUE(fp);
909
910 rewind(fp.get());
911 memset(read_buffer, 0, sizeof(read_buffer));
912 EXPECT_EQ(sizeof(kHello),
913 fread(read_buffer, 1, sizeof(read_buffer), fp.get()));
914 EXPECT_STREQ(kHello, read_buffer);
915
916 // Close everything that belongs to us.
917 mp0->Close(0);
918 mp1->Close(1);
919 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
920 }
921
922 // Test racing closes (on each end).
923 // Note: A flaky failure would almost certainly indicate a problem in the code
924 // itself (not in the test). Also, any logged warnings/errors would also
925 // probably be indicative of bugs.
926 TEST_F(RemoteMessagePipeTest, RacingClosesStress) {
927 base::TimeDelta delay = base::TimeDelta::FromMilliseconds(5);
928
929 for (unsigned i = 0; i < 256; i++) {
930 DVLOG(2) << "---------------------------------------- " << i;
931 scoped_refptr<ChannelEndpoint> ep0;
932 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
933 BootstrapChannelEndpointNoWait(0, ep0);
934
935 scoped_refptr<ChannelEndpoint> ep1;
936 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
937 BootstrapChannelEndpointNoWait(1, ep1);
938
939 if (i & 1u) {
940 io_thread()->task_runner()->PostTask(
941 FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
942 }
943 if (i & 2u)
944 base::PlatformThread::Sleep(delay);
945
946 mp0->Close(0);
947
948 if (i & 4u) {
949 io_thread()->task_runner()->PostTask(
950 FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
951 }
952 if (i & 8u)
953 base::PlatformThread::Sleep(delay);
954
955 mp1->Close(1);
956
957 RestoreInitialState();
958 }
959 }
960
961 // Tests passing an end of a message pipe over a remote message pipe, and then
962 // passing that end back.
963 // TODO(vtl): Also test passing a message pipe across two remote message pipes.
964 TEST_F(RemoteMessagePipeTest, PassMessagePipeHandleAcrossAndBack) {
965 static const char kHello[] = "hello";
966 static const char kWorld[] = "world";
967 Waiter waiter;
968 HandleSignalsState hss;
969 uint32_t context = 0;
970
971 scoped_refptr<ChannelEndpoint> ep0;
972 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
973 scoped_refptr<ChannelEndpoint> ep1;
974 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
975 ConnectChannelEndpoints(ep0, ep1);
976
977 // We'll try to pass this dispatcher.
978 scoped_refptr<MessagePipeDispatcher> dispatcher(
979 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
980 scoped_refptr<MessagePipe> local_mp(MessagePipe::CreateLocalLocal());
981 dispatcher->Init(local_mp, 0);
982
983 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
984 // it later, it might already be readable.)
985 waiter.Init();
986 ASSERT_EQ(
987 MOJO_RESULT_OK,
988 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
989
990 // Write to MP 0, port 0.
991 {
992 DispatcherTransport transport(
993 test::DispatcherTryStartTransport(dispatcher.get()));
994 EXPECT_TRUE(transport.is_valid());
995
996 std::vector<DispatcherTransport> transports;
997 transports.push_back(transport);
998 EXPECT_EQ(MOJO_RESULT_OK,
999 mp0->WriteMessage(0,
1000 UserPointer<const void>(kHello),
1001 sizeof(kHello),
1002 &transports,
1003 MOJO_WRITE_MESSAGE_FLAG_NONE));
1004 transport.End();
1005
1006 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
1007 // |dispatcher| is destroyed.
1008 EXPECT_TRUE(dispatcher->HasOneRef());
1009 dispatcher = nullptr;
1010 }
1011
1012 // Wait.
1013 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
1014 EXPECT_EQ(123u, context);
1015 hss = HandleSignalsState();
1016 mp1->RemoveWaiter(1, &waiter, &hss);
1017 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1018 hss.satisfied_signals);
1019 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1020 hss.satisfiable_signals);
1021
1022 // Read from MP 1, port 1.
1023 char read_buffer[100] = {0};
1024 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
1025 DispatcherVector read_dispatchers;
1026 uint32_t read_num_dispatchers = 10; // Maximum to get.
1027 EXPECT_EQ(MOJO_RESULT_OK,
1028 mp1->ReadMessage(1,
1029 UserPointer<void>(read_buffer),
1030 MakeUserPointer(&read_buffer_size),
1031 &read_dispatchers,
1032 &read_num_dispatchers,
1033 MOJO_READ_MESSAGE_FLAG_NONE));
1034 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
1035 EXPECT_STREQ(kHello, read_buffer);
1036 EXPECT_EQ(1u, read_dispatchers.size());
1037 EXPECT_EQ(1u, read_num_dispatchers);
1038 ASSERT_TRUE(read_dispatchers[0].get());
1039 EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
1040
1041 EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
1042 dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
1043 read_dispatchers.clear();
1044
1045 // Now pass it back.
1046
1047 // Prepare to wait on MP 0, port 0. (Add the waiter now. Otherwise, if we do
1048 // it later, it might already be readable.)
1049 waiter.Init();
1050 ASSERT_EQ(
1051 MOJO_RESULT_OK,
1052 mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr));
1053
1054 // Write to MP 1, port 1.
1055 {
1056 DispatcherTransport transport(
1057 test::DispatcherTryStartTransport(dispatcher.get()));
1058 EXPECT_TRUE(transport.is_valid());
1059
1060 std::vector<DispatcherTransport> transports;
1061 transports.push_back(transport);
1062 EXPECT_EQ(MOJO_RESULT_OK,
1063 mp1->WriteMessage(1,
1064 UserPointer<const void>(kWorld),
1065 sizeof(kWorld),
1066 &transports,
1067 MOJO_WRITE_MESSAGE_FLAG_NONE));
1068 transport.End();
1069
1070 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
1071 // |dispatcher| is destroyed.
1072 EXPECT_TRUE(dispatcher->HasOneRef());
1073 dispatcher = nullptr;
1074 }
1075
1076 // Wait.
1077 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
1078 EXPECT_EQ(456u, context);
1079 hss = HandleSignalsState();
1080 mp0->RemoveWaiter(0, &waiter, &hss);
1081 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1082 hss.satisfied_signals);
1083 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1084 hss.satisfiable_signals);
1085
1086 // Read from MP 0, port 0.
1087 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
1088 read_num_dispatchers = 10; // Maximum to get.
1089 EXPECT_EQ(MOJO_RESULT_OK,
1090 mp0->ReadMessage(0,
1091 UserPointer<void>(read_buffer),
1092 MakeUserPointer(&read_buffer_size),
1093 &read_dispatchers,
1094 &read_num_dispatchers,
1095 MOJO_READ_MESSAGE_FLAG_NONE));
1096 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
1097 EXPECT_STREQ(kWorld, read_buffer);
1098 EXPECT_EQ(1u, read_dispatchers.size());
1099 EXPECT_EQ(1u, read_num_dispatchers);
1100 ASSERT_TRUE(read_dispatchers[0].get());
1101 EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
1102
1103 EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
1104 dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
1105 read_dispatchers.clear();
1106
1107 // Add the waiter now, before it becomes readable to avoid a race.
1108 waiter.Init();
1109 ASSERT_EQ(MOJO_RESULT_OK,
1110 dispatcher->AddWaiter(
1111 &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));
1112
1113 // Write to "local_mp", port 1.
1114 EXPECT_EQ(MOJO_RESULT_OK,
1115 local_mp->WriteMessage(1,
1116 UserPointer<const void>(kHello),
1117 sizeof(kHello),
1118 nullptr,
1119 MOJO_WRITE_MESSAGE_FLAG_NONE));
1120
1121 // Wait for the dispatcher to become readable.
1122 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
1123 EXPECT_EQ(789u, context);
1124 hss = HandleSignalsState();
1125 dispatcher->RemoveWaiter(&waiter, &hss);
1126 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1127 hss.satisfied_signals);
1128 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1129 hss.satisfiable_signals);
1130
1131 // Read from the dispatcher.
1132 memset(read_buffer, 0, sizeof(read_buffer));
1133 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
1134 EXPECT_EQ(MOJO_RESULT_OK,
1135 dispatcher->ReadMessage(UserPointer<void>(read_buffer),
1136 MakeUserPointer(&read_buffer_size),
1137 0,
1138 nullptr,
1139 MOJO_READ_MESSAGE_FLAG_NONE));
1140 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
1141 EXPECT_STREQ(kHello, read_buffer);
1142
1143 // Prepare to wait on "local_mp", port 1.
1144 waiter.Init();
1145 ASSERT_EQ(MOJO_RESULT_OK,
1146 local_mp->AddWaiter(
1147 1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));
1148
1149 // Write to the dispatcher.
1150 EXPECT_EQ(MOJO_RESULT_OK,
1151 dispatcher->WriteMessage(UserPointer<const void>(kHello),
1152 sizeof(kHello),
1153 nullptr,
1154 MOJO_WRITE_MESSAGE_FLAG_NONE));
1155
1156 // Wait.
1157 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
1158 EXPECT_EQ(789u, context);
1159 hss = HandleSignalsState();
1160 local_mp->RemoveWaiter(1, &waiter, &hss);
1161 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1162 hss.satisfied_signals);
1163 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1164 hss.satisfiable_signals);
1165
1166 // Read from "local_mp", port 1.
1167 memset(read_buffer, 0, sizeof(read_buffer));
1168 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
1169 EXPECT_EQ(MOJO_RESULT_OK,
1170 local_mp->ReadMessage(1,
1171 UserPointer<void>(read_buffer),
1172 MakeUserPointer(&read_buffer_size),
1173 nullptr,
1174 nullptr,
1175 MOJO_READ_MESSAGE_FLAG_NONE));
1176 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
1177 EXPECT_STREQ(kHello, read_buffer);
1178
1179 // TODO(vtl): Also test the cases where messages are written and read (at
1180 // various points) on the message pipe being passed around.
1181
1182 // Close everything that belongs to us.
1183 mp0->Close(0);
1184 mp1->Close(1);
1185 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
1186 // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
1187 local_mp->Close(1);
1188 }
1189
1190 } // namespace
1191 } // namespace system
1192 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/system/raw_channel_win.cc ('k') | mojo/system/run_all_unittests.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698