Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(275)

Side by Side Diff: mojo/edk/system/raw_channel_unittest.cc

Issue 814543006: Move //mojo/{public, edk} underneath //third_party (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebase Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/raw_channel_posix.cc ('k') | mojo/edk/system/raw_channel_win.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/raw_channel.h"
6
7 #include <stdint.h>
8 #include <stdio.h>
9
10 #include <vector>
11
12 #include "base/bind.h"
13 #include "base/files/file_path.h"
14 #include "base/files/file_util.h"
15 #include "base/files/scoped_file.h"
16 #include "base/files/scoped_temp_dir.h"
17 #include "base/location.h"
18 #include "base/logging.h"
19 #include "base/macros.h"
20 #include "base/memory/scoped_ptr.h"
21 #include "base/memory/scoped_vector.h"
22 #include "base/rand_util.h"
23 #include "base/synchronization/lock.h"
24 #include "base/synchronization/waitable_event.h"
25 #include "base/test/test_io_thread.h"
26 #include "base/threading/platform_thread.h" // For |Sleep()|.
27 #include "base/threading/simple_thread.h"
28 #include "base/time/time.h"
29 #include "build/build_config.h" // TODO(vtl): Remove this.
30 #include "mojo/edk/embedder/platform_channel_pair.h"
31 #include "mojo/edk/embedder/platform_handle.h"
32 #include "mojo/edk/embedder/scoped_platform_handle.h"
33 #include "mojo/edk/system/message_in_transit.h"
34 #include "mojo/edk/system/test_utils.h"
35 #include "mojo/edk/system/transport_data.h"
36 #include "mojo/edk/test/test_utils.h"
37 #include "testing/gtest/include/gtest/gtest.h"
38
39 namespace mojo {
40 namespace system {
41 namespace {
42
43 scoped_ptr<MessageInTransit> MakeTestMessage(uint32_t num_bytes) {
44 std::vector<unsigned char> bytes(num_bytes, 0);
45 for (size_t i = 0; i < num_bytes; i++)
46 bytes[i] = static_cast<unsigned char>(i + num_bytes);
47 return make_scoped_ptr(new MessageInTransit(
48 MessageInTransit::kTypeEndpoint, MessageInTransit::kSubtypeEndpointData,
49 num_bytes, bytes.empty() ? nullptr : &bytes[0]));
50 }
51
52 bool CheckMessageData(const void* bytes, uint32_t num_bytes) {
53 const unsigned char* b = static_cast<const unsigned char*>(bytes);
54 for (uint32_t i = 0; i < num_bytes; i++) {
55 if (b[i] != static_cast<unsigned char>(i + num_bytes))
56 return false;
57 }
58 return true;
59 }
60
61 void InitOnIOThread(RawChannel* raw_channel, RawChannel::Delegate* delegate) {
62 raw_channel->Init(delegate);
63 }
64
65 bool WriteTestMessageToHandle(const embedder::PlatformHandle& handle,
66 uint32_t num_bytes) {
67 scoped_ptr<MessageInTransit> message(MakeTestMessage(num_bytes));
68
69 size_t write_size = 0;
70 mojo::test::BlockingWrite(handle, message->main_buffer(),
71 message->main_buffer_size(), &write_size);
72 return write_size == message->main_buffer_size();
73 }
74
75 // -----------------------------------------------------------------------------
76
77 class RawChannelTest : public testing::Test {
78 public:
79 RawChannelTest() : io_thread_(base::TestIOThread::kManualStart) {}
80 ~RawChannelTest() override {}
81
82 void SetUp() override {
83 embedder::PlatformChannelPair channel_pair;
84 handles[0] = channel_pair.PassServerHandle();
85 handles[1] = channel_pair.PassClientHandle();
86 io_thread_.Start();
87 }
88
89 void TearDown() override {
90 io_thread_.Stop();
91 handles[0].reset();
92 handles[1].reset();
93 }
94
95 protected:
96 base::TestIOThread* io_thread() { return &io_thread_; }
97
98 embedder::ScopedPlatformHandle handles[2];
99
100 private:
101 base::TestIOThread io_thread_;
102
103 DISALLOW_COPY_AND_ASSIGN(RawChannelTest);
104 };
105
106 // RawChannelTest.WriteMessage -------------------------------------------------
107
108 class WriteOnlyRawChannelDelegate : public RawChannel::Delegate {
109 public:
110 WriteOnlyRawChannelDelegate() {}
111 ~WriteOnlyRawChannelDelegate() override {}
112
113 // |RawChannel::Delegate| implementation:
114 void OnReadMessage(
115 const MessageInTransit::View& /*message_view*/,
116 embedder::ScopedPlatformHandleVectorPtr /*platform_handles*/) override {
117 CHECK(false); // Should not get called.
118 }
119 void OnError(Error error) override {
120 // We'll get a read (shutdown) error when the connection is closed.
121 CHECK_EQ(error, ERROR_READ_SHUTDOWN);
122 }
123
124 private:
125 DISALLOW_COPY_AND_ASSIGN(WriteOnlyRawChannelDelegate);
126 };
127
128 static const int64_t kMessageReaderSleepMs = 1;
129 static const size_t kMessageReaderMaxPollIterations = 3000;
130
131 class TestMessageReaderAndChecker {
132 public:
133 explicit TestMessageReaderAndChecker(embedder::PlatformHandle handle)
134 : handle_(handle) {}
135 ~TestMessageReaderAndChecker() { CHECK(bytes_.empty()); }
136
137 bool ReadAndCheckNextMessage(uint32_t expected_size) {
138 unsigned char buffer[4096];
139
140 for (size_t i = 0; i < kMessageReaderMaxPollIterations;) {
141 size_t read_size = 0;
142 CHECK(mojo::test::NonBlockingRead(handle_, buffer, sizeof(buffer),
143 &read_size));
144
145 // Append newly-read data to |bytes_|.
146 bytes_.insert(bytes_.end(), buffer, buffer + read_size);
147
148 // If we have the header....
149 size_t message_size;
150 if (MessageInTransit::GetNextMessageSize(
151 bytes_.empty() ? nullptr : &bytes_[0], bytes_.size(),
152 &message_size)) {
153 // If we've read the whole message....
154 if (bytes_.size() >= message_size) {
155 bool rv = true;
156 MessageInTransit::View message_view(message_size, &bytes_[0]);
157 CHECK_EQ(message_view.main_buffer_size(), message_size);
158
159 if (message_view.num_bytes() != expected_size) {
160 LOG(ERROR) << "Wrong size: " << message_size << " instead of "
161 << expected_size << " bytes.";
162 rv = false;
163 } else if (!CheckMessageData(message_view.bytes(),
164 message_view.num_bytes())) {
165 LOG(ERROR) << "Incorrect message bytes.";
166 rv = false;
167 }
168
169 // Erase message data.
170 bytes_.erase(bytes_.begin(),
171 bytes_.begin() + message_view.main_buffer_size());
172 return rv;
173 }
174 }
175
176 if (static_cast<size_t>(read_size) < sizeof(buffer)) {
177 i++;
178 base::PlatformThread::Sleep(
179 base::TimeDelta::FromMilliseconds(kMessageReaderSleepMs));
180 }
181 }
182
183 LOG(ERROR) << "Too many iterations.";
184 return false;
185 }
186
187 private:
188 const embedder::PlatformHandle handle_;
189
190 // The start of the received data should always be on a message boundary.
191 std::vector<unsigned char> bytes_;
192
193 DISALLOW_COPY_AND_ASSIGN(TestMessageReaderAndChecker);
194 };
195
196 // Tests writing (and verifies reading using our own custom reader).
197 TEST_F(RawChannelTest, WriteMessage) {
198 WriteOnlyRawChannelDelegate delegate;
199 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass()));
200 TestMessageReaderAndChecker checker(handles[1].get());
201 io_thread()->PostTaskAndWait(
202 FROM_HERE,
203 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate)));
204
205 // Write and read, for a variety of sizes.
206 for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) {
207 EXPECT_TRUE(rc->WriteMessage(MakeTestMessage(size)));
208 EXPECT_TRUE(checker.ReadAndCheckNextMessage(size)) << size;
209 }
210
211 // Write/queue and read afterwards, for a variety of sizes.
212 for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1)
213 EXPECT_TRUE(rc->WriteMessage(MakeTestMessage(size)));
214 for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1)
215 EXPECT_TRUE(checker.ReadAndCheckNextMessage(size)) << size;
216
217 io_thread()->PostTaskAndWait(
218 FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get())));
219 }
220
221 // RawChannelTest.OnReadMessage ------------------------------------------------
222
223 class ReadCheckerRawChannelDelegate : public RawChannel::Delegate {
224 public:
225 ReadCheckerRawChannelDelegate() : done_event_(false, false), position_(0) {}
226 ~ReadCheckerRawChannelDelegate() override {}
227
228 // |RawChannel::Delegate| implementation (called on the I/O thread):
229 void OnReadMessage(
230 const MessageInTransit::View& message_view,
231 embedder::ScopedPlatformHandleVectorPtr platform_handles) override {
232 EXPECT_FALSE(platform_handles);
233
234 size_t position;
235 size_t expected_size;
236 bool should_signal = false;
237 {
238 base::AutoLock locker(lock_);
239 CHECK_LT(position_, expected_sizes_.size());
240 position = position_;
241 expected_size = expected_sizes_[position];
242 position_++;
243 if (position_ >= expected_sizes_.size())
244 should_signal = true;
245 }
246
247 EXPECT_EQ(expected_size, message_view.num_bytes()) << position;
248 if (message_view.num_bytes() == expected_size) {
249 EXPECT_TRUE(
250 CheckMessageData(message_view.bytes(), message_view.num_bytes()))
251 << position;
252 }
253
254 if (should_signal)
255 done_event_.Signal();
256 }
257 void OnError(Error error) override {
258 // We'll get a read (shutdown) error when the connection is closed.
259 CHECK_EQ(error, ERROR_READ_SHUTDOWN);
260 }
261
262 // Waits for all the messages (of sizes |expected_sizes_|) to be seen.
263 void Wait() { done_event_.Wait(); }
264
265 void SetExpectedSizes(const std::vector<uint32_t>& expected_sizes) {
266 base::AutoLock locker(lock_);
267 CHECK_EQ(position_, expected_sizes_.size());
268 expected_sizes_ = expected_sizes;
269 position_ = 0;
270 }
271
272 private:
273 base::WaitableEvent done_event_;
274
275 base::Lock lock_; // Protects the following members.
276 std::vector<uint32_t> expected_sizes_;
277 size_t position_;
278
279 DISALLOW_COPY_AND_ASSIGN(ReadCheckerRawChannelDelegate);
280 };
281
282 // Tests reading (writing using our own custom writer).
283 TEST_F(RawChannelTest, OnReadMessage) {
284 ReadCheckerRawChannelDelegate delegate;
285 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass()));
286 io_thread()->PostTaskAndWait(
287 FROM_HERE,
288 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate)));
289
290 // Write and read, for a variety of sizes.
291 for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) {
292 delegate.SetExpectedSizes(std::vector<uint32_t>(1, size));
293
294 EXPECT_TRUE(WriteTestMessageToHandle(handles[1].get(), size));
295
296 delegate.Wait();
297 }
298
299 // Set up reader and write as fast as we can.
300 // Write/queue and read afterwards, for a variety of sizes.
301 std::vector<uint32_t> expected_sizes;
302 for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1)
303 expected_sizes.push_back(size);
304 delegate.SetExpectedSizes(expected_sizes);
305 for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1)
306 EXPECT_TRUE(WriteTestMessageToHandle(handles[1].get(), size));
307 delegate.Wait();
308
309 io_thread()->PostTaskAndWait(
310 FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get())));
311 }
312
313 // RawChannelTest.WriteMessageAndOnReadMessage ---------------------------------
314
315 class RawChannelWriterThread : public base::SimpleThread {
316 public:
317 RawChannelWriterThread(RawChannel* raw_channel, size_t write_count)
318 : base::SimpleThread("raw_channel_writer_thread"),
319 raw_channel_(raw_channel),
320 left_to_write_(write_count) {}
321
322 ~RawChannelWriterThread() override { Join(); }
323
324 private:
325 void Run() override {
326 static const int kMaxRandomMessageSize = 25000;
327
328 while (left_to_write_-- > 0) {
329 EXPECT_TRUE(raw_channel_->WriteMessage(MakeTestMessage(
330 static_cast<uint32_t>(base::RandInt(1, kMaxRandomMessageSize)))));
331 }
332 }
333
334 RawChannel* const raw_channel_;
335 size_t left_to_write_;
336
337 DISALLOW_COPY_AND_ASSIGN(RawChannelWriterThread);
338 };
339
340 class ReadCountdownRawChannelDelegate : public RawChannel::Delegate {
341 public:
342 explicit ReadCountdownRawChannelDelegate(size_t expected_count)
343 : done_event_(false, false), expected_count_(expected_count), count_(0) {}
344 ~ReadCountdownRawChannelDelegate() override {}
345
346 // |RawChannel::Delegate| implementation (called on the I/O thread):
347 void OnReadMessage(
348 const MessageInTransit::View& message_view,
349 embedder::ScopedPlatformHandleVectorPtr platform_handles) override {
350 EXPECT_FALSE(platform_handles);
351
352 EXPECT_LT(count_, expected_count_);
353 count_++;
354
355 EXPECT_TRUE(
356 CheckMessageData(message_view.bytes(), message_view.num_bytes()));
357
358 if (count_ >= expected_count_)
359 done_event_.Signal();
360 }
361 void OnError(Error error) override {
362 // We'll get a read (shutdown) error when the connection is closed.
363 CHECK_EQ(error, ERROR_READ_SHUTDOWN);
364 }
365
366 // Waits for all the messages to have been seen.
367 void Wait() { done_event_.Wait(); }
368
369 private:
370 base::WaitableEvent done_event_;
371 size_t expected_count_;
372 size_t count_;
373
374 DISALLOW_COPY_AND_ASSIGN(ReadCountdownRawChannelDelegate);
375 };
376
377 TEST_F(RawChannelTest, WriteMessageAndOnReadMessage) {
378 static const size_t kNumWriterThreads = 10;
379 static const size_t kNumWriteMessagesPerThread = 4000;
380
381 WriteOnlyRawChannelDelegate writer_delegate;
382 scoped_ptr<RawChannel> writer_rc(RawChannel::Create(handles[0].Pass()));
383 io_thread()->PostTaskAndWait(FROM_HERE,
384 base::Bind(&InitOnIOThread, writer_rc.get(),
385 base::Unretained(&writer_delegate)));
386
387 ReadCountdownRawChannelDelegate reader_delegate(kNumWriterThreads *
388 kNumWriteMessagesPerThread);
389 scoped_ptr<RawChannel> reader_rc(RawChannel::Create(handles[1].Pass()));
390 io_thread()->PostTaskAndWait(FROM_HERE,
391 base::Bind(&InitOnIOThread, reader_rc.get(),
392 base::Unretained(&reader_delegate)));
393
394 {
395 ScopedVector<RawChannelWriterThread> writer_threads;
396 for (size_t i = 0; i < kNumWriterThreads; i++) {
397 writer_threads.push_back(new RawChannelWriterThread(
398 writer_rc.get(), kNumWriteMessagesPerThread));
399 }
400 for (size_t i = 0; i < writer_threads.size(); i++)
401 writer_threads[i]->Start();
402 } // Joins all the writer threads.
403
404 // Sleep a bit, to let any extraneous reads be processed. (There shouldn't be
405 // any, but we want to know about them.)
406 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
407
408 // Wait for reading to finish.
409 reader_delegate.Wait();
410
411 io_thread()->PostTaskAndWait(
412 FROM_HERE,
413 base::Bind(&RawChannel::Shutdown, base::Unretained(reader_rc.get())));
414
415 io_thread()->PostTaskAndWait(
416 FROM_HERE,
417 base::Bind(&RawChannel::Shutdown, base::Unretained(writer_rc.get())));
418 }
419
420 // RawChannelTest.OnError ------------------------------------------------------
421
422 class ErrorRecordingRawChannelDelegate
423 : public ReadCountdownRawChannelDelegate {
424 public:
425 ErrorRecordingRawChannelDelegate(size_t expected_read_count,
426 bool expect_read_error,
427 bool expect_write_error)
428 : ReadCountdownRawChannelDelegate(expected_read_count),
429 got_read_error_event_(false, false),
430 got_write_error_event_(false, false),
431 expecting_read_error_(expect_read_error),
432 expecting_write_error_(expect_write_error) {}
433
434 ~ErrorRecordingRawChannelDelegate() override {}
435
436 void OnError(Error error) override {
437 switch (error) {
438 case ERROR_READ_SHUTDOWN:
439 ASSERT_TRUE(expecting_read_error_);
440 expecting_read_error_ = false;
441 got_read_error_event_.Signal();
442 break;
443 case ERROR_READ_BROKEN:
444 // TODO(vtl): Test broken connections.
445 CHECK(false);
446 break;
447 case ERROR_READ_BAD_MESSAGE:
448 // TODO(vtl): Test reception/detection of bad messages.
449 CHECK(false);
450 break;
451 case ERROR_READ_UNKNOWN:
452 // TODO(vtl): Test however it is we might get here.
453 CHECK(false);
454 break;
455 case ERROR_WRITE:
456 ASSERT_TRUE(expecting_write_error_);
457 expecting_write_error_ = false;
458 got_write_error_event_.Signal();
459 break;
460 }
461 }
462
463 void WaitForReadError() { got_read_error_event_.Wait(); }
464 void WaitForWriteError() { got_write_error_event_.Wait(); }
465
466 private:
467 base::WaitableEvent got_read_error_event_;
468 base::WaitableEvent got_write_error_event_;
469
470 bool expecting_read_error_;
471 bool expecting_write_error_;
472
473 DISALLOW_COPY_AND_ASSIGN(ErrorRecordingRawChannelDelegate);
474 };
475
476 // Tests (fatal) errors.
477 TEST_F(RawChannelTest, OnError) {
478 ErrorRecordingRawChannelDelegate delegate(0, true, true);
479 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass()));
480 io_thread()->PostTaskAndWait(
481 FROM_HERE,
482 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate)));
483
484 // Close the handle of the other end, which should make writing fail.
485 handles[1].reset();
486
487 EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1)));
488
489 // We should get a write error.
490 delegate.WaitForWriteError();
491
492 // We should also get a read error.
493 delegate.WaitForReadError();
494
495 EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(2)));
496
497 // Sleep a bit, to make sure we don't get another |OnError()|
498 // notification. (If we actually get another one, |OnError()| crashes.)
499 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(20));
500
501 io_thread()->PostTaskAndWait(
502 FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get())));
503 }
504
505 // RawChannelTest.ReadUnaffectedByWriteError -----------------------------------
506
507 TEST_F(RawChannelTest, ReadUnaffectedByWriteError) {
508 const size_t kMessageCount = 5;
509
510 // Write a few messages into the other end.
511 uint32_t message_size = 1;
512 for (size_t i = 0; i < kMessageCount;
513 i++, message_size += message_size / 2 + 1)
514 EXPECT_TRUE(WriteTestMessageToHandle(handles[1].get(), message_size));
515
516 // Close the other end, which should make writing fail.
517 handles[1].reset();
518
519 // Only start up reading here. The system buffer should still contain the
520 // messages that were written.
521 ErrorRecordingRawChannelDelegate delegate(kMessageCount, true, true);
522 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass()));
523 io_thread()->PostTaskAndWait(
524 FROM_HERE,
525 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate)));
526
527 EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1)));
528
529 // We should definitely get a write error.
530 delegate.WaitForWriteError();
531
532 // Wait for reading to finish. A writing failure shouldn't affect reading.
533 delegate.Wait();
534
535 // And then we should get a read error.
536 delegate.WaitForReadError();
537
538 io_thread()->PostTaskAndWait(
539 FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get())));
540 }
541
542 // RawChannelTest.WriteMessageAfterShutdown ------------------------------------
543
544 // Makes sure that calling |WriteMessage()| after |Shutdown()| behaves
545 // correctly.
546 TEST_F(RawChannelTest, WriteMessageAfterShutdown) {
547 WriteOnlyRawChannelDelegate delegate;
548 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass()));
549 io_thread()->PostTaskAndWait(
550 FROM_HERE,
551 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate)));
552 io_thread()->PostTaskAndWait(
553 FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get())));
554
555 EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1)));
556 }
557
558 // RawChannelTest.ShutdownOnReadMessage ----------------------------------------
559
560 class ShutdownOnReadMessageRawChannelDelegate : public RawChannel::Delegate {
561 public:
562 explicit ShutdownOnReadMessageRawChannelDelegate(RawChannel* raw_channel)
563 : raw_channel_(raw_channel),
564 done_event_(false, false),
565 did_shutdown_(false) {}
566 ~ShutdownOnReadMessageRawChannelDelegate() override {}
567
568 // |RawChannel::Delegate| implementation (called on the I/O thread):
569 void OnReadMessage(
570 const MessageInTransit::View& message_view,
571 embedder::ScopedPlatformHandleVectorPtr platform_handles) override {
572 EXPECT_FALSE(platform_handles);
573 EXPECT_FALSE(did_shutdown_);
574 EXPECT_TRUE(
575 CheckMessageData(message_view.bytes(), message_view.num_bytes()));
576 raw_channel_->Shutdown();
577 did_shutdown_ = true;
578 done_event_.Signal();
579 }
580 void OnError(Error /*error*/) override {
581 CHECK(false); // Should not get called.
582 }
583
584 // Waits for shutdown.
585 void Wait() {
586 done_event_.Wait();
587 EXPECT_TRUE(did_shutdown_);
588 }
589
590 private:
591 RawChannel* const raw_channel_;
592 base::WaitableEvent done_event_;
593 bool did_shutdown_;
594
595 DISALLOW_COPY_AND_ASSIGN(ShutdownOnReadMessageRawChannelDelegate);
596 };
597
598 TEST_F(RawChannelTest, ShutdownOnReadMessage) {
599 // Write a few messages into the other end.
600 for (size_t count = 0; count < 5; count++)
601 EXPECT_TRUE(WriteTestMessageToHandle(handles[1].get(), 10));
602
603 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass()));
604 ShutdownOnReadMessageRawChannelDelegate delegate(rc.get());
605 io_thread()->PostTaskAndWait(
606 FROM_HERE,
607 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate)));
608
609 // Wait for the delegate, which will shut the |RawChannel| down.
610 delegate.Wait();
611 }
612
613 // RawChannelTest.ShutdownOnError{Read, Write} ---------------------------------
614
615 class ShutdownOnErrorRawChannelDelegate : public RawChannel::Delegate {
616 public:
617 ShutdownOnErrorRawChannelDelegate(RawChannel* raw_channel,
618 Error shutdown_on_error_type)
619 : raw_channel_(raw_channel),
620 shutdown_on_error_type_(shutdown_on_error_type),
621 done_event_(false, false),
622 did_shutdown_(false) {}
623 ~ShutdownOnErrorRawChannelDelegate() override {}
624
625 // |RawChannel::Delegate| implementation (called on the I/O thread):
626 void OnReadMessage(
627 const MessageInTransit::View& /*message_view*/,
628 embedder::ScopedPlatformHandleVectorPtr /*platform_handles*/) override {
629 CHECK(false); // Should not get called.
630 }
631 void OnError(Error error) override {
632 EXPECT_FALSE(did_shutdown_);
633 if (error != shutdown_on_error_type_)
634 return;
635 raw_channel_->Shutdown();
636 did_shutdown_ = true;
637 done_event_.Signal();
638 }
639
640 // Waits for shutdown.
641 void Wait() {
642 done_event_.Wait();
643 EXPECT_TRUE(did_shutdown_);
644 }
645
646 private:
647 RawChannel* const raw_channel_;
648 const Error shutdown_on_error_type_;
649 base::WaitableEvent done_event_;
650 bool did_shutdown_;
651
652 DISALLOW_COPY_AND_ASSIGN(ShutdownOnErrorRawChannelDelegate);
653 };
654
655 TEST_F(RawChannelTest, ShutdownOnErrorRead) {
656 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass()));
657 ShutdownOnErrorRawChannelDelegate delegate(
658 rc.get(), RawChannel::Delegate::ERROR_READ_SHUTDOWN);
659 io_thread()->PostTaskAndWait(
660 FROM_HERE,
661 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate)));
662
663 // Close the handle of the other end, which should stuff fail.
664 handles[1].reset();
665
666 // Wait for the delegate, which will shut the |RawChannel| down.
667 delegate.Wait();
668 }
669
670 TEST_F(RawChannelTest, ShutdownOnErrorWrite) {
671 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass()));
672 ShutdownOnErrorRawChannelDelegate delegate(rc.get(),
673 RawChannel::Delegate::ERROR_WRITE);
674 io_thread()->PostTaskAndWait(
675 FROM_HERE,
676 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate)));
677
678 // Close the handle of the other end, which should stuff fail.
679 handles[1].reset();
680
681 EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1)));
682
683 // Wait for the delegate, which will shut the |RawChannel| down.
684 delegate.Wait();
685 }
686
687 // RawChannelTest.ReadWritePlatformHandles -------------------------------------
688
689 class ReadPlatformHandlesCheckerRawChannelDelegate
690 : public RawChannel::Delegate {
691 public:
692 ReadPlatformHandlesCheckerRawChannelDelegate() : done_event_(false, false) {}
693 ~ReadPlatformHandlesCheckerRawChannelDelegate() override {}
694
695 // |RawChannel::Delegate| implementation (called on the I/O thread):
696 void OnReadMessage(
697 const MessageInTransit::View& message_view,
698 embedder::ScopedPlatformHandleVectorPtr platform_handles) override {
699 const char kHello[] = "hello";
700
701 EXPECT_EQ(sizeof(kHello), message_view.num_bytes());
702 EXPECT_STREQ(kHello, static_cast<const char*>(message_view.bytes()));
703
704 ASSERT_TRUE(platform_handles);
705 ASSERT_EQ(2u, platform_handles->size());
706 embedder::ScopedPlatformHandle h1(platform_handles->at(0));
707 EXPECT_TRUE(h1.is_valid());
708 embedder::ScopedPlatformHandle h2(platform_handles->at(1));
709 EXPECT_TRUE(h2.is_valid());
710 platform_handles->clear();
711
712 {
713 char buffer[100] = {};
714
715 base::ScopedFILE fp(mojo::test::FILEFromPlatformHandle(h1.Pass(), "rb"));
716 EXPECT_TRUE(fp);
717 rewind(fp.get());
718 EXPECT_EQ(1u, fread(buffer, 1, sizeof(buffer), fp.get()));
719 EXPECT_EQ('1', buffer[0]);
720 }
721
722 {
723 char buffer[100] = {};
724 base::ScopedFILE fp(mojo::test::FILEFromPlatformHandle(h2.Pass(), "rb"));
725 EXPECT_TRUE(fp);
726 rewind(fp.get());
727 EXPECT_EQ(1u, fread(buffer, 1, sizeof(buffer), fp.get()));
728 EXPECT_EQ('2', buffer[0]);
729 }
730
731 done_event_.Signal();
732 }
733 void OnError(Error error) override {
734 // We'll get a read (shutdown) error when the connection is closed.
735 CHECK_EQ(error, ERROR_READ_SHUTDOWN);
736 }
737
738 void Wait() { done_event_.Wait(); }
739
740 private:
741 base::WaitableEvent done_event_;
742
743 DISALLOW_COPY_AND_ASSIGN(ReadPlatformHandlesCheckerRawChannelDelegate);
744 };
745
746 #if defined(OS_POSIX)
747 #define MAYBE_ReadWritePlatformHandles ReadWritePlatformHandles
748 #else
749 // Not yet implemented (on Windows).
750 #define MAYBE_ReadWritePlatformHandles DISABLED_ReadWritePlatformHandles
751 #endif
752 TEST_F(RawChannelTest, MAYBE_ReadWritePlatformHandles) {
753 base::ScopedTempDir temp_dir;
754 ASSERT_TRUE(temp_dir.CreateUniqueTempDir());
755
756 WriteOnlyRawChannelDelegate write_delegate;
757 scoped_ptr<RawChannel> rc_write(RawChannel::Create(handles[0].Pass()));
758 io_thread()->PostTaskAndWait(FROM_HERE,
759 base::Bind(&InitOnIOThread, rc_write.get(),
760 base::Unretained(&write_delegate)));
761
762 ReadPlatformHandlesCheckerRawChannelDelegate read_delegate;
763 scoped_ptr<RawChannel> rc_read(RawChannel::Create(handles[1].Pass()));
764 io_thread()->PostTaskAndWait(FROM_HERE,
765 base::Bind(&InitOnIOThread, rc_read.get(),
766 base::Unretained(&read_delegate)));
767
768 base::FilePath unused;
769 base::ScopedFILE fp1(
770 base::CreateAndOpenTemporaryFileInDir(temp_dir.path(), &unused));
771 EXPECT_EQ(1u, fwrite("1", 1, 1, fp1.get()));
772 base::ScopedFILE fp2(
773 base::CreateAndOpenTemporaryFileInDir(temp_dir.path(), &unused));
774 EXPECT_EQ(1u, fwrite("2", 1, 1, fp2.get()));
775
776 {
777 const char kHello[] = "hello";
778 embedder::ScopedPlatformHandleVectorPtr platform_handles(
779 new embedder::PlatformHandleVector());
780 platform_handles->push_back(
781 mojo::test::PlatformHandleFromFILE(fp1.Pass()).release());
782 platform_handles->push_back(
783 mojo::test::PlatformHandleFromFILE(fp2.Pass()).release());
784
785 scoped_ptr<MessageInTransit> message(new MessageInTransit(
786 MessageInTransit::kTypeEndpoint, MessageInTransit::kSubtypeEndpointData,
787 sizeof(kHello), kHello));
788 message->SetTransportData(
789 make_scoped_ptr(new TransportData(platform_handles.Pass())));
790 EXPECT_TRUE(rc_write->WriteMessage(message.Pass()));
791 }
792
793 read_delegate.Wait();
794
795 io_thread()->PostTaskAndWait(
796 FROM_HERE,
797 base::Bind(&RawChannel::Shutdown, base::Unretained(rc_read.get())));
798 io_thread()->PostTaskAndWait(
799 FROM_HERE,
800 base::Bind(&RawChannel::Shutdown, base::Unretained(rc_write.get())));
801 }
802
803 } // namespace
804 } // namespace system
805 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/raw_channel_posix.cc ('k') | mojo/edk/system/raw_channel_win.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698