| OLD | NEW | 
 | (Empty) | 
|    1 // Copyright 2014 The Chromium Authors. All rights reserved. |  | 
|    2 // Use of this source code is governed by a BSD-style license that can be |  | 
|    3 // found in the LICENSE file. |  | 
|    4  |  | 
|    5 #include "mojo/edk/system/raw_channel.h" |  | 
|    6  |  | 
|    7 #include <stdint.h> |  | 
|    8 #include <stdio.h> |  | 
|    9  |  | 
|   10 #include <vector> |  | 
|   11  |  | 
|   12 #include "base/bind.h" |  | 
|   13 #include "base/files/file_path.h" |  | 
|   14 #include "base/files/file_util.h" |  | 
|   15 #include "base/files/scoped_file.h" |  | 
|   16 #include "base/files/scoped_temp_dir.h" |  | 
|   17 #include "base/location.h" |  | 
|   18 #include "base/logging.h" |  | 
|   19 #include "base/macros.h" |  | 
|   20 #include "base/memory/scoped_ptr.h" |  | 
|   21 #include "base/memory/scoped_vector.h" |  | 
|   22 #include "base/rand_util.h" |  | 
|   23 #include "base/synchronization/lock.h" |  | 
|   24 #include "base/synchronization/waitable_event.h" |  | 
|   25 #include "base/test/test_io_thread.h" |  | 
|   26 #include "base/threading/platform_thread.h"  // For |Sleep()|. |  | 
|   27 #include "base/threading/simple_thread.h" |  | 
|   28 #include "base/time/time.h" |  | 
|   29 #include "build/build_config.h"  // TODO(vtl): Remove this. |  | 
|   30 #include "mojo/edk/embedder/platform_channel_pair.h" |  | 
|   31 #include "mojo/edk/embedder/platform_handle.h" |  | 
|   32 #include "mojo/edk/embedder/scoped_platform_handle.h" |  | 
|   33 #include "mojo/edk/system/message_in_transit.h" |  | 
|   34 #include "mojo/edk/system/test_utils.h" |  | 
|   35 #include "mojo/edk/system/transport_data.h" |  | 
|   36 #include "mojo/edk/test/test_utils.h" |  | 
|   37 #include "testing/gtest/include/gtest/gtest.h" |  | 
|   38  |  | 
|   39 namespace mojo { |  | 
|   40 namespace system { |  | 
|   41 namespace { |  | 
|   42  |  | 
|   43 scoped_ptr<MessageInTransit> MakeTestMessage(uint32_t num_bytes) { |  | 
|   44   std::vector<unsigned char> bytes(num_bytes, 0); |  | 
|   45   for (size_t i = 0; i < num_bytes; i++) |  | 
|   46     bytes[i] = static_cast<unsigned char>(i + num_bytes); |  | 
|   47   return make_scoped_ptr(new MessageInTransit( |  | 
|   48       MessageInTransit::kTypeEndpoint, MessageInTransit::kSubtypeEndpointData, |  | 
|   49       num_bytes, bytes.empty() ? nullptr : &bytes[0])); |  | 
|   50 } |  | 
|   51  |  | 
|   52 bool CheckMessageData(const void* bytes, uint32_t num_bytes) { |  | 
|   53   const unsigned char* b = static_cast<const unsigned char*>(bytes); |  | 
|   54   for (uint32_t i = 0; i < num_bytes; i++) { |  | 
|   55     if (b[i] != static_cast<unsigned char>(i + num_bytes)) |  | 
|   56       return false; |  | 
|   57   } |  | 
|   58   return true; |  | 
|   59 } |  | 
|   60  |  | 
|   61 void InitOnIOThread(RawChannel* raw_channel, RawChannel::Delegate* delegate) { |  | 
|   62   raw_channel->Init(delegate); |  | 
|   63 } |  | 
|   64  |  | 
|   65 bool WriteTestMessageToHandle(const embedder::PlatformHandle& handle, |  | 
|   66                               uint32_t num_bytes) { |  | 
|   67   scoped_ptr<MessageInTransit> message(MakeTestMessage(num_bytes)); |  | 
|   68  |  | 
|   69   size_t write_size = 0; |  | 
|   70   mojo::test::BlockingWrite(handle, message->main_buffer(), |  | 
|   71                             message->main_buffer_size(), &write_size); |  | 
|   72   return write_size == message->main_buffer_size(); |  | 
|   73 } |  | 
|   74  |  | 
|   75 // ----------------------------------------------------------------------------- |  | 
|   76  |  | 
|   77 class RawChannelTest : public testing::Test { |  | 
|   78  public: |  | 
|   79   RawChannelTest() : io_thread_(base::TestIOThread::kManualStart) {} |  | 
|   80   ~RawChannelTest() override {} |  | 
|   81  |  | 
|   82   void SetUp() override { |  | 
|   83     embedder::PlatformChannelPair channel_pair; |  | 
|   84     handles[0] = channel_pair.PassServerHandle(); |  | 
|   85     handles[1] = channel_pair.PassClientHandle(); |  | 
|   86     io_thread_.Start(); |  | 
|   87   } |  | 
|   88  |  | 
|   89   void TearDown() override { |  | 
|   90     io_thread_.Stop(); |  | 
|   91     handles[0].reset(); |  | 
|   92     handles[1].reset(); |  | 
|   93   } |  | 
|   94  |  | 
|   95  protected: |  | 
|   96   base::TestIOThread* io_thread() { return &io_thread_; } |  | 
|   97  |  | 
|   98   embedder::ScopedPlatformHandle handles[2]; |  | 
|   99  |  | 
|  100  private: |  | 
|  101   base::TestIOThread io_thread_; |  | 
|  102  |  | 
|  103   DISALLOW_COPY_AND_ASSIGN(RawChannelTest); |  | 
|  104 }; |  | 
|  105  |  | 
|  106 // RawChannelTest.WriteMessage ------------------------------------------------- |  | 
|  107  |  | 
|  108 class WriteOnlyRawChannelDelegate : public RawChannel::Delegate { |  | 
|  109  public: |  | 
|  110   WriteOnlyRawChannelDelegate() {} |  | 
|  111   ~WriteOnlyRawChannelDelegate() override {} |  | 
|  112  |  | 
|  113   // |RawChannel::Delegate| implementation: |  | 
|  114   void OnReadMessage( |  | 
|  115       const MessageInTransit::View& /*message_view*/, |  | 
|  116       embedder::ScopedPlatformHandleVectorPtr /*platform_handles*/) override { |  | 
|  117     CHECK(false);  // Should not get called. |  | 
|  118   } |  | 
|  119   void OnError(Error error) override { |  | 
|  120     // We'll get a read (shutdown) error when the connection is closed. |  | 
|  121     CHECK_EQ(error, ERROR_READ_SHUTDOWN); |  | 
|  122   } |  | 
|  123  |  | 
|  124  private: |  | 
|  125   DISALLOW_COPY_AND_ASSIGN(WriteOnlyRawChannelDelegate); |  | 
|  126 }; |  | 
|  127  |  | 
|  128 static const int64_t kMessageReaderSleepMs = 1; |  | 
|  129 static const size_t kMessageReaderMaxPollIterations = 3000; |  | 
|  130  |  | 
|  131 class TestMessageReaderAndChecker { |  | 
|  132  public: |  | 
|  133   explicit TestMessageReaderAndChecker(embedder::PlatformHandle handle) |  | 
|  134       : handle_(handle) {} |  | 
|  135   ~TestMessageReaderAndChecker() { CHECK(bytes_.empty()); } |  | 
|  136  |  | 
|  137   bool ReadAndCheckNextMessage(uint32_t expected_size) { |  | 
|  138     unsigned char buffer[4096]; |  | 
|  139  |  | 
|  140     for (size_t i = 0; i < kMessageReaderMaxPollIterations;) { |  | 
|  141       size_t read_size = 0; |  | 
|  142       CHECK(mojo::test::NonBlockingRead(handle_, buffer, sizeof(buffer), |  | 
|  143                                         &read_size)); |  | 
|  144  |  | 
|  145       // Append newly-read data to |bytes_|. |  | 
|  146       bytes_.insert(bytes_.end(), buffer, buffer + read_size); |  | 
|  147  |  | 
|  148       // If we have the header.... |  | 
|  149       size_t message_size; |  | 
|  150       if (MessageInTransit::GetNextMessageSize( |  | 
|  151               bytes_.empty() ? nullptr : &bytes_[0], bytes_.size(), |  | 
|  152               &message_size)) { |  | 
|  153         // If we've read the whole message.... |  | 
|  154         if (bytes_.size() >= message_size) { |  | 
|  155           bool rv = true; |  | 
|  156           MessageInTransit::View message_view(message_size, &bytes_[0]); |  | 
|  157           CHECK_EQ(message_view.main_buffer_size(), message_size); |  | 
|  158  |  | 
|  159           if (message_view.num_bytes() != expected_size) { |  | 
|  160             LOG(ERROR) << "Wrong size: " << message_size << " instead of " |  | 
|  161                        << expected_size << " bytes."; |  | 
|  162             rv = false; |  | 
|  163           } else if (!CheckMessageData(message_view.bytes(), |  | 
|  164                                        message_view.num_bytes())) { |  | 
|  165             LOG(ERROR) << "Incorrect message bytes."; |  | 
|  166             rv = false; |  | 
|  167           } |  | 
|  168  |  | 
|  169           // Erase message data. |  | 
|  170           bytes_.erase(bytes_.begin(), |  | 
|  171                        bytes_.begin() + message_view.main_buffer_size()); |  | 
|  172           return rv; |  | 
|  173         } |  | 
|  174       } |  | 
|  175  |  | 
|  176       if (static_cast<size_t>(read_size) < sizeof(buffer)) { |  | 
|  177         i++; |  | 
|  178         base::PlatformThread::Sleep( |  | 
|  179             base::TimeDelta::FromMilliseconds(kMessageReaderSleepMs)); |  | 
|  180       } |  | 
|  181     } |  | 
|  182  |  | 
|  183     LOG(ERROR) << "Too many iterations."; |  | 
|  184     return false; |  | 
|  185   } |  | 
|  186  |  | 
|  187  private: |  | 
|  188   const embedder::PlatformHandle handle_; |  | 
|  189  |  | 
|  190   // The start of the received data should always be on a message boundary. |  | 
|  191   std::vector<unsigned char> bytes_; |  | 
|  192  |  | 
|  193   DISALLOW_COPY_AND_ASSIGN(TestMessageReaderAndChecker); |  | 
|  194 }; |  | 
|  195  |  | 
|  196 // Tests writing (and verifies reading using our own custom reader). |  | 
|  197 TEST_F(RawChannelTest, WriteMessage) { |  | 
|  198   WriteOnlyRawChannelDelegate delegate; |  | 
|  199   scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); |  | 
|  200   TestMessageReaderAndChecker checker(handles[1].get()); |  | 
|  201   io_thread()->PostTaskAndWait( |  | 
|  202       FROM_HERE, |  | 
|  203       base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); |  | 
|  204  |  | 
|  205   // Write and read, for a variety of sizes. |  | 
|  206   for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) { |  | 
|  207     EXPECT_TRUE(rc->WriteMessage(MakeTestMessage(size))); |  | 
|  208     EXPECT_TRUE(checker.ReadAndCheckNextMessage(size)) << size; |  | 
|  209   } |  | 
|  210  |  | 
|  211   // Write/queue and read afterwards, for a variety of sizes. |  | 
|  212   for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) |  | 
|  213     EXPECT_TRUE(rc->WriteMessage(MakeTestMessage(size))); |  | 
|  214   for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) |  | 
|  215     EXPECT_TRUE(checker.ReadAndCheckNextMessage(size)) << size; |  | 
|  216  |  | 
|  217   io_thread()->PostTaskAndWait( |  | 
|  218       FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get()))); |  | 
|  219 } |  | 
|  220  |  | 
|  221 // RawChannelTest.OnReadMessage ------------------------------------------------ |  | 
|  222  |  | 
|  223 class ReadCheckerRawChannelDelegate : public RawChannel::Delegate { |  | 
|  224  public: |  | 
|  225   ReadCheckerRawChannelDelegate() : done_event_(false, false), position_(0) {} |  | 
|  226   ~ReadCheckerRawChannelDelegate() override {} |  | 
|  227  |  | 
|  228   // |RawChannel::Delegate| implementation (called on the I/O thread): |  | 
|  229   void OnReadMessage( |  | 
|  230       const MessageInTransit::View& message_view, |  | 
|  231       embedder::ScopedPlatformHandleVectorPtr platform_handles) override { |  | 
|  232     EXPECT_FALSE(platform_handles); |  | 
|  233  |  | 
|  234     size_t position; |  | 
|  235     size_t expected_size; |  | 
|  236     bool should_signal = false; |  | 
|  237     { |  | 
|  238       base::AutoLock locker(lock_); |  | 
|  239       CHECK_LT(position_, expected_sizes_.size()); |  | 
|  240       position = position_; |  | 
|  241       expected_size = expected_sizes_[position]; |  | 
|  242       position_++; |  | 
|  243       if (position_ >= expected_sizes_.size()) |  | 
|  244         should_signal = true; |  | 
|  245     } |  | 
|  246  |  | 
|  247     EXPECT_EQ(expected_size, message_view.num_bytes()) << position; |  | 
|  248     if (message_view.num_bytes() == expected_size) { |  | 
|  249       EXPECT_TRUE( |  | 
|  250           CheckMessageData(message_view.bytes(), message_view.num_bytes())) |  | 
|  251           << position; |  | 
|  252     } |  | 
|  253  |  | 
|  254     if (should_signal) |  | 
|  255       done_event_.Signal(); |  | 
|  256   } |  | 
|  257   void OnError(Error error) override { |  | 
|  258     // We'll get a read (shutdown) error when the connection is closed. |  | 
|  259     CHECK_EQ(error, ERROR_READ_SHUTDOWN); |  | 
|  260   } |  | 
|  261  |  | 
|  262   // Waits for all the messages (of sizes |expected_sizes_|) to be seen. |  | 
|  263   void Wait() { done_event_.Wait(); } |  | 
|  264  |  | 
|  265   void SetExpectedSizes(const std::vector<uint32_t>& expected_sizes) { |  | 
|  266     base::AutoLock locker(lock_); |  | 
|  267     CHECK_EQ(position_, expected_sizes_.size()); |  | 
|  268     expected_sizes_ = expected_sizes; |  | 
|  269     position_ = 0; |  | 
|  270   } |  | 
|  271  |  | 
|  272  private: |  | 
|  273   base::WaitableEvent done_event_; |  | 
|  274  |  | 
|  275   base::Lock lock_;  // Protects the following members. |  | 
|  276   std::vector<uint32_t> expected_sizes_; |  | 
|  277   size_t position_; |  | 
|  278  |  | 
|  279   DISALLOW_COPY_AND_ASSIGN(ReadCheckerRawChannelDelegate); |  | 
|  280 }; |  | 
|  281  |  | 
|  282 // Tests reading (writing using our own custom writer). |  | 
|  283 TEST_F(RawChannelTest, OnReadMessage) { |  | 
|  284   ReadCheckerRawChannelDelegate delegate; |  | 
|  285   scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); |  | 
|  286   io_thread()->PostTaskAndWait( |  | 
|  287       FROM_HERE, |  | 
|  288       base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); |  | 
|  289  |  | 
|  290   // Write and read, for a variety of sizes. |  | 
|  291   for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) { |  | 
|  292     delegate.SetExpectedSizes(std::vector<uint32_t>(1, size)); |  | 
|  293  |  | 
|  294     EXPECT_TRUE(WriteTestMessageToHandle(handles[1].get(), size)); |  | 
|  295  |  | 
|  296     delegate.Wait(); |  | 
|  297   } |  | 
|  298  |  | 
|  299   // Set up reader and write as fast as we can. |  | 
|  300   // Write/queue and read afterwards, for a variety of sizes. |  | 
|  301   std::vector<uint32_t> expected_sizes; |  | 
|  302   for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) |  | 
|  303     expected_sizes.push_back(size); |  | 
|  304   delegate.SetExpectedSizes(expected_sizes); |  | 
|  305   for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) |  | 
|  306     EXPECT_TRUE(WriteTestMessageToHandle(handles[1].get(), size)); |  | 
|  307   delegate.Wait(); |  | 
|  308  |  | 
|  309   io_thread()->PostTaskAndWait( |  | 
|  310       FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get()))); |  | 
|  311 } |  | 
|  312  |  | 
|  313 // RawChannelTest.WriteMessageAndOnReadMessage --------------------------------- |  | 
|  314  |  | 
|  315 class RawChannelWriterThread : public base::SimpleThread { |  | 
|  316  public: |  | 
|  317   RawChannelWriterThread(RawChannel* raw_channel, size_t write_count) |  | 
|  318       : base::SimpleThread("raw_channel_writer_thread"), |  | 
|  319         raw_channel_(raw_channel), |  | 
|  320         left_to_write_(write_count) {} |  | 
|  321  |  | 
|  322   ~RawChannelWriterThread() override { Join(); } |  | 
|  323  |  | 
|  324  private: |  | 
|  325   void Run() override { |  | 
|  326     static const int kMaxRandomMessageSize = 25000; |  | 
|  327  |  | 
|  328     while (left_to_write_-- > 0) { |  | 
|  329       EXPECT_TRUE(raw_channel_->WriteMessage(MakeTestMessage( |  | 
|  330           static_cast<uint32_t>(base::RandInt(1, kMaxRandomMessageSize))))); |  | 
|  331     } |  | 
|  332   } |  | 
|  333  |  | 
|  334   RawChannel* const raw_channel_; |  | 
|  335   size_t left_to_write_; |  | 
|  336  |  | 
|  337   DISALLOW_COPY_AND_ASSIGN(RawChannelWriterThread); |  | 
|  338 }; |  | 
|  339  |  | 
|  340 class ReadCountdownRawChannelDelegate : public RawChannel::Delegate { |  | 
|  341  public: |  | 
|  342   explicit ReadCountdownRawChannelDelegate(size_t expected_count) |  | 
|  343       : done_event_(false, false), expected_count_(expected_count), count_(0) {} |  | 
|  344   ~ReadCountdownRawChannelDelegate() override {} |  | 
|  345  |  | 
|  346   // |RawChannel::Delegate| implementation (called on the I/O thread): |  | 
|  347   void OnReadMessage( |  | 
|  348       const MessageInTransit::View& message_view, |  | 
|  349       embedder::ScopedPlatformHandleVectorPtr platform_handles) override { |  | 
|  350     EXPECT_FALSE(platform_handles); |  | 
|  351  |  | 
|  352     EXPECT_LT(count_, expected_count_); |  | 
|  353     count_++; |  | 
|  354  |  | 
|  355     EXPECT_TRUE( |  | 
|  356         CheckMessageData(message_view.bytes(), message_view.num_bytes())); |  | 
|  357  |  | 
|  358     if (count_ >= expected_count_) |  | 
|  359       done_event_.Signal(); |  | 
|  360   } |  | 
|  361   void OnError(Error error) override { |  | 
|  362     // We'll get a read (shutdown) error when the connection is closed. |  | 
|  363     CHECK_EQ(error, ERROR_READ_SHUTDOWN); |  | 
|  364   } |  | 
|  365  |  | 
|  366   // Waits for all the messages to have been seen. |  | 
|  367   void Wait() { done_event_.Wait(); } |  | 
|  368  |  | 
|  369  private: |  | 
|  370   base::WaitableEvent done_event_; |  | 
|  371   size_t expected_count_; |  | 
|  372   size_t count_; |  | 
|  373  |  | 
|  374   DISALLOW_COPY_AND_ASSIGN(ReadCountdownRawChannelDelegate); |  | 
|  375 }; |  | 
|  376  |  | 
|  377 TEST_F(RawChannelTest, WriteMessageAndOnReadMessage) { |  | 
|  378   static const size_t kNumWriterThreads = 10; |  | 
|  379   static const size_t kNumWriteMessagesPerThread = 4000; |  | 
|  380  |  | 
|  381   WriteOnlyRawChannelDelegate writer_delegate; |  | 
|  382   scoped_ptr<RawChannel> writer_rc(RawChannel::Create(handles[0].Pass())); |  | 
|  383   io_thread()->PostTaskAndWait(FROM_HERE, |  | 
|  384                                base::Bind(&InitOnIOThread, writer_rc.get(), |  | 
|  385                                           base::Unretained(&writer_delegate))); |  | 
|  386  |  | 
|  387   ReadCountdownRawChannelDelegate reader_delegate(kNumWriterThreads * |  | 
|  388                                                   kNumWriteMessagesPerThread); |  | 
|  389   scoped_ptr<RawChannel> reader_rc(RawChannel::Create(handles[1].Pass())); |  | 
|  390   io_thread()->PostTaskAndWait(FROM_HERE, |  | 
|  391                                base::Bind(&InitOnIOThread, reader_rc.get(), |  | 
|  392                                           base::Unretained(&reader_delegate))); |  | 
|  393  |  | 
|  394   { |  | 
|  395     ScopedVector<RawChannelWriterThread> writer_threads; |  | 
|  396     for (size_t i = 0; i < kNumWriterThreads; i++) { |  | 
|  397       writer_threads.push_back(new RawChannelWriterThread( |  | 
|  398           writer_rc.get(), kNumWriteMessagesPerThread)); |  | 
|  399     } |  | 
|  400     for (size_t i = 0; i < writer_threads.size(); i++) |  | 
|  401       writer_threads[i]->Start(); |  | 
|  402   }  // Joins all the writer threads. |  | 
|  403  |  | 
|  404   // Sleep a bit, to let any extraneous reads be processed. (There shouldn't be |  | 
|  405   // any, but we want to know about them.) |  | 
|  406   base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100)); |  | 
|  407  |  | 
|  408   // Wait for reading to finish. |  | 
|  409   reader_delegate.Wait(); |  | 
|  410  |  | 
|  411   io_thread()->PostTaskAndWait( |  | 
|  412       FROM_HERE, |  | 
|  413       base::Bind(&RawChannel::Shutdown, base::Unretained(reader_rc.get()))); |  | 
|  414  |  | 
|  415   io_thread()->PostTaskAndWait( |  | 
|  416       FROM_HERE, |  | 
|  417       base::Bind(&RawChannel::Shutdown, base::Unretained(writer_rc.get()))); |  | 
|  418 } |  | 
|  419  |  | 
|  420 // RawChannelTest.OnError ------------------------------------------------------ |  | 
|  421  |  | 
|  422 class ErrorRecordingRawChannelDelegate |  | 
|  423     : public ReadCountdownRawChannelDelegate { |  | 
|  424  public: |  | 
|  425   ErrorRecordingRawChannelDelegate(size_t expected_read_count, |  | 
|  426                                    bool expect_read_error, |  | 
|  427                                    bool expect_write_error) |  | 
|  428       : ReadCountdownRawChannelDelegate(expected_read_count), |  | 
|  429         got_read_error_event_(false, false), |  | 
|  430         got_write_error_event_(false, false), |  | 
|  431         expecting_read_error_(expect_read_error), |  | 
|  432         expecting_write_error_(expect_write_error) {} |  | 
|  433  |  | 
|  434   ~ErrorRecordingRawChannelDelegate() override {} |  | 
|  435  |  | 
|  436   void OnError(Error error) override { |  | 
|  437     switch (error) { |  | 
|  438       case ERROR_READ_SHUTDOWN: |  | 
|  439         ASSERT_TRUE(expecting_read_error_); |  | 
|  440         expecting_read_error_ = false; |  | 
|  441         got_read_error_event_.Signal(); |  | 
|  442         break; |  | 
|  443       case ERROR_READ_BROKEN: |  | 
|  444         // TODO(vtl): Test broken connections. |  | 
|  445         CHECK(false); |  | 
|  446         break; |  | 
|  447       case ERROR_READ_BAD_MESSAGE: |  | 
|  448         // TODO(vtl): Test reception/detection of bad messages. |  | 
|  449         CHECK(false); |  | 
|  450         break; |  | 
|  451       case ERROR_READ_UNKNOWN: |  | 
|  452         // TODO(vtl): Test however it is we might get here. |  | 
|  453         CHECK(false); |  | 
|  454         break; |  | 
|  455       case ERROR_WRITE: |  | 
|  456         ASSERT_TRUE(expecting_write_error_); |  | 
|  457         expecting_write_error_ = false; |  | 
|  458         got_write_error_event_.Signal(); |  | 
|  459         break; |  | 
|  460     } |  | 
|  461   } |  | 
|  462  |  | 
|  463   void WaitForReadError() { got_read_error_event_.Wait(); } |  | 
|  464   void WaitForWriteError() { got_write_error_event_.Wait(); } |  | 
|  465  |  | 
|  466  private: |  | 
|  467   base::WaitableEvent got_read_error_event_; |  | 
|  468   base::WaitableEvent got_write_error_event_; |  | 
|  469  |  | 
|  470   bool expecting_read_error_; |  | 
|  471   bool expecting_write_error_; |  | 
|  472  |  | 
|  473   DISALLOW_COPY_AND_ASSIGN(ErrorRecordingRawChannelDelegate); |  | 
|  474 }; |  | 
|  475  |  | 
|  476 // Tests (fatal) errors. |  | 
|  477 TEST_F(RawChannelTest, OnError) { |  | 
|  478   ErrorRecordingRawChannelDelegate delegate(0, true, true); |  | 
|  479   scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); |  | 
|  480   io_thread()->PostTaskAndWait( |  | 
|  481       FROM_HERE, |  | 
|  482       base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); |  | 
|  483  |  | 
|  484   // Close the handle of the other end, which should make writing fail. |  | 
|  485   handles[1].reset(); |  | 
|  486  |  | 
|  487   EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1))); |  | 
|  488  |  | 
|  489   // We should get a write error. |  | 
|  490   delegate.WaitForWriteError(); |  | 
|  491  |  | 
|  492   // We should also get a read error. |  | 
|  493   delegate.WaitForReadError(); |  | 
|  494  |  | 
|  495   EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(2))); |  | 
|  496  |  | 
|  497   // Sleep a bit, to make sure we don't get another |OnError()| |  | 
|  498   // notification. (If we actually get another one, |OnError()| crashes.) |  | 
|  499   base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(20)); |  | 
|  500  |  | 
|  501   io_thread()->PostTaskAndWait( |  | 
|  502       FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get()))); |  | 
|  503 } |  | 
|  504  |  | 
|  505 // RawChannelTest.ReadUnaffectedByWriteError ----------------------------------- |  | 
|  506  |  | 
|  507 TEST_F(RawChannelTest, ReadUnaffectedByWriteError) { |  | 
|  508   const size_t kMessageCount = 5; |  | 
|  509  |  | 
|  510   // Write a few messages into the other end. |  | 
|  511   uint32_t message_size = 1; |  | 
|  512   for (size_t i = 0; i < kMessageCount; |  | 
|  513        i++, message_size += message_size / 2 + 1) |  | 
|  514     EXPECT_TRUE(WriteTestMessageToHandle(handles[1].get(), message_size)); |  | 
|  515  |  | 
|  516   // Close the other end, which should make writing fail. |  | 
|  517   handles[1].reset(); |  | 
|  518  |  | 
|  519   // Only start up reading here. The system buffer should still contain the |  | 
|  520   // messages that were written. |  | 
|  521   ErrorRecordingRawChannelDelegate delegate(kMessageCount, true, true); |  | 
|  522   scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); |  | 
|  523   io_thread()->PostTaskAndWait( |  | 
|  524       FROM_HERE, |  | 
|  525       base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); |  | 
|  526  |  | 
|  527   EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1))); |  | 
|  528  |  | 
|  529   // We should definitely get a write error. |  | 
|  530   delegate.WaitForWriteError(); |  | 
|  531  |  | 
|  532   // Wait for reading to finish. A writing failure shouldn't affect reading. |  | 
|  533   delegate.Wait(); |  | 
|  534  |  | 
|  535   // And then we should get a read error. |  | 
|  536   delegate.WaitForReadError(); |  | 
|  537  |  | 
|  538   io_thread()->PostTaskAndWait( |  | 
|  539       FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get()))); |  | 
|  540 } |  | 
|  541  |  | 
|  542 // RawChannelTest.WriteMessageAfterShutdown ------------------------------------ |  | 
|  543  |  | 
|  544 // Makes sure that calling |WriteMessage()| after |Shutdown()| behaves |  | 
|  545 // correctly. |  | 
|  546 TEST_F(RawChannelTest, WriteMessageAfterShutdown) { |  | 
|  547   WriteOnlyRawChannelDelegate delegate; |  | 
|  548   scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); |  | 
|  549   io_thread()->PostTaskAndWait( |  | 
|  550       FROM_HERE, |  | 
|  551       base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); |  | 
|  552   io_thread()->PostTaskAndWait( |  | 
|  553       FROM_HERE, base::Bind(&RawChannel::Shutdown, base::Unretained(rc.get()))); |  | 
|  554  |  | 
|  555   EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1))); |  | 
|  556 } |  | 
|  557  |  | 
|  558 // RawChannelTest.ShutdownOnReadMessage ---------------------------------------- |  | 
|  559  |  | 
|  560 class ShutdownOnReadMessageRawChannelDelegate : public RawChannel::Delegate { |  | 
|  561  public: |  | 
|  562   explicit ShutdownOnReadMessageRawChannelDelegate(RawChannel* raw_channel) |  | 
|  563       : raw_channel_(raw_channel), |  | 
|  564         done_event_(false, false), |  | 
|  565         did_shutdown_(false) {} |  | 
|  566   ~ShutdownOnReadMessageRawChannelDelegate() override {} |  | 
|  567  |  | 
|  568   // |RawChannel::Delegate| implementation (called on the I/O thread): |  | 
|  569   void OnReadMessage( |  | 
|  570       const MessageInTransit::View& message_view, |  | 
|  571       embedder::ScopedPlatformHandleVectorPtr platform_handles) override { |  | 
|  572     EXPECT_FALSE(platform_handles); |  | 
|  573     EXPECT_FALSE(did_shutdown_); |  | 
|  574     EXPECT_TRUE( |  | 
|  575         CheckMessageData(message_view.bytes(), message_view.num_bytes())); |  | 
|  576     raw_channel_->Shutdown(); |  | 
|  577     did_shutdown_ = true; |  | 
|  578     done_event_.Signal(); |  | 
|  579   } |  | 
|  580   void OnError(Error /*error*/) override { |  | 
|  581     CHECK(false);  // Should not get called. |  | 
|  582   } |  | 
|  583  |  | 
|  584   // Waits for shutdown. |  | 
|  585   void Wait() { |  | 
|  586     done_event_.Wait(); |  | 
|  587     EXPECT_TRUE(did_shutdown_); |  | 
|  588   } |  | 
|  589  |  | 
|  590  private: |  | 
|  591   RawChannel* const raw_channel_; |  | 
|  592   base::WaitableEvent done_event_; |  | 
|  593   bool did_shutdown_; |  | 
|  594  |  | 
|  595   DISALLOW_COPY_AND_ASSIGN(ShutdownOnReadMessageRawChannelDelegate); |  | 
|  596 }; |  | 
|  597  |  | 
|  598 TEST_F(RawChannelTest, ShutdownOnReadMessage) { |  | 
|  599   // Write a few messages into the other end. |  | 
|  600   for (size_t count = 0; count < 5; count++) |  | 
|  601     EXPECT_TRUE(WriteTestMessageToHandle(handles[1].get(), 10)); |  | 
|  602  |  | 
|  603   scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); |  | 
|  604   ShutdownOnReadMessageRawChannelDelegate delegate(rc.get()); |  | 
|  605   io_thread()->PostTaskAndWait( |  | 
|  606       FROM_HERE, |  | 
|  607       base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); |  | 
|  608  |  | 
|  609   // Wait for the delegate, which will shut the |RawChannel| down. |  | 
|  610   delegate.Wait(); |  | 
|  611 } |  | 
|  612  |  | 
|  613 // RawChannelTest.ShutdownOnError{Read, Write} --------------------------------- |  | 
|  614  |  | 
|  615 class ShutdownOnErrorRawChannelDelegate : public RawChannel::Delegate { |  | 
|  616  public: |  | 
|  617   ShutdownOnErrorRawChannelDelegate(RawChannel* raw_channel, |  | 
|  618                                     Error shutdown_on_error_type) |  | 
|  619       : raw_channel_(raw_channel), |  | 
|  620         shutdown_on_error_type_(shutdown_on_error_type), |  | 
|  621         done_event_(false, false), |  | 
|  622         did_shutdown_(false) {} |  | 
|  623   ~ShutdownOnErrorRawChannelDelegate() override {} |  | 
|  624  |  | 
|  625   // |RawChannel::Delegate| implementation (called on the I/O thread): |  | 
|  626   void OnReadMessage( |  | 
|  627       const MessageInTransit::View& /*message_view*/, |  | 
|  628       embedder::ScopedPlatformHandleVectorPtr /*platform_handles*/) override { |  | 
|  629     CHECK(false);  // Should not get called. |  | 
|  630   } |  | 
|  631   void OnError(Error error) override { |  | 
|  632     EXPECT_FALSE(did_shutdown_); |  | 
|  633     if (error != shutdown_on_error_type_) |  | 
|  634       return; |  | 
|  635     raw_channel_->Shutdown(); |  | 
|  636     did_shutdown_ = true; |  | 
|  637     done_event_.Signal(); |  | 
|  638   } |  | 
|  639  |  | 
|  640   // Waits for shutdown. |  | 
|  641   void Wait() { |  | 
|  642     done_event_.Wait(); |  | 
|  643     EXPECT_TRUE(did_shutdown_); |  | 
|  644   } |  | 
|  645  |  | 
|  646  private: |  | 
|  647   RawChannel* const raw_channel_; |  | 
|  648   const Error shutdown_on_error_type_; |  | 
|  649   base::WaitableEvent done_event_; |  | 
|  650   bool did_shutdown_; |  | 
|  651  |  | 
|  652   DISALLOW_COPY_AND_ASSIGN(ShutdownOnErrorRawChannelDelegate); |  | 
|  653 }; |  | 
|  654  |  | 
|  655 TEST_F(RawChannelTest, ShutdownOnErrorRead) { |  | 
|  656   scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); |  | 
|  657   ShutdownOnErrorRawChannelDelegate delegate( |  | 
|  658       rc.get(), RawChannel::Delegate::ERROR_READ_SHUTDOWN); |  | 
|  659   io_thread()->PostTaskAndWait( |  | 
|  660       FROM_HERE, |  | 
|  661       base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); |  | 
|  662  |  | 
|  663   // Close the handle of the other end, which should stuff fail. |  | 
|  664   handles[1].reset(); |  | 
|  665  |  | 
|  666   // Wait for the delegate, which will shut the |RawChannel| down. |  | 
|  667   delegate.Wait(); |  | 
|  668 } |  | 
|  669  |  | 
|  670 TEST_F(RawChannelTest, ShutdownOnErrorWrite) { |  | 
|  671   scoped_ptr<RawChannel> rc(RawChannel::Create(handles[0].Pass())); |  | 
|  672   ShutdownOnErrorRawChannelDelegate delegate(rc.get(), |  | 
|  673                                              RawChannel::Delegate::ERROR_WRITE); |  | 
|  674   io_thread()->PostTaskAndWait( |  | 
|  675       FROM_HERE, |  | 
|  676       base::Bind(&InitOnIOThread, rc.get(), base::Unretained(&delegate))); |  | 
|  677  |  | 
|  678   // Close the handle of the other end, which should stuff fail. |  | 
|  679   handles[1].reset(); |  | 
|  680  |  | 
|  681   EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1))); |  | 
|  682  |  | 
|  683   // Wait for the delegate, which will shut the |RawChannel| down. |  | 
|  684   delegate.Wait(); |  | 
|  685 } |  | 
|  686  |  | 
|  687 // RawChannelTest.ReadWritePlatformHandles ------------------------------------- |  | 
|  688  |  | 
|  689 class ReadPlatformHandlesCheckerRawChannelDelegate |  | 
|  690     : public RawChannel::Delegate { |  | 
|  691  public: |  | 
|  692   ReadPlatformHandlesCheckerRawChannelDelegate() : done_event_(false, false) {} |  | 
|  693   ~ReadPlatformHandlesCheckerRawChannelDelegate() override {} |  | 
|  694  |  | 
|  695   // |RawChannel::Delegate| implementation (called on the I/O thread): |  | 
|  696   void OnReadMessage( |  | 
|  697       const MessageInTransit::View& message_view, |  | 
|  698       embedder::ScopedPlatformHandleVectorPtr platform_handles) override { |  | 
|  699     const char kHello[] = "hello"; |  | 
|  700  |  | 
|  701     EXPECT_EQ(sizeof(kHello), message_view.num_bytes()); |  | 
|  702     EXPECT_STREQ(kHello, static_cast<const char*>(message_view.bytes())); |  | 
|  703  |  | 
|  704     ASSERT_TRUE(platform_handles); |  | 
|  705     ASSERT_EQ(2u, platform_handles->size()); |  | 
|  706     embedder::ScopedPlatformHandle h1(platform_handles->at(0)); |  | 
|  707     EXPECT_TRUE(h1.is_valid()); |  | 
|  708     embedder::ScopedPlatformHandle h2(platform_handles->at(1)); |  | 
|  709     EXPECT_TRUE(h2.is_valid()); |  | 
|  710     platform_handles->clear(); |  | 
|  711  |  | 
|  712     { |  | 
|  713       char buffer[100] = {}; |  | 
|  714  |  | 
|  715       base::ScopedFILE fp(mojo::test::FILEFromPlatformHandle(h1.Pass(), "rb")); |  | 
|  716       EXPECT_TRUE(fp); |  | 
|  717       rewind(fp.get()); |  | 
|  718       EXPECT_EQ(1u, fread(buffer, 1, sizeof(buffer), fp.get())); |  | 
|  719       EXPECT_EQ('1', buffer[0]); |  | 
|  720     } |  | 
|  721  |  | 
|  722     { |  | 
|  723       char buffer[100] = {}; |  | 
|  724       base::ScopedFILE fp(mojo::test::FILEFromPlatformHandle(h2.Pass(), "rb")); |  | 
|  725       EXPECT_TRUE(fp); |  | 
|  726       rewind(fp.get()); |  | 
|  727       EXPECT_EQ(1u, fread(buffer, 1, sizeof(buffer), fp.get())); |  | 
|  728       EXPECT_EQ('2', buffer[0]); |  | 
|  729     } |  | 
|  730  |  | 
|  731     done_event_.Signal(); |  | 
|  732   } |  | 
|  733   void OnError(Error error) override { |  | 
|  734     // We'll get a read (shutdown) error when the connection is closed. |  | 
|  735     CHECK_EQ(error, ERROR_READ_SHUTDOWN); |  | 
|  736   } |  | 
|  737  |  | 
|  738   void Wait() { done_event_.Wait(); } |  | 
|  739  |  | 
|  740  private: |  | 
|  741   base::WaitableEvent done_event_; |  | 
|  742  |  | 
|  743   DISALLOW_COPY_AND_ASSIGN(ReadPlatformHandlesCheckerRawChannelDelegate); |  | 
|  744 }; |  | 
|  745  |  | 
|  746 #if defined(OS_POSIX) |  | 
|  747 #define MAYBE_ReadWritePlatformHandles ReadWritePlatformHandles |  | 
|  748 #else |  | 
|  749 // Not yet implemented (on Windows). |  | 
|  750 #define MAYBE_ReadWritePlatformHandles DISABLED_ReadWritePlatformHandles |  | 
|  751 #endif |  | 
|  752 TEST_F(RawChannelTest, MAYBE_ReadWritePlatformHandles) { |  | 
|  753   base::ScopedTempDir temp_dir; |  | 
|  754   ASSERT_TRUE(temp_dir.CreateUniqueTempDir()); |  | 
|  755  |  | 
|  756   WriteOnlyRawChannelDelegate write_delegate; |  | 
|  757   scoped_ptr<RawChannel> rc_write(RawChannel::Create(handles[0].Pass())); |  | 
|  758   io_thread()->PostTaskAndWait(FROM_HERE, |  | 
|  759                                base::Bind(&InitOnIOThread, rc_write.get(), |  | 
|  760                                           base::Unretained(&write_delegate))); |  | 
|  761  |  | 
|  762   ReadPlatformHandlesCheckerRawChannelDelegate read_delegate; |  | 
|  763   scoped_ptr<RawChannel> rc_read(RawChannel::Create(handles[1].Pass())); |  | 
|  764   io_thread()->PostTaskAndWait(FROM_HERE, |  | 
|  765                                base::Bind(&InitOnIOThread, rc_read.get(), |  | 
|  766                                           base::Unretained(&read_delegate))); |  | 
|  767  |  | 
|  768   base::FilePath unused; |  | 
|  769   base::ScopedFILE fp1( |  | 
|  770       base::CreateAndOpenTemporaryFileInDir(temp_dir.path(), &unused)); |  | 
|  771   EXPECT_EQ(1u, fwrite("1", 1, 1, fp1.get())); |  | 
|  772   base::ScopedFILE fp2( |  | 
|  773       base::CreateAndOpenTemporaryFileInDir(temp_dir.path(), &unused)); |  | 
|  774   EXPECT_EQ(1u, fwrite("2", 1, 1, fp2.get())); |  | 
|  775  |  | 
|  776   { |  | 
|  777     const char kHello[] = "hello"; |  | 
|  778     embedder::ScopedPlatformHandleVectorPtr platform_handles( |  | 
|  779         new embedder::PlatformHandleVector()); |  | 
|  780     platform_handles->push_back( |  | 
|  781         mojo::test::PlatformHandleFromFILE(fp1.Pass()).release()); |  | 
|  782     platform_handles->push_back( |  | 
|  783         mojo::test::PlatformHandleFromFILE(fp2.Pass()).release()); |  | 
|  784  |  | 
|  785     scoped_ptr<MessageInTransit> message(new MessageInTransit( |  | 
|  786         MessageInTransit::kTypeEndpoint, MessageInTransit::kSubtypeEndpointData, |  | 
|  787         sizeof(kHello), kHello)); |  | 
|  788     message->SetTransportData( |  | 
|  789         make_scoped_ptr(new TransportData(platform_handles.Pass()))); |  | 
|  790     EXPECT_TRUE(rc_write->WriteMessage(message.Pass())); |  | 
|  791   } |  | 
|  792  |  | 
|  793   read_delegate.Wait(); |  | 
|  794  |  | 
|  795   io_thread()->PostTaskAndWait( |  | 
|  796       FROM_HERE, |  | 
|  797       base::Bind(&RawChannel::Shutdown, base::Unretained(rc_read.get()))); |  | 
|  798   io_thread()->PostTaskAndWait( |  | 
|  799       FROM_HERE, |  | 
|  800       base::Bind(&RawChannel::Shutdown, base::Unretained(rc_write.get()))); |  | 
|  801 } |  | 
|  802  |  | 
|  803 }  // namespace |  | 
|  804 }  // namespace system |  | 
|  805 }  // namespace mojo |  | 
| OLD | NEW |