| OLD | NEW |
| (Empty) |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "mojo/edk/system/raw_channel.h" | |
| 6 | |
| 7 #include <stdint.h> | |
| 8 #include <stdio.h> | |
| 9 | |
| 10 #include <vector> | |
| 11 | |
| 12 #include "base/bind.h" | |
| 13 #include "base/files/file_path.h" | |
| 14 #include "base/files/file_util.h" | |
| 15 #include "base/files/scoped_file.h" | |
| 16 #include "base/files/scoped_temp_dir.h" | |
| 17 #include "base/location.h" | |
| 18 #include "base/logging.h" | |
| 19 #include "base/macros.h" | |
| 20 #include "base/memory/scoped_ptr.h" | |
| 21 #include "base/memory/scoped_vector.h" | |
| 22 #include "base/rand_util.h" | |
| 23 #include "base/synchronization/lock.h" | |
| 24 #include "base/synchronization/waitable_event.h" | |
| 25 #include "base/test/test_io_thread.h" | |
| 26 #include "base/threading/platform_thread.h" // For |Sleep()|. | |
| 27 #include "base/threading/simple_thread.h" | |
| 28 #include "base/time/time.h" | |
| 29 #include "build/build_config.h" // TODO(vtl): Remove this. | |
| 30 #include "mojo/edk/embedder/platform_channel_pair.h" | |
| 31 #include "mojo/edk/embedder/platform_handle.h" | |
| 32 #include "mojo/edk/embedder/scoped_platform_handle.h" | |
| 33 #include "mojo/edk/system/message_in_transit.h" | |
| 34 #include "mojo/edk/system/test_utils.h" | |
| 35 #include "mojo/edk/system/transport_data.h" | |
| 36 #include "mojo/edk/test/test_utils.h" | |
| 37 #include "testing/gtest/include/gtest/gtest.h" | |
| 38 | |
| 39 namespace mojo { | |
| 40 namespace system { | |
| 41 namespace { | |
| 42 | |
| 43 scoped_ptr<MessageInTransit> MakeTestMessage(uint32_t num_bytes) { | |
| 44 std::vector<unsigned char> bytes(num_bytes, 0); | |
| 45 for (size_t i = 0; i < num_bytes; i++) | |
| 46 bytes[i] = static_cast<unsigned char>(i + num_bytes); | |
| 47 return make_scoped_ptr(new MessageInTransit( | |
| 48 MessageInTransit::kTypeEndpoint, MessageInTransit::kSubtypeEndpointData, | |
| 49 num_bytes, bytes.empty() ? nullptr : &bytes[0])); | |
| 50 } | |
| 51 | |
| 52 bool CheckMessageData(const void* bytes, uint32_t num_bytes) { | |
| 53 const unsigned char* b = static_cast<const unsigned char*>(bytes); | |
| 54 for (uint32_t i = 0; i < num_bytes; i++) { | |
| 55 if (b[i] != static_cast<unsigned char>(i + num_bytes)) | |
| 56 return false; | |
| 57 } | |
| 58 return true; | |
| 59 } | |
| 60 | |
| 61 void InitOnIOThread(RawChannel* raw_channel, RawChannel::Delegate* delegate) { | |
| 62 raw_channel->Init(delegate); | |
| 63 } | |
| 64 | |
| 65 bool WriteTestMessageToHandle(const embedder::PlatformHandle& handle, | |
| 66 uint32_t num_bytes) { | |
| 67 scoped_ptr<MessageInTransit> message(MakeTestMessage(num_bytes)); | |
| 68 | |
| 69 size_t write_size = 0; | |
| 70 mojo::test::BlockingWrite(handle, message->main_buffer(), | |
| 71 message->main_buffer_size(), &write_size); | |
| 72 return write_size == message->main_buffer_size(); | |
| 73 } | |
| 74 | |
| 75 // ----------------------------------------------------------------------------- | |
| 76 | |
| 77 class RawChannelTest : public testing::Test { | |
| 78 public: | |
| 79 RawChannelTest() : io_thread_(base::TestIOThread::kManualStart) {} | |
| 80 ~RawChannelTest() override {} | |
| 81 | |
| 82 void SetUp() override { | |
| 83 embedder::PlatformChannelPair channel_pair; | |
| 84 handles[0] = channel_pair.PassServerHandle(); | |
| 85 handles[1] = channel_pair.PassClientHandle(); | |
| 86 io_thread_.Start(); | |
| 87 } | |
| 88 | |
| 89 void TearDown() override { | |
| 90 io_thread_.Stop(); | |
| 91 handles[0].reset(); | |
| 92 handles[1].reset(); | |
| 93 } | |
| 94 | |
| 95 protected: | |
| 96 base::TestIOThread* io_thread() { return &io_thread_; } | |
| 97 | |
| 98 embedder::ScopedPlatformHandle handles[2]; | |
| 99 | |
| 100 private: | |
| 101 base::TestIOThread io_thread_; | |
| 102 | |
| 103 DISALLOW_COPY_AND_ASSIGN(RawChannelTest); | |
| 104 }; | |
| 105 | |
| 106 // RawChannelTest.WriteMessage ------------------------------------------------- | |
| 107 | |
| 108 class WriteOnlyRawChannelDelegate : public RawChannel::Delegate { | |
| 109 public: | |
| 110 WriteOnlyRawChannelDelegate() {} | |
| 111 ~WriteOnlyRawChannelDelegate() override {} | |
| 112 | |
| 113 // |RawChannel::Delegate| implementation: | |
| 114 void OnReadMessage( | |
| 115 const MessageInTransit::View& /*message_view*/, | |
| 116 embedder::ScopedPlatformHandleVectorPtr /*platform_handles*/) override { | |
| 117 CHECK(false); // Should not get called. | |
| 118 } | |
| 119 void OnError(Error error) override { | |
| 120 // We'll get a read (shutdown) error when the connection is closed. | |
| 121 CHECK_EQ(error, ERROR_READ_SHUTDOWN); | |
| 122 } | |
| 123 | |
| 124 private: | |
| 125 DISALLOW_COPY_AND_ASSIGN(WriteOnlyRawChannelDelegate); | |
| 126 }; | |
| 127 | |
| 128 static const int64_t kMessageReaderSleepMs = 1; | |
| 129 static const size_t kMessageReaderMaxPollIterations = 3000; | |
| 130 | |
| 131 class TestMessageReaderAndChecker { | |
| 132 public: | |
| 133 explicit TestMessageReaderAndChecker(embedder::PlatformHandle handle) | |
| 134 : handle_(handle) {} | |
| 135 ~TestMessageReaderAndChecker() { CHECK(bytes_.empty()); } | |
| 136 | |
| 137 bool ReadAndCheckNextMessage(uint32_t expected_size) { | |
| 138 unsigned char buffer[4096]; | |
| 139 | |
| 140 for (size_t i = 0; i < kMessageReaderMaxPollIterations;) { | |
| 141 size_t read_size = 0; | |
| 142 CHECK(mojo::test::NonBlockingRead(handle_, buffer, sizeof(buffer), | |
| 143 &read_size)); | |
| 144 | |
| 145 // Append newly-read data to |bytes_|. | |
| 146 bytes_.insert(bytes_.end(), buffer, buffer + read_size); | |
| 147 | |
| 148 // If we have the header.... | |
| 149 size_t message_size; | |
| 150 if (MessageInTransit::GetNextMessageSize( | |
| 151 bytes_.empty() ? nullptr : &bytes_[0], bytes_.size(), | |
| 152 &message_size)) { | |
| 153 // If we've read the whole message.... | |
| 154 if (bytes_.size() >= message_size) { | |
| 155 bool rv = true; | |
| 156 MessageInTransit::View message_view(message_size, &bytes_[0]); | |
| 157 CHECK_EQ(message_view.main_buffer_size(), message_size); | |
| 158 | |
| 159 if (message_view.num_bytes() != expected_size) { | |
| 160 LOG(ERROR) << "Wrong size: " << message_size << " instead of " | |
| 161 << expected_size << " bytes."; | |
| 162 rv = false; | |
| 163 } else if (!CheckMessageData(message_view.bytes(), | |
| 164 message_view.num_bytes())) { | |
| 165 LOG(ERROR) << "Incorrect message bytes."; | |
| 166 rv = false; | |
| 167 } | |
| 168 | |
| 169 // Erase message data. | |
| 170 bytes_.erase(bytes_.begin(), | |
| 171 bytes_.begin() + message_view.main_buffer_size()); | |
| 172 return rv; | |
| 173 } | |
| 174 } | |
| 175 | |
| 176 if (static_cast<size_t>(read_size) < sizeof(buffer)) { | |
| 177 i++; | |
| 178 base::PlatformThread::Sleep( | |
| 179 base::TimeDelta::FromMilliseconds(kMessageReaderSleepMs)); | |
| 180 } | |
| 181 } | |
| 182 | |
| 183 LOG(ERROR) << "Too many iterations."; | |
| 184 return false; | |
| 185 } | |
| 186 | |
| 187 private: | |
| 188 const embedder::PlatformHandle handle_; | |
| 189 | |
| 190 // The start of the received data should always be on a message boundary. | |
| 191 std::vector<unsigned char> bytes_; | |
| 192 | |
| 193 DISALLOW_COPY_AND_ASSIGN(TestMessageReaderAndChecker); | |
| 194 }; | |
| 195 | |
| 196 // Tests writing (and verifies reading using our own custom reader). | |
| 197 TEST_F(RawChannelTest, WriteMessage) { | |
| 198 WriteOnlyRawChannelDelegate delegate; | |
| 199 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); | |
| 200 TestMessageReaderAndChecker checker(handles[1].get()); | |
| 201 io_thread()->PostTaskAndWait( | |
| 202 FROM_HERE, | |
| 203 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); | |
| 204 | |
| 205 // Write and read, for a variety of sizes. | |
| 206 for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) { | |
| 207 EXPECT_TRUE(rc->WriteMessage(MakeTestMessage(size))); | |
| 208 EXPECT_TRUE(checker.ReadAndCheckNextMessage(size)) << size; | |
| 209 } | |
| 210 | |
| 211 // Write/queue and read afterwards, for a variety of sizes. | |
| 212 for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) | |
| 213 EXPECT_TRUE(rc->WriteMessage(MakeTestMessage(size))); | |
| 214 for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) | |
| 215 EXPECT_TRUE(checker.ReadAndCheckNextMessage(size)) << size; | |
| 216 | |
| 217 io_thread()->PostTaskAndWait( | |
| 218 FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get()))); | |
| 219 } | |
| 220 | |
| 221 // RawChannelTest.OnReadMessage ------------------------------------------------ | |
| 222 | |
| 223 class ReadCheckerRawChannelDelegate : public RawChannel::Delegate { | |
| 224 public: | |
| 225 ReadCheckerRawChannelDelegate() : done_event_(false, false), position_(0) {} | |
| 226 ~ReadCheckerRawChannelDelegate() override {} | |
| 227 | |
| 228 // |RawChannel::Delegate| implementation (called on the I/O thread): | |
| 229 void OnReadMessage( | |
| 230 const MessageInTransit::View& message_view, | |
| 231 embedder::ScopedPlatformHandleVectorPtr platform_handles) override { | |
| 232 EXPECT_FALSE(platform_handles); | |
| 233 | |
| 234 size_t position; | |
| 235 size_t expected_size; | |
| 236 bool should_signal = false; | |
| 237 { | |
| 238 base::AutoLock locker(lock_); | |
| 239 CHECK_LT(position_, expected_sizes_.size()); | |
| 240 position = position_; | |
| 241 expected_size = expected_sizes_[position]; | |
| 242 position_++; | |
| 243 if (position_ >= expected_sizes_.size()) | |
| 244 should_signal = true; | |
| 245 } | |
| 246 | |
| 247 EXPECT_EQ(expected_size, message_view.num_bytes()) << position; | |
| 248 if (message_view.num_bytes() == expected_size) { | |
| 249 EXPECT_TRUE( | |
| 250 CheckMessageData(message_view.bytes(), message_view.num_bytes())) | |
| 251 << position; | |
| 252 } | |
| 253 | |
| 254 if (should_signal) | |
| 255 done_event_.Signal(); | |
| 256 } | |
| 257 void OnError(Error error) override { | |
| 258 // We'll get a read (shutdown) error when the connection is closed. | |
| 259 CHECK_EQ(error, ERROR_READ_SHUTDOWN); | |
| 260 } | |
| 261 | |
| 262 // Waits for all the messages (of sizes |expected_sizes_|) to be seen. | |
| 263 void Wait() { done_event_.Wait(); } | |
| 264 | |
| 265 void SetExpectedSizes(const std::vector<uint32_t>& expected_sizes) { | |
| 266 base::AutoLock locker(lock_); | |
| 267 CHECK_EQ(position_, expected_sizes_.size()); | |
| 268 expected_sizes_ = expected_sizes; | |
| 269 position_ = 0; | |
| 270 } | |
| 271 | |
| 272 private: | |
| 273 base::WaitableEvent done_event_; | |
| 274 | |
| 275 base::Lock lock_; // Protects the following members. | |
| 276 std::vector<uint32_t> expected_sizes_; | |
| 277 size_t position_; | |
| 278 | |
| 279 DISALLOW_COPY_AND_ASSIGN(ReadCheckerRawChannelDelegate); | |
| 280 }; | |
| 281 | |
| 282 // Tests reading (writing using our own custom writer). | |
| 283 TEST_F(RawChannelTest, OnReadMessage) { | |
| 284 ReadCheckerRawChannelDelegate delegate; | |
| 285 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); | |
| 286 io_thread()->PostTaskAndWait( | |
| 287 FROM_HERE, | |
| 288 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); | |
| 289 | |
| 290 // Write and read, for a variety of sizes. | |
| 291 for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) { | |
| 292 delegate.SetExpectedSizes(std::vector<uint32_t>(1, size)); | |
| 293 | |
| 294 EXPECT_TRUE(WriteTestMessageToHandle(handles[1].get(), size)); | |
| 295 | |
| 296 delegate.Wait(); | |
| 297 } | |
| 298 | |
| 299 // Set up reader and write as fast as we can. | |
| 300 // Write/queue and read afterwards, for a variety of sizes. | |
| 301 std::vector<uint32_t> expected_sizes; | |
| 302 for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) | |
| 303 expected_sizes.push_back(size); | |
| 304 delegate.SetExpectedSizes(expected_sizes); | |
| 305 for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) | |
| 306 EXPECT_TRUE(WriteTestMessageToHandle(handles[1].get(), size)); | |
| 307 delegate.Wait(); | |
| 308 | |
| 309 io_thread()->PostTaskAndWait( | |
| 310 FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get()))); | |
| 311 } | |
| 312 | |
| 313 // RawChannelTest.WriteMessageAndOnReadMessage --------------------------------- | |
| 314 | |
| 315 class RawChannelWriterThread : public base::SimpleThread { | |
| 316 public: | |
| 317 RawChannelWriterThread(RawChannel* raw_channel, size_t write_count) | |
| 318 : base::SimpleThread("raw_channel_writer_thread"), | |
| 319 raw_channel_(raw_channel), | |
| 320 left_to_write_(write_count) {} | |
| 321 | |
| 322 ~RawChannelWriterThread() override { Join(); } | |
| 323 | |
| 324 private: | |
| 325 void Run() override { | |
| 326 static const int kMaxRandomMessageSize = 25000; | |
| 327 | |
| 328 while (left_to_write_-- > 0) { | |
| 329 EXPECT_TRUE(raw_channel_->WriteMessage(MakeTestMessage( | |
| 330 static_cast<uint32_t>(base::RandInt(1, kMaxRandomMessageSize))))); | |
| 331 } | |
| 332 } | |
| 333 | |
| 334 RawChannel* const raw_channel_; | |
| 335 size_t left_to_write_; | |
| 336 | |
| 337 DISALLOW_COPY_AND_ASSIGN(RawChannelWriterThread); | |
| 338 }; | |
| 339 | |
| 340 class ReadCountdownRawChannelDelegate : public RawChannel::Delegate { | |
| 341 public: | |
| 342 explicit ReadCountdownRawChannelDelegate(size_t expected_count) | |
| 343 : done_event_(false, false), expected_count_(expected_count), count_(0) {} | |
| 344 ~ReadCountdownRawChannelDelegate() override {} | |
| 345 | |
| 346 // |RawChannel::Delegate| implementation (called on the I/O thread): | |
| 347 void OnReadMessage( | |
| 348 const MessageInTransit::View& message_view, | |
| 349 embedder::ScopedPlatformHandleVectorPtr platform_handles) override { | |
| 350 EXPECT_FALSE(platform_handles); | |
| 351 | |
| 352 EXPECT_LT(count_, expected_count_); | |
| 353 count_++; | |
| 354 | |
| 355 EXPECT_TRUE( | |
| 356 CheckMessageData(message_view.bytes(), message_view.num_bytes())); | |
| 357 | |
| 358 if (count_ >= expected_count_) | |
| 359 done_event_.Signal(); | |
| 360 } | |
| 361 void OnError(Error error) override { | |
| 362 // We'll get a read (shutdown) error when the connection is closed. | |
| 363 CHECK_EQ(error, ERROR_READ_SHUTDOWN); | |
| 364 } | |
| 365 | |
| 366 // Waits for all the messages to have been seen. | |
| 367 void Wait() { done_event_.Wait(); } | |
| 368 | |
| 369 private: | |
| 370 base::WaitableEvent done_event_; | |
| 371 size_t expected_count_; | |
| 372 size_t count_; | |
| 373 | |
| 374 DISALLOW_COPY_AND_ASSIGN(ReadCountdownRawChannelDelegate); | |
| 375 }; | |
| 376 | |
| 377 TEST_F(RawChannelTest, WriteMessageAndOnReadMessage) { | |
| 378 static const size_t kNumWriterThreads = 10; | |
| 379 static const size_t kNumWriteMessagesPerThread = 4000; | |
| 380 | |
| 381 WriteOnlyRawChannelDelegate writer_delegate; | |
| 382 scoped_ptr<RawChannel> writer_rc(RawChannel::Create(handles[0].Pass())); | |
| 383 io_thread()->PostTaskAndWait(FROM_HERE, | |
| 384 base::Bind(&InitOnIOThread, writer_rc.get(), | |
| 385 base::Unretained(&writer_delegate))); | |
| 386 | |
| 387 ReadCountdownRawChannelDelegate reader_delegate(kNumWriterThreads * | |
| 388 kNumWriteMessagesPerThread); | |
| 389 scoped_ptr<RawChannel> reader_rc(RawChannel::Create(handles[1].Pass())); | |
| 390 io_thread()->PostTaskAndWait(FROM_HERE, | |
| 391 base::Bind(&InitOnIOThread, reader_rc.get(), | |
| 392 base::Unretained(&reader_delegate))); | |
| 393 | |
| 394 { | |
| 395 ScopedVector<RawChannelWriterThread> writer_threads; | |
| 396 for (size_t i = 0; i < kNumWriterThreads; i++) { | |
| 397 writer_threads.push_back(new RawChannelWriterThread( | |
| 398 writer_rc.get(), kNumWriteMessagesPerThread)); | |
| 399 } | |
| 400 for (size_t i = 0; i < writer_threads.size(); i++) | |
| 401 writer_threads[i]->Start(); | |
| 402 } // Joins all the writer threads. | |
| 403 | |
| 404 // Sleep a bit, to let any extraneous reads be processed. (There shouldn't be | |
| 405 // any, but we want to know about them.) | |
| 406 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100)); | |
| 407 | |
| 408 // Wait for reading to finish. | |
| 409 reader_delegate.Wait(); | |
| 410 | |
| 411 io_thread()->PostTaskAndWait( | |
| 412 FROM_HERE, | |
| 413 base::Bind(&RawChannel::Shutdown, base::Unretained(reader_rc.get()))); | |
| 414 | |
| 415 io_thread()->PostTaskAndWait( | |
| 416 FROM_HERE, | |
| 417 base::Bind(&RawChannel::Shutdown, base::Unretained(writer_rc.get()))); | |
| 418 } | |
| 419 | |
| 420 // RawChannelTest.OnError ------------------------------------------------------ | |
| 421 | |
| 422 class ErrorRecordingRawChannelDelegate | |
| 423 : public ReadCountdownRawChannelDelegate { | |
| 424 public: | |
| 425 ErrorRecordingRawChannelDelegate(size_t expected_read_count, | |
| 426 bool expect_read_error, | |
| 427 bool expect_write_error) | |
| 428 : ReadCountdownRawChannelDelegate(expected_read_count), | |
| 429 got_read_error_event_(false, false), | |
| 430 got_write_error_event_(false, false), | |
| 431 expecting_read_error_(expect_read_error), | |
| 432 expecting_write_error_(expect_write_error) {} | |
| 433 | |
| 434 ~ErrorRecordingRawChannelDelegate() override {} | |
| 435 | |
| 436 void OnError(Error error) override { | |
| 437 switch (error) { | |
| 438 case ERROR_READ_SHUTDOWN: | |
| 439 ASSERT_TRUE(expecting_read_error_); | |
| 440 expecting_read_error_ = false; | |
| 441 got_read_error_event_.Signal(); | |
| 442 break; | |
| 443 case ERROR_READ_BROKEN: | |
| 444 // TODO(vtl): Test broken connections. | |
| 445 CHECK(false); | |
| 446 break; | |
| 447 case ERROR_READ_BAD_MESSAGE: | |
| 448 // TODO(vtl): Test reception/detection of bad messages. | |
| 449 CHECK(false); | |
| 450 break; | |
| 451 case ERROR_READ_UNKNOWN: | |
| 452 // TODO(vtl): Test however it is we might get here. | |
| 453 CHECK(false); | |
| 454 break; | |
| 455 case ERROR_WRITE: | |
| 456 ASSERT_TRUE(expecting_write_error_); | |
| 457 expecting_write_error_ = false; | |
| 458 got_write_error_event_.Signal(); | |
| 459 break; | |
| 460 } | |
| 461 } | |
| 462 | |
| 463 void WaitForReadError() { got_read_error_event_.Wait(); } | |
| 464 void WaitForWriteError() { got_write_error_event_.Wait(); } | |
| 465 | |
| 466 private: | |
| 467 base::WaitableEvent got_read_error_event_; | |
| 468 base::WaitableEvent got_write_error_event_; | |
| 469 | |
| 470 bool expecting_read_error_; | |
| 471 bool expecting_write_error_; | |
| 472 | |
| 473 DISALLOW_COPY_AND_ASSIGN(ErrorRecordingRawChannelDelegate); | |
| 474 }; | |
| 475 | |
| 476 // Tests (fatal) errors. | |
| 477 TEST_F(RawChannelTest, OnError) { | |
| 478 ErrorRecordingRawChannelDelegate delegate(0, true, true); | |
| 479 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); | |
| 480 io_thread()->PostTaskAndWait( | |
| 481 FROM_HERE, | |
| 482 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); | |
| 483 | |
| 484 // Close the handle of the other end, which should make writing fail. | |
| 485 handles[1].reset(); | |
| 486 | |
| 487 EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1))); | |
| 488 | |
| 489 // We should get a write error. | |
| 490 delegate.WaitForWriteError(); | |
| 491 | |
| 492 // We should also get a read error. | |
| 493 delegate.WaitForReadError(); | |
| 494 | |
| 495 EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(2))); | |
| 496 | |
| 497 // Sleep a bit, to make sure we don't get another |OnError()| | |
| 498 // notification. (If we actually get another one, |OnError()| crashes.) | |
| 499 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(20)); | |
| 500 | |
| 501 io_thread()->PostTaskAndWait( | |
| 502 FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get()))); | |
| 503 } | |
| 504 | |
| 505 // RawChannelTest.ReadUnaffectedByWriteError ----------------------------------- | |
| 506 | |
| 507 TEST_F(RawChannelTest, ReadUnaffectedByWriteError) { | |
| 508 const size_t kMessageCount = 5; | |
| 509 | |
| 510 // Write a few messages into the other end. | |
| 511 uint32_t message_size = 1; | |
| 512 for (size_t i = 0; i < kMessageCount; | |
| 513 i++, message_size += message_size / 2 + 1) | |
| 514 EXPECT_TRUE(WriteTestMessageToHandle(handles[1].get(), message_size)); | |
| 515 | |
| 516 // Close the other end, which should make writing fail. | |
| 517 handles[1].reset(); | |
| 518 | |
| 519 // Only start up reading here. The system buffer should still contain the | |
| 520 // messages that were written. | |
| 521 ErrorRecordingRawChannelDelegate delegate(kMessageCount, true, true); | |
| 522 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); | |
| 523 io_thread()->PostTaskAndWait( | |
| 524 FROM_HERE, | |
| 525 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); | |
| 526 | |
| 527 EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1))); | |
| 528 | |
| 529 // We should definitely get a write error. | |
| 530 delegate.WaitForWriteError(); | |
| 531 | |
| 532 // Wait for reading to finish. A writing failure shouldn't affect reading. | |
| 533 delegate.Wait(); | |
| 534 | |
| 535 // And then we should get a read error. | |
| 536 delegate.WaitForReadError(); | |
| 537 | |
| 538 io_thread()->PostTaskAndWait( | |
| 539 FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get()))); | |
| 540 } | |
| 541 | |
| 542 // RawChannelTest.WriteMessageAfterShutdown ------------------------------------ | |
| 543 | |
| 544 // Makes sure that calling |WriteMessage()| after |Shutdown()| behaves | |
| 545 // correctly. | |
| 546 TEST_F(RawChannelTest, WriteMessageAfterShutdown) { | |
| 547 WriteOnlyRawChannelDelegate delegate; | |
| 548 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); | |
| 549 io_thread()->PostTaskAndWait( | |
| 550 FROM_HERE, | |
| 551 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); | |
| 552 io_thread()->PostTaskAndWait( | |
| 553 FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get()))); | |
| 554 | |
| 555 EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1))); | |
| 556 } | |
| 557 | |
| 558 // RawChannelTest.ShutdownOnReadMessage ---------------------------------------- | |
| 559 | |
| 560 class ShutdownOnReadMessageRawChannelDelegate : public RawChannel::Delegate { | |
| 561 public: | |
| 562 explicit ShutdownOnReadMessageRawChannelDelegate(RawChannel* raw_channel) | |
| 563 : raw_channel_(raw_channel), | |
| 564 done_event_(false, false), | |
| 565 did_shutdown_(false) {} | |
| 566 ~ShutdownOnReadMessageRawChannelDelegate() override {} | |
| 567 | |
| 568 // |RawChannel::Delegate| implementation (called on the I/O thread): | |
| 569 void OnReadMessage( | |
| 570 const MessageInTransit::View& message_view, | |
| 571 embedder::ScopedPlatformHandleVectorPtr platform_handles) override { | |
| 572 EXPECT_FALSE(platform_handles); | |
| 573 EXPECT_FALSE(did_shutdown_); | |
| 574 EXPECT_TRUE( | |
| 575 CheckMessageData(message_view.bytes(), message_view.num_bytes())); | |
| 576 raw_channel_->Shutdown(); | |
| 577 did_shutdown_ = true; | |
| 578 done_event_.Signal(); | |
| 579 } | |
| 580 void OnError(Error /*error*/) override { | |
| 581 CHECK(false); // Should not get called. | |
| 582 } | |
| 583 | |
| 584 // Waits for shutdown. | |
| 585 void Wait() { | |
| 586 done_event_.Wait(); | |
| 587 EXPECT_TRUE(did_shutdown_); | |
| 588 } | |
| 589 | |
| 590 private: | |
| 591 RawChannel* const raw_channel_; | |
| 592 base::WaitableEvent done_event_; | |
| 593 bool did_shutdown_; | |
| 594 | |
| 595 DISALLOW_COPY_AND_ASSIGN(ShutdownOnReadMessageRawChannelDelegate); | |
| 596 }; | |
| 597 | |
| 598 TEST_F(RawChannelTest, ShutdownOnReadMessage) { | |
| 599 // Write a few messages into the other end. | |
| 600 for (size_t count = 0; count < 5; count++) | |
| 601 EXPECT_TRUE(WriteTestMessageToHandle(handles[1].get(), 10)); | |
| 602 | |
| 603 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); | |
| 604 ShutdownOnReadMessageRawChannelDelegate delegate(rc.get()); | |
| 605 io_thread()->PostTaskAndWait( | |
| 606 FROM_HERE, | |
| 607 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); | |
| 608 | |
| 609 // Wait for the delegate, which will shut the |RawChannel| down. | |
| 610 delegate.Wait(); | |
| 611 } | |
| 612 | |
| 613 // RawChannelTest.ShutdownOnError{Read, Write} --------------------------------- | |
| 614 | |
| 615 class ShutdownOnErrorRawChannelDelegate : public RawChannel::Delegate { | |
| 616 public: | |
| 617 ShutdownOnErrorRawChannelDelegate(RawChannel* raw_channel, | |
| 618 Error shutdown_on_error_type) | |
| 619 : raw_channel_(raw_channel), | |
| 620 shutdown_on_error_type_(shutdown_on_error_type), | |
| 621 done_event_(false, false), | |
| 622 did_shutdown_(false) {} | |
| 623 ~ShutdownOnErrorRawChannelDelegate() override {} | |
| 624 | |
| 625 // |RawChannel::Delegate| implementation (called on the I/O thread): | |
| 626 void OnReadMessage( | |
| 627 const MessageInTransit::View& /*message_view*/, | |
| 628 embedder::ScopedPlatformHandleVectorPtr /*platform_handles*/) override { | |
| 629 CHECK(false); // Should not get called. | |
| 630 } | |
| 631 void OnError(Error error) override { | |
| 632 EXPECT_FALSE(did_shutdown_); | |
| 633 if (error != shutdown_on_error_type_) | |
| 634 return; | |
| 635 raw_channel_->Shutdown(); | |
| 636 did_shutdown_ = true; | |
| 637 done_event_.Signal(); | |
| 638 } | |
| 639 | |
| 640 // Waits for shutdown. | |
| 641 void Wait() { | |
| 642 done_event_.Wait(); | |
| 643 EXPECT_TRUE(did_shutdown_); | |
| 644 } | |
| 645 | |
| 646 private: | |
| 647 RawChannel* const raw_channel_; | |
| 648 const Error shutdown_on_error_type_; | |
| 649 base::WaitableEvent done_event_; | |
| 650 bool did_shutdown_; | |
| 651 | |
| 652 DISALLOW_COPY_AND_ASSIGN(ShutdownOnErrorRawChannelDelegate); | |
| 653 }; | |
| 654 | |
| 655 TEST_F(RawChannelTest, ShutdownOnErrorRead) { | |
| 656 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); | |
| 657 ShutdownOnErrorRawChannelDelegate delegate( | |
| 658 rc.get(), RawChannel::Delegate::ERROR_READ_SHUTDOWN); | |
| 659 io_thread()->PostTaskAndWait( | |
| 660 FROM_HERE, | |
| 661 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); | |
| 662 | |
| 663 // Close the handle of the other end, which should stuff fail. | |
| 664 handles[1].reset(); | |
| 665 | |
| 666 // Wait for the delegate, which will shut the |RawChannel| down. | |
| 667 delegate.Wait(); | |
| 668 } | |
| 669 | |
| 670 TEST_F(RawChannelTest, ShutdownOnErrorWrite) { | |
| 671 scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); | |
| 672 ShutdownOnErrorRawChannelDelegate delegate(rc.get(), | |
| 673 RawChannel::Delegate::ERROR_WRITE); | |
| 674 io_thread()->PostTaskAndWait( | |
| 675 FROM_HERE, | |
| 676 base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); | |
| 677 | |
| 678 // Close the handle of the other end, which should stuff fail. | |
| 679 handles[1].reset(); | |
| 680 | |
| 681 EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1))); | |
| 682 | |
| 683 // Wait for the delegate, which will shut the |RawChannel| down. | |
| 684 delegate.Wait(); | |
| 685 } | |
| 686 | |
| 687 // RawChannelTest.ReadWritePlatformHandles ------------------------------------- | |
| 688 | |
| 689 class ReadPlatformHandlesCheckerRawChannelDelegate | |
| 690 : public RawChannel::Delegate { | |
| 691 public: | |
| 692 ReadPlatformHandlesCheckerRawChannelDelegate() : done_event_(false, false) {} | |
| 693 ~ReadPlatformHandlesCheckerRawChannelDelegate() override {} | |
| 694 | |
| 695 // |RawChannel::Delegate| implementation (called on the I/O thread): | |
| 696 void OnReadMessage( | |
| 697 const MessageInTransit::View& message_view, | |
| 698 embedder::ScopedPlatformHandleVectorPtr platform_handles) override { | |
| 699 const char kHello[] = "hello"; | |
| 700 | |
| 701 EXPECT_EQ(sizeof(kHello), message_view.num_bytes()); | |
| 702 EXPECT_STREQ(kHello, static_cast<const char*>(message_view.bytes())); | |
| 703 | |
| 704 ASSERT_TRUE(platform_handles); | |
| 705 ASSERT_EQ(2u, platform_handles->size()); | |
| 706 embedder::ScopedPlatformHandle h1(platform_handles->at(0)); | |
| 707 EXPECT_TRUE(h1.is_valid()); | |
| 708 embedder::ScopedPlatformHandle h2(platform_handles->at(1)); | |
| 709 EXPECT_TRUE(h2.is_valid()); | |
| 710 platform_handles->clear(); | |
| 711 | |
| 712 { | |
| 713 char buffer[100] = {}; | |
| 714 | |
| 715 base::ScopedFILE fp(mojo::test::FILEFromPlatformHandle(h1.Pass(), "rb")); | |
| 716 EXPECT_TRUE(fp); | |
| 717 rewind(fp.get()); | |
| 718 EXPECT_EQ(1u, fread(buffer, 1, sizeof(buffer), fp.get())); | |
| 719 EXPECT_EQ('1', buffer[0]); | |
| 720 } | |
| 721 | |
| 722 { | |
| 723 char buffer[100] = {}; | |
| 724 base::ScopedFILE fp(mojo::test::FILEFromPlatformHandle(h2.Pass(), "rb")); | |
| 725 EXPECT_TRUE(fp); | |
| 726 rewind(fp.get()); | |
| 727 EXPECT_EQ(1u, fread(buffer, 1, sizeof(buffer), fp.get())); | |
| 728 EXPECT_EQ('2', buffer[0]); | |
| 729 } | |
| 730 | |
| 731 done_event_.Signal(); | |
| 732 } | |
| 733 void OnError(Error error) override { | |
| 734 // We'll get a read (shutdown) error when the connection is closed. | |
| 735 CHECK_EQ(error, ERROR_READ_SHUTDOWN); | |
| 736 } | |
| 737 | |
| 738 void Wait() { done_event_.Wait(); } | |
| 739 | |
| 740 private: | |
| 741 base::WaitableEvent done_event_; | |
| 742 | |
| 743 DISALLOW_COPY_AND_ASSIGN(ReadPlatformHandlesCheckerRawChannelDelegate); | |
| 744 }; | |
| 745 | |
| 746 #if defined(OS_POSIX) | |
| 747 #define MAYBE_ReadWritePlatformHandles ReadWritePlatformHandles | |
| 748 #else | |
| 749 // Not yet implemented (on Windows). | |
| 750 #define MAYBE_ReadWritePlatformHandles DISABLED_ReadWritePlatformHandles | |
| 751 #endif | |
| 752 TEST_F(RawChannelTest, MAYBE_ReadWritePlatformHandles) { | |
| 753 base::ScopedTempDir temp_dir; | |
| 754 ASSERT_TRUE(temp_dir.CreateUniqueTempDir()); | |
| 755 | |
| 756 WriteOnlyRawChannelDelegate write_delegate; | |
| 757 scoped_ptr<RawChannel> rc_write(RawChannel::Create(handles[0].Pass())); | |
| 758 io_thread()->PostTaskAndWait(FROM_HERE, | |
| 759 base::Bind(&InitOnIOThread, rc_write.get(), | |
| 760 base::Unretained(&write_delegate))); | |
| 761 | |
| 762 ReadPlatformHandlesCheckerRawChannelDelegate read_delegate; | |
| 763 scoped_ptr<RawChannel> rc_read(RawChannel::Create(handles[1].Pass())); | |
| 764 io_thread()->PostTaskAndWait(FROM_HERE, | |
| 765 base::Bind(&InitOnIOThread, rc_read.get(), | |
| 766 base::Unretained(&read_delegate))); | |
| 767 | |
| 768 base::FilePath unused; | |
| 769 base::ScopedFILE fp1( | |
| 770 base::CreateAndOpenTemporaryFileInDir(temp_dir.path(), &unused)); | |
| 771 EXPECT_EQ(1u, fwrite("1", 1, 1, fp1.get())); | |
| 772 base::ScopedFILE fp2( | |
| 773 base::CreateAndOpenTemporaryFileInDir(temp_dir.path(), &unused)); | |
| 774 EXPECT_EQ(1u, fwrite("2", 1, 1, fp2.get())); | |
| 775 | |
| 776 { | |
| 777 const char kHello[] = "hello"; | |
| 778 embedder::ScopedPlatformHandleVectorPtr platform_handles( | |
| 779 new embedder::PlatformHandleVector()); | |
| 780 platform_handles->push_back( | |
| 781 mojo::test::PlatformHandleFromFILE(fp1.Pass()).release()); | |
| 782 platform_handles->push_back( | |
| 783 mojo::test::PlatformHandleFromFILE(fp2.Pass()).release()); | |
| 784 | |
| 785 scoped_ptr<MessageInTransit> message(new MessageInTransit( | |
| 786 MessageInTransit::kTypeEndpoint, MessageInTransit::kSubtypeEndpointData, | |
| 787 sizeof(kHello), kHello)); | |
| 788 message->SetTransportData( | |
| 789 make_scoped_ptr(new TransportData(platform_handles.Pass()))); | |
| 790 EXPECT_TRUE(rc_write->WriteMessage(message.Pass())); | |
| 791 } | |
| 792 | |
| 793 read_delegate.Wait(); | |
| 794 | |
| 795 io_thread()->PostTaskAndWait( | |
| 796 FROM_HERE, | |
| 797 base::Bind(&RawChannel::Shutdown, base::Unretained(rc_read.get()))); | |
| 798 io_thread()->PostTaskAndWait( | |
| 799 FROM_HERE, | |
| 800 base::Bind(&RawChannel::Shutdown, base::Unretained(rc_write.get()))); | |
| 801 } | |
| 802 | |
| 803 } // namespace | |
| 804 } // namespace system | |
| 805 } // namespace mojo | |
| OLD | NEW |