OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "mojo/edk/system/raw_channel.h" | |
6 | |
7 #include <stdint.h> | |
8 #include <stdio.h> | |
9 | |
10 #include <vector> | |
11 | |
12 #include "base/bind.h" | |
13 #include "base/files/file_path.h" | |
14 #include "base/files/file_util.h" | |
15 #include "base/files/scoped_file.h" | |
16 #include "base/files/scoped_temp_dir.h" | |
17 #include "base/location.h" | |
18 #include "base/logging.h" | |
19 #include "base/macros.h" | |
20 #include "base/memory/scoped_ptr.h" | |
21 #include "base/memory/scoped_vector.h" | |
22 #include "base/rand_util.h" | |
23 #include "base/synchronization/lock.h" | |
24 #include "base/synchronization/waitable_event.h" | |
25 #include "base/test/test_io_thread.h" | |
26 #include "base/threading/platform_thread.h" // For |Sleep()|. | |
27 #include "base/threading/simple_thread.h" | |
28 #include "base/time/time.h" | |
29 #include "build/build_config.h" // TODO(vtl): Remove this. | |
30 #include "mojo/edk/embedder/platform_channel_pair.h" | |
31 #include "mojo/edk/embedder/platform_handle.h" | |
32 #include "mojo/edk/embedder/scoped_platform_handle.h" | |
33 #include "mojo/edk/system/message_in_transit.h" | |
34 #include "mojo/edk/system/test_utils.h" | |
35 #include "mojo/edk/system/transport_data.h" | |
36 #include "mojo/edk/test/test_utils.h" | |
37 #include "testing/gtest/include/gtest/gtest.h" | |
38 | |
39 namespace mojo { | |
40 namespace system { | |
41 namespace { | |
42 | |
43 scoped_ptr<MessageInTransit> MakeTestMessage(uint32_t num_bytes) { | |
44 std::vector<unsigned char> bytes(num_bytes, 0); | |
45 for (size_t i = 0; i < num_bytes; i++) | |
46 bytes[i] = static_cast<unsigned char>(i + num_bytes); | |
47 return make_scoped_ptr(new MessageInTransit( | |
48 MessageInTransit::kTypeEndpoint, MessageInTransit::kSubtypeEndpointData, | |
49 num_bytes, bytes.empty() ? nullptr : &bytes[0])); | |
50 } | |
51 | |
52 bool CheckMessageData(const void* bytes, uint32_t num_bytes) { | |
53 const unsigned char* b = static_cast<const unsigned char*>(bytes); | |
54 for (uint32_t i = 0; i < num_bytes; i++) { | |
55 if (b[i] != static_cast<unsigned char>(i + num_bytes)) | |
56 return false; | |
57 } | |
58 return true; | |
59 } | |
60 | |
61 void InitOnIOThread(RawChannel* raw_channel, RawChannel::Delegate* delegate) { | |
62 raw_channel->Init(delegate); | |
63 } | |
64 | |
65 bool WriteTestMessageToHandle(const embedder::PlatformHandle& handle, | |
66 uint32_t num_bytes) { | |
67 scoped_ptr<MessageInTransit> message(MakeTestMessage(num_bytes)); | |
68 | |
69 size_t write_size = 0; | |
70 mojo::test::BlockingWrite(handle, message->main_buffer(), | |
71 message->main_buffer_size(), &write_size); | |
72 return write_size == message->main_buffer_size(); | |
73 } | |
74 | |
75 // ----------------------------------------------------------------------------- | |
76 | |
77 class RawChannelTest : public testing::Test { | |
78 public: | |
79 RawChannelTest() : io_thread_(base::TestIOThread::kManualStart) {} | |
80 ~RawChannelTest() override {} | |
81 | |
82 void SetUp() override { | |
83 embedder::PlatformChannelPair channel_pair; | |
84 handles[0] = channel_pair.PassServerHandle(); | |
85 handles[1] = channel_pair.PassClientHandle(); | |
86 io_thread_.Start(); | |
87 } | |
88 | |
89 void TearDown() override { | |
90 io_thread_.Stop(); | |
91 handles[0].reset(); | |
92 handles[1].reset(); | |
93 } | |
94 | |
95 protected: | |
96 base::TestIOThread* io_thread() { return &io_thread_; } | |
97 | |
98 embedder::ScopedPlatformHandle handles[2]; | |
99 | |
100 private: | |
101 base::TestIOThread io_thread_; | |
102 | |
103 DISALLOW_COPY_AND_ASSIGN(RawChannelTest); | |
104 }; | |
105 | |
106 // RawChannelTest.WriteMessage ------------------------------------------------- | |
107 | |
108 class WriteOnlyRawChannelDelegate : public RawChannel::Delegate { | |
109 public: | |
110 WriteOnlyRawChannelDelegate() {} | |
111 ~WriteOnlyRawChannelDelegate() override {} | |
112 | |
113 // |RawChannel::Delegate| implementation: | |
114 void OnReadMessage( | |
115 const MessageInTransit::View& /*message_view*/, | |
116 embedder::ScopedPlatformHandleVectorPtr /*platform_handles*/) override { | |
117 CHECK(false); // Should not get called. | |
118 } | |
119 void OnError(Error error) override { | |
120 // We'll get a read (shutdown) error when the connection is closed. | |
121 CHECK_EQ(error, ERROR_READ_SHUTDOWN); | |
122 } | |
123 | |
124 private: | |
125 DISALLOW_COPY_AND_ASSIGN(WriteOnlyRawChannelDelegate); | |
126 }; | |
127 | |
128 static const int64_t kMessageReaderSleepMs = 1; | |
129 static const size_t kMessageReaderMaxPollIterations = 3000; | |
130 | |
131 class TestMessageReaderAndChecker { | |
132 public: | |
133 explicit TestMessageReaderAndChecker(embedder::PlatformHandle handle) | |
134 : handle_(handle) {} | |
135 ~TestMessageReaderAndChecker() { CHECK(bytes_.empty()); } | |
136 | |
137 bool ReadAndCheckNextMessage(uint32_t expected_size) { | |
138 unsigned char buffer[4096]; | |
139 | |
140 for (size_t i = 0; i < kMessageReaderMaxPollIterations;) { | |
141 size_t read_size = 0; | |
142 CHECK(mojo::test::NonBlockingRead(handle_, buffer, sizeof(buffer), | |
143 &read_size)); | |
144 | |
145 // Append newly-read data to |bytes_|. | |
146 bytes_.insert(bytes_.end(), buffer, buffer + read_size); | |
147 | |
148 // If we have the header.... | |
149 size_t message_size; | |
150 if (MessageInTransit::GetNextMessageSize( | |
151 bytes_.empty() ? nullptr : &bytes_[0], bytes_.size(), | |
152 &message_size)) { | |
153 // If we've read the whole message.... | |
154 if (bytes_.size() >= message_size) { | |
155 bool rv = true; | |
156 MessageInTransit::View message_view(message_size, &bytes_[0]); | |
157 CHECK_EQ(message_view.main_buffer_size(), message_size); | |
158 | |
159 if (message_view.num_bytes() != expected_size) { | |
160 LOG(ERROR) << "Wrong size: " << message_size << " instead of " | |
161 << expected_size << " bytes."; | |
162 rv = false; | |
163 } else if (!CheckMessageData(message_view.bytes(), | |
164 message_view.num_bytes())) { | |
165 LOG(ERROR) << "Incorrect message bytes."; | |
166 rv = false; | |
167 } | |
168 | |
169 // Erase message data. | |
170 bytes_.erase(bytes_.begin(), | |
171 bytes_.begin() + message_view.main_buffer_size()); | |
172 return rv; | |
173 } | |
174 } | |
175 | |
176 if (static_cast<size_t>(read_size) < sizeof(buffer)) { | |
177 i++; | |
178 base::PlatformThread::Sleep( | |
179 base::TimeDelta::FromMilliseconds(kMessageReaderSleepMs)); | |
180 } | |
181 } | |
182 | |
183 LOG(ERROR) << "Too many iterations."; | |
184 return false; | |
185 } | |
186 | |
187 private: | |
188 const embedder::PlatformHandle handle_; | |
189 | |
190 // The start of the received data should always be on a message boundary. | |
191 std::vector<unsigned char> bytes_; | |
192 | |
193 DISALLOW_COPY_AND_ASSIGN(TestMessageReaderAndChecker); | |
194 }; | |
195 | |
196 // Tests writing (and verifies reading using our own custom reader). | |
197 TEST_F(RawChannelTest, WriteMessage) { | |
198 WriteOnlyRawChannelDelegate delegate; | |
199 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); | |
200 TestMessageReaderAndChecker checker(handles[1].get()); | |
201 io_thread()->PostTaskAndWait( | |
202 FROM_HERE, | |
203 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); | |
204 | |
205 // Write and read, for a variety of sizes. | |
206 for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) { | |
207 EXPECT_TRUE(rc->WriteMessage(MakeTestMessage(size))); | |
208 EXPECT_TRUE(checker.ReadAndCheckNextMessage(size)) << size; | |
209 } | |
210 | |
211 // Write/queue and read afterwards, for a variety of sizes. | |
212 for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) | |
213 EXPECT_TRUE(rc->WriteMessage(MakeTestMessage(size))); | |
214 for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) | |
215 EXPECT_TRUE(checker.ReadAndCheckNextMessage(size)) << size; | |
216 | |
217 io_thread()->PostTaskAndWait( | |
218 FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get()))); | |
219 } | |
220 | |
221 // RawChannelTest.OnReadMessage ------------------------------------------------ | |
222 | |
223 class ReadCheckerRawChannelDelegate : public RawChannel::Delegate { | |
224 public: | |
225 ReadCheckerRawChannelDelegate() : done_event_(false, false), position_(0) {} | |
226 ~ReadCheckerRawChannelDelegate() override {} | |
227 | |
228 // |RawChannel::Delegate| implementation (called on the I/O thread): | |
229 void OnReadMessage( | |
230 const MessageInTransit::View& message_view, | |
231 embedder::ScopedPlatformHandleVectorPtr platform_handles) override { | |
232 EXPECT_FALSE(platform_handles); | |
233 | |
234 size_t position; | |
235 size_t expected_size; | |
236 bool should_signal = false; | |
237 { | |
238 base::AutoLock locker(lock_); | |
239 CHECK_LT(position_, expected_sizes_.size()); | |
240 position = position_; | |
241 expected_size = expected_sizes_[position]; | |
242 position_++; | |
243 if (position_ >= expected_sizes_.size()) | |
244 should_signal = true; | |
245 } | |
246 | |
247 EXPECT_EQ(expected_size, message_view.num_bytes()) << position; | |
248 if (message_view.num_bytes() == expected_size) { | |
249 EXPECT_TRUE( | |
250 CheckMessageData(message_view.bytes(), message_view.num_bytes())) | |
251 << position; | |
252 } | |
253 | |
254 if (should_signal) | |
255 done_event_.Signal(); | |
256 } | |
257 void OnError(Error error) override { | |
258 // We'll get a read (shutdown) error when the connection is closed. | |
259 CHECK_EQ(error, ERROR_READ_SHUTDOWN); | |
260 } | |
261 | |
262 // Waits for all the messages (of sizes |expected_sizes_|) to be seen. | |
263 void Wait() { done_event_.Wait(); } | |
264 | |
265 void SetExpectedSizes(const std::vector<uint32_t>& expected_sizes) { | |
266 base::AutoLock locker(lock_); | |
267 CHECK_EQ(position_, expected_sizes_.size()); | |
268 expected_sizes_ = expected_sizes; | |
269 position_ = 0; | |
270 } | |
271 | |
272 private: | |
273 base::WaitableEvent done_event_; | |
274 | |
275 base::Lock lock_; // Protects the following members. | |
276 std::vector<uint32_t> expected_sizes_; | |
277 size_t position_; | |
278 | |
279 DISALLOW_COPY_AND_ASSIGN(ReadCheckerRawChannelDelegate); | |
280 }; | |
281 | |
282 // Tests reading (writing using our own custom writer). | |
283 TEST_F(RawChannelTest, OnReadMessage) { | |
284 ReadCheckerRawChannelDelegate delegate; | |
285 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); | |
286 io_thread()->PostTaskAndWait( | |
287 FROM_HERE, | |
288 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); | |
289 | |
290 // Write and read, for a variety of sizes. | |
291 for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) { | |
292 delegate.SetExpectedSizes(std::vector<uint32_t>(1, size)); | |
293 | |
294 EXPECT_TRUE(WriteTestMessageToHandle(handles[1].get(), size)); | |
295 | |
296 delegate.Wait(); | |
297 } | |
298 | |
299 // Set up reader and write as fast as we can. | |
300 // Write/queue and read afterwards, for a variety of sizes. | |
301 std::vector<uint32_t> expected_sizes; | |
302 for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) | |
303 expected_sizes.push_back(size); | |
304 delegate.SetExpectedSizes(expected_sizes); | |
305 for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) | |
306 EXPECT_TRUE(WriteTestMessageToHandle(handles[1].get(), size)); | |
307 delegate.Wait(); | |
308 | |
309 io_thread()->PostTaskAndWait( | |
310 FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get()))); | |
311 } | |
312 | |
313 // RawChannelTest.WriteMessageAndOnReadMessage --------------------------------- | |
314 | |
315 class RawChannelWriterThread : public base::SimpleThread { | |
316 public: | |
317 RawChannelWriterThread(RawChannel* raw_channel, size_t write_count) | |
318 : base::SimpleThread("raw_channel_writer_thread"), | |
319 raw_channel_(raw_channel), | |
320 left_to_write_(write_count) {} | |
321 | |
322 ~RawChannelWriterThread() override { Join(); } | |
323 | |
324 private: | |
325 void Run() override { | |
326 static const int kMaxRandomMessageSize = 25000; | |
327 | |
328 while (left_to_write_-- > 0) { | |
329 EXPECT_TRUE(raw_channel_->WriteMessage(MakeTestMessage( | |
330 static_cast<uint32_t>(base::RandInt(1, kMaxRandomMessageSize))))); | |
331 } | |
332 } | |
333 | |
334 RawChannel* const raw_channel_; | |
335 size_t left_to_write_; | |
336 | |
337 DISALLOW_COPY_AND_ASSIGN(RawChannelWriterThread); | |
338 }; | |
339 | |
340 class ReadCountdownRawChannelDelegate : public RawChannel::Delegate { | |
341 public: | |
342 explicit ReadCountdownRawChannelDelegate(size_t expected_count) | |
343 : done_event_(false, false), expected_count_(expected_count), count_(0) {} | |
344 ~ReadCountdownRawChannelDelegate() override {} | |
345 | |
346 // |RawChannel::Delegate| implementation (called on the I/O thread): | |
347 void OnReadMessage( | |
348 const MessageInTransit::View& message_view, | |
349 embedder::ScopedPlatformHandleVectorPtr platform_handles) override { | |
350 EXPECT_FALSE(platform_handles); | |
351 | |
352 EXPECT_LT(count_, expected_count_); | |
353 count_++; | |
354 | |
355 EXPECT_TRUE( | |
356 CheckMessageData(message_view.bytes(), message_view.num_bytes())); | |
357 | |
358 if (count_ >= expected_count_) | |
359 done_event_.Signal(); | |
360 } | |
361 void OnError(Error error) override { | |
362 // We'll get a read (shutdown) error when the connection is closed. | |
363 CHECK_EQ(error, ERROR_READ_SHUTDOWN); | |
364 } | |
365 | |
366 // Waits for all the messages to have been seen. | |
367 void Wait() { done_event_.Wait(); } | |
368 | |
369 private: | |
370 base::WaitableEvent done_event_; | |
371 size_t expected_count_; | |
372 size_t count_; | |
373 | |
374 DISALLOW_COPY_AND_ASSIGN(ReadCountdownRawChannelDelegate); | |
375 }; | |
376 | |
377 TEST_F(RawChannelTest, WriteMessageAndOnReadMessage) { | |
378 static const size_t kNumWriterThreads = 10; | |
379 static const size_t kNumWriteMessagesPerThread = 4000; | |
380 | |
381 WriteOnlyRawChannelDelegate writer_delegate; | |
382 scoped_ptr<RawChannel> writer_rc(RawChannel::Create(handles[0].Pass())); | |
383 io_thread()->PostTaskAndWait(FROM_HERE, | |
384 base::Bind(&InitOnIOThread, writer_rc.get(), | |
385 base::Unretained(&writer_delegate))); | |
386 | |
387 ReadCountdownRawChannelDelegate reader_delegate(kNumWriterThreads * | |
388 kNumWriteMessagesPerThread); | |
389 scoped_ptr<RawChannel> reader_rc(RawChannel::Create(handles[1].Pass())); | |
390 io_thread()->PostTaskAndWait(FROM_HERE, | |
391 base::Bind(&InitOnIOThread, reader_rc.get(), | |
392 base::Unretained(&reader_delegate))); | |
393 | |
394 { | |
395 ScopedVector<RawChannelWriterThread> writer_threads; | |
396 for (size_t i = 0; i < kNumWriterThreads; i++) { | |
397 writer_threads.push_back(new RawChannelWriterThread( | |
398 writer_rc.get(), kNumWriteMessagesPerThread)); | |
399 } | |
400 for (size_t i = 0; i < writer_threads.size(); i++) | |
401 writer_threads[i]->Start(); | |
402 } // Joins all the writer threads. | |
403 | |
404 // Sleep a bit, to let any extraneous reads be processed. (There shouldn't be | |
405 // any, but we want to know about them.) | |
406 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100)); | |
407 | |
408 // Wait for reading to finish. | |
409 reader_delegate.Wait(); | |
410 | |
411 io_thread()->PostTaskAndWait( | |
412 FROM_HERE, | |
413 base::Bind(&RawChannel::Shutdown, base::Unretained(reader_rc.get()))); | |
414 | |
415 io_thread()->PostTaskAndWait( | |
416 FROM_HERE, | |
417 base::Bind(&RawChannel::Shutdown, base::Unretained(writer_rc.get()))); | |
418 } | |
419 | |
420 // RawChannelTest.OnError ------------------------------------------------------ | |
421 | |
422 class ErrorRecordingRawChannelDelegate | |
423 : public ReadCountdownRawChannelDelegate { | |
424 public: | |
425 ErrorRecordingRawChannelDelegate(size_t expected_read_count, | |
426 bool expect_read_error, | |
427 bool expect_write_error) | |
428 : ReadCountdownRawChannelDelegate(expected_read_count), | |
429 got_read_error_event_(false, false), | |
430 got_write_error_event_(false, false), | |
431 expecting_read_error_(expect_read_error), | |
432 expecting_write_error_(expect_write_error) {} | |
433 | |
434 ~ErrorRecordingRawChannelDelegate() override {} | |
435 | |
436 void OnError(Error error) override { | |
437 switch (error) { | |
438 case ERROR_READ_SHUTDOWN: | |
439 ASSERT_TRUE(expecting_read_error_); | |
440 expecting_read_error_ = false; | |
441 got_read_error_event_.Signal(); | |
442 break; | |
443 case ERROR_READ_BROKEN: | |
444 // TODO(vtl): Test broken connections. | |
445 CHECK(false); | |
446 break; | |
447 case ERROR_READ_BAD_MESSAGE: | |
448 // TODO(vtl): Test reception/detection of bad messages. | |
449 CHECK(false); | |
450 break; | |
451 case ERROR_READ_UNKNOWN: | |
452 // TODO(vtl): Test however it is we might get here. | |
453 CHECK(false); | |
454 break; | |
455 case ERROR_WRITE: | |
456 ASSERT_TRUE(expecting_write_error_); | |
457 expecting_write_error_ = false; | |
458 got_write_error_event_.Signal(); | |
459 break; | |
460 } | |
461 } | |
462 | |
463 void WaitForReadError() { got_read_error_event_.Wait(); } | |
464 void WaitForWriteError() { got_write_error_event_.Wait(); } | |
465 | |
466 private: | |
467 base::WaitableEvent got_read_error_event_; | |
468 base::WaitableEvent got_write_error_event_; | |
469 | |
470 bool expecting_read_error_; | |
471 bool expecting_write_error_; | |
472 | |
473 DISALLOW_COPY_AND_ASSIGN(ErrorRecordingRawChannelDelegate); | |
474 }; | |
475 | |
476 // Tests (fatal) errors. | |
477 TEST_F(RawChannelTest, OnError) { | |
478 ErrorRecordingRawChannelDelegate delegate(0, true, true); | |
479 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); | |
480 io_thread()->PostTaskAndWait( | |
481 FROM_HERE, | |
482 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); | |
483 | |
484 // Close the handle of the other end, which should make writing fail. | |
485 handles[1].reset(); | |
486 | |
487 EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1))); | |
488 | |
489 // We should get a write error. | |
490 delegate.WaitForWriteError(); | |
491 | |
492 // We should also get a read error. | |
493 delegate.WaitForReadError(); | |
494 | |
495 EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(2))); | |
496 | |
497 // Sleep a bit, to make sure we don't get another |OnError()| | |
498 // notification. (If we actually get another one, |OnError()| crashes.) | |
499 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(20)); | |
500 | |
501 io_thread()->PostTaskAndWait( | |
502 FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get()))); | |
503 } | |
504 | |
505 // RawChannelTest.ReadUnaffectedByWriteError ----------------------------------- | |
506 | |
507 TEST_F(RawChannelTest, ReadUnaffectedByWriteError) { | |
508 const size_t kMessageCount = 5; | |
509 | |
510 // Write a few messages into the other end. | |
511 uint32_t message_size = 1; | |
512 for (size_t i = 0; i < kMessageCount; | |
513 i++, message_size += message_size / 2 + 1) | |
514 EXPECT_TRUE(WriteTestMessageToHandle(handles[1].get(), message_size)); | |
515 | |
516 // Close the other end, which should make writing fail. | |
517 handles[1].reset(); | |
518 | |
519 // Only start up reading here. The system buffer should still contain the | |
520 // messages that were written. | |
521 ErrorRecordingRawChannelDelegate delegate(kMessageCount, true, true); | |
522 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); | |
523 io_thread()->PostTaskAndWait( | |
524 FROM_HERE, | |
525 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); | |
526 | |
527 EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1))); | |
528 | |
529 // We should definitely get a write error. | |
530 delegate.WaitForWriteError(); | |
531 | |
532 // Wait for reading to finish. A writing failure shouldn't affect reading. | |
533 delegate.Wait(); | |
534 | |
535 // And then we should get a read error. | |
536 delegate.WaitForReadError(); | |
537 | |
538 io_thread()->PostTaskAndWait( | |
539 FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get()))); | |
540 } | |
541 | |
542 // RawChannelTest.WriteMessageAfterShutdown ------------------------------------ | |
543 | |
544 // Makes sure that calling |WriteMessage()| after |Shutdown()| behaves | |
545 // correctly. | |
546 TEST_F(RawChannelTest, WriteMessageAfterShutdown) { | |
547 WriteOnlyRawChannelDelegate delegate; | |
548 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); | |
549 io_thread()->PostTaskAndWait( | |
550 FROM_HERE, | |
551 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); | |
552 io_thread()->PostTaskAndWait( | |
553 FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get()))); | |
554 | |
555 EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1))); | |
556 } | |
557 | |
558 // RawChannelTest.ShutdownOnReadMessage ---------------------------------------- | |
559 | |
560 class ShutdownOnReadMessageRawChannelDelegate : public RawChannel::Delegate { | |
561 public: | |
562 explicit ShutdownOnReadMessageRawChannelDelegate(RawChannel* raw_channel) | |
563 : raw_channel_(raw_channel), | |
564 done_event_(false, false), | |
565 did_shutdown_(false) {} | |
566 ~ShutdownOnReadMessageRawChannelDelegate() override {} | |
567 | |
568 // |RawChannel::Delegate| implementation (called on the I/O thread): | |
569 void OnReadMessage( | |
570 const MessageInTransit::View& message_view, | |
571 embedder::ScopedPlatformHandleVectorPtr platform_handles) override { | |
572 EXPECT_FALSE(platform_handles); | |
573 EXPECT_FALSE(did_shutdown_); | |
574 EXPECT_TRUE( | |
575 CheckMessageData(message_view.bytes(), message_view.num_bytes())); | |
576 raw_channel_->Shutdown(); | |
577 did_shutdown_ = true; | |
578 done_event_.Signal(); | |
579 } | |
580 void OnError(Error /*error*/) override { | |
581 CHECK(false); // Should not get called. | |
582 } | |
583 | |
584 // Waits for shutdown. | |
585 void Wait() { | |
586 done_event_.Wait(); | |
587 EXPECT_TRUE(did_shutdown_); | |
588 } | |
589 | |
590 private: | |
591 RawChannel* const raw_channel_; | |
592 base::WaitableEvent done_event_; | |
593 bool did_shutdown_; | |
594 | |
595 DISALLOW_COPY_AND_ASSIGN(ShutdownOnReadMessageRawChannelDelegate); | |
596 }; | |
597 | |
598 TEST_F(RawChannelTest, ShutdownOnReadMessage) { | |
599 // Write a few messages into the other end. | |
600 for (size_t count = 0; count < 5; count++) | |
601 EXPECT_TRUE(WriteTestMessageToHandle(handles[1].get(), 10)); | |
602 | |
603 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); | |
604 ShutdownOnReadMessageRawChannelDelegate delegate(rc.get()); | |
605 io_thread()->PostTaskAndWait( | |
606 FROM_HERE, | |
607 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); | |
608 | |
609 // Wait for the delegate, which will shut the |RawChannel| down. | |
610 delegate.Wait(); | |
611 } | |
612 | |
613 // RawChannelTest.ShutdownOnError{Read, Write} --------------------------------- | |
614 | |
615 class ShutdownOnErrorRawChannelDelegate : public RawChannel::Delegate { | |
616 public: | |
617 ShutdownOnErrorRawChannelDelegate(RawChannel* raw_channel, | |
618 Error shutdown_on_error_type) | |
619 : raw_channel_(raw_channel), | |
620 shutdown_on_error_type_(shutdown_on_error_type), | |
621 done_event_(false, false), | |
622 did_shutdown_(false) {} | |
623 ~ShutdownOnErrorRawChannelDelegate() override {} | |
624 | |
625 // |RawChannel::Delegate| implementation (called on the I/O thread): | |
626 void OnReadMessage( | |
627 const MessageInTransit::View& /*message_view*/, | |
628 embedder::ScopedPlatformHandleVectorPtr /*platform_handles*/) override { | |
629 CHECK(false); // Should not get called. | |
630 } | |
631 void OnError(Error error) override { | |
632 EXPECT_FALSE(did_shutdown_); | |
633 if (error != shutdown_on_error_type_) | |
634 return; | |
635 raw_channel_->Shutdown(); | |
636 did_shutdown_ = true; | |
637 done_event_.Signal(); | |
638 } | |
639 | |
640 // Waits for shutdown. | |
641 void Wait() { | |
642 done_event_.Wait(); | |
643 EXPECT_TRUE(did_shutdown_); | |
644 } | |
645 | |
646 private: | |
647 RawChannel* const raw_channel_; | |
648 const Error shutdown_on_error_type_; | |
649 base::WaitableEvent done_event_; | |
650 bool did_shutdown_; | |
651 | |
652 DISALLOW_COPY_AND_ASSIGN(ShutdownOnErrorRawChannelDelegate); | |
653 }; | |
654 | |
655 TEST_F(RawChannelTest, ShutdownOnErrorRead) { | |
656 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); | |
657 ShutdownOnErrorRawChannelDelegate delegate( | |
658 rc.get(), RawChannel::Delegate::ERROR_READ_SHUTDOWN); | |
659 io_thread()->PostTaskAndWait( | |
660 FROM_HERE, | |
661 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); | |
662 | |
663 // Close the handle of the other end, which should stuff fail. | |
664 handles[1].reset(); | |
665 | |
666 // Wait for the delegate, which will shut the |RawChannel| down. | |
667 delegate.Wait(); | |
668 } | |
669 | |
670 TEST_F(RawChannelTest, ShutdownOnErrorWrite) { | |
671 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); | |
672 ShutdownOnErrorRawChannelDelegate delegate(rc.get(), | |
673 RawChannel::Delegate::ERROR_WRITE); | |
674 io_thread()->PostTaskAndWait( | |
675 FROM_HERE, | |
676 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); | |
677 | |
678 // Close the handle of the other end, which should stuff fail. | |
679 handles[1].reset(); | |
680 | |
681 EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1))); | |
682 | |
683 // Wait for the delegate, which will shut the |RawChannel| down. | |
684 delegate.Wait(); | |
685 } | |
686 | |
687 // RawChannelTest.ReadWritePlatformHandles ------------------------------------- | |
688 | |
689 class ReadPlatformHandlesCheckerRawChannelDelegate | |
690 : public RawChannel::Delegate { | |
691 public: | |
692 ReadPlatformHandlesCheckerRawChannelDelegate() : done_event_(false, false) {} | |
693 ~ReadPlatformHandlesCheckerRawChannelDelegate() override {} | |
694 | |
695 // |RawChannel::Delegate| implementation (called on the I/O thread): | |
696 void OnReadMessage( | |
697 const MessageInTransit::View& message_view, | |
698 embedder::ScopedPlatformHandleVectorPtr platform_handles) override { | |
699 const char kHello[] = "hello"; | |
700 | |
701 EXPECT_EQ(sizeof(kHello), message_view.num_bytes()); | |
702 EXPECT_STREQ(kHello, static_cast<const char*>(message_view.bytes())); | |
703 | |
704 ASSERT_TRUE(platform_handles); | |
705 ASSERT_EQ(2u, platform_handles->size()); | |
706 embedder::ScopedPlatformHandle h1(platform_handles->at(0)); | |
707 EXPECT_TRUE(h1.is_valid()); | |
708 embedder::ScopedPlatformHandle h2(platform_handles->at(1)); | |
709 EXPECT_TRUE(h2.is_valid()); | |
710 platform_handles->clear(); | |
711 | |
712 { | |
713 char buffer[100] = {}; | |
714 | |
715 base::ScopedFILE fp(mojo::test::FILEFromPlatformHandle(h1.Pass(), "rb")); | |
716 EXPECT_TRUE(fp); | |
717 rewind(fp.get()); | |
718 EXPECT_EQ(1u, fread(buffer, 1, sizeof(buffer), fp.get())); | |
719 EXPECT_EQ('1', buffer[0]); | |
720 } | |
721 | |
722 { | |
723 char buffer[100] = {}; | |
724 base::ScopedFILE fp(mojo::test::FILEFromPlatformHandle(h2.Pass(), "rb")); | |
725 EXPECT_TRUE(fp); | |
726 rewind(fp.get()); | |
727 EXPECT_EQ(1u, fread(buffer, 1, sizeof(buffer), fp.get())); | |
728 EXPECT_EQ('2', buffer[0]); | |
729 } | |
730 | |
731 done_event_.Signal(); | |
732 } | |
733 void OnError(Error error) override { | |
734 // We'll get a read (shutdown) error when the connection is closed. | |
735 CHECK_EQ(error, ERROR_READ_SHUTDOWN); | |
736 } | |
737 | |
738 void Wait() { done_event_.Wait(); } | |
739 | |
740 private: | |
741 base::WaitableEvent done_event_; | |
742 | |
743 DISALLOW_COPY_AND_ASSIGN(ReadPlatformHandlesCheckerRawChannelDelegate); | |
744 }; | |
745 | |
746 #if defined(OS_POSIX) | |
747 #define MAYBE_ReadWritePlatformHandles ReadWritePlatformHandles | |
748 #else | |
749 // Not yet implemented (on Windows). | |
750 #define MAYBE_ReadWritePlatformHandles DISABLED_ReadWritePlatformHandles | |
751 #endif | |
752 TEST_F(RawChannelTest, MAYBE_ReadWritePlatformHandles) { | |
753 base::ScopedTempDir temp_dir; | |
754 ASSERT_TRUE(temp_dir.CreateUniqueTempDir()); | |
755 | |
756 WriteOnlyRawChannelDelegate write_delegate; | |
757 scoped_ptr<RawChannel> rc_write(RawChannel::Create(handles[0].Pass())); | |
758 io_thread()->PostTaskAndWait(FROM_HERE, | |
759 base::Bind(&InitOnIOThread, rc_write.get(), | |
760 base::Unretained(&write_delegate))); | |
761 | |
762 ReadPlatformHandlesCheckerRawChannelDelegate read_delegate; | |
763 scoped_ptr<RawChannel> rc_read(RawChannel::Create(handles[1].Pass())); | |
764 io_thread()->PostTaskAndWait(FROM_HERE, | |
765 base::Bind(&InitOnIOThread, rc_read.get(), | |
766 base::Unretained(&read_delegate))); | |
767 | |
768 base::FilePath unused; | |
769 base::ScopedFILE fp1( | |
770 base::CreateAndOpenTemporaryFileInDir(temp_dir.path(), &unused)); | |
771 EXPECT_EQ(1u, fwrite("1", 1, 1, fp1.get())); | |
772 base::ScopedFILE fp2( | |
773 base::CreateAndOpenTemporaryFileInDir(temp_dir.path(), &unused)); | |
774 EXPECT_EQ(1u, fwrite("2", 1, 1, fp2.get())); | |
775 | |
776 { | |
777 const char kHello[] = "hello"; | |
778 embedder::ScopedPlatformHandleVectorPtr platform_handles( | |
779 new embedder::PlatformHandleVector()); | |
780 platform_handles->push_back( | |
781 mojo::test::PlatformHandleFromFILE(fp1.Pass()).release()); | |
782 platform_handles->push_back( | |
783 mojo::test::PlatformHandleFromFILE(fp2.Pass()).release()); | |
784 | |
785 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
786 MessageInTransit::kTypeEndpoint, MessageInTransit::kSubtypeEndpointData, | |
787 sizeof(kHello), kHello)); | |
788 message->SetTransportData( | |
789 make_scoped_ptr(new TransportData(platform_handles.Pass()))); | |
790 EXPECT_TRUE(rc_write->WriteMessage(message.Pass())); | |
791 } | |
792 | |
793 read_delegate.Wait(); | |
794 | |
795 io_thread()->PostTaskAndWait( | |
796 FROM_HERE, | |
797 base::Bind(&RawChannel::Shutdown, base::Unretained(rc_read.get()))); | |
798 io_thread()->PostTaskAndWait( | |
799 FROM_HERE, | |
800 base::Bind(&RawChannel::Shutdown, base::Unretained(rc_write.get()))); | |
801 } | |
802 | |
803 } // namespace | |
804 } // namespace system | |
805 } // namespace mojo | |
OLD | NEW |