| OLD | NEW |
| (Empty) |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include <stdlib.h> | |
| 6 #include <string.h> | |
| 7 | |
| 8 #include <string> | |
| 9 | |
| 10 #include "gtest/gtest.h" | |
| 11 #include "mojo/public/cpp/bindings/lib/connector.h" | |
| 12 #include "mojo/public/cpp/bindings/lib/message_builder.h" | |
| 13 #include "mojo/public/cpp/bindings/tests/message_queue.h" | |
| 14 #include "mojo/public/cpp/environment/logging.h" | |
| 15 #include "mojo/public/cpp/system/macros.h" | |
| 16 #include "mojo/public/cpp/utility/run_loop.h" | |
| 17 | |
| 18 namespace mojo { | |
| 19 namespace test { | |
| 20 namespace { | |
| 21 | |
| 22 class ConnectorTest : public testing::Test { | |
| 23 public: | |
| 24 ConnectorTest() {} | |
| 25 | |
| 26 void SetUp() override { | |
| 27 CreateMessagePipe(nullptr, &handle0_, &handle1_); | |
| 28 } | |
| 29 | |
| 30 void TearDown() override {} | |
| 31 | |
| 32 void AllocMessage(const char* text, Message* message) { | |
| 33 size_t payload_size = strlen(text) + 1; // Plus null terminator. | |
| 34 MessageBuilder builder(1, payload_size); | |
| 35 memcpy(builder.buffer()->Allocate(payload_size), text, payload_size); | |
| 36 | |
| 37 builder.message()->MoveTo(message); | |
| 38 } | |
| 39 | |
| 40 void PumpMessages() { loop_.RunUntilIdle(); } | |
| 41 | |
| 42 protected: | |
| 43 ScopedMessagePipeHandle handle0_; | |
| 44 ScopedMessagePipeHandle handle1_; | |
| 45 | |
| 46 private: | |
| 47 RunLoop loop_; | |
| 48 | |
| 49 MOJO_DISALLOW_COPY_AND_ASSIGN(ConnectorTest); | |
| 50 }; | |
| 51 | |
| 52 class MessageAccumulator : public MessageReceiver { | |
| 53 public: | |
| 54 MessageAccumulator() {} | |
| 55 | |
| 56 bool Accept(Message* message) override { | |
| 57 queue_.Push(message); | |
| 58 return true; | |
| 59 } | |
| 60 | |
| 61 bool IsEmpty() const { return queue_.IsEmpty(); } | |
| 62 | |
| 63 void Pop(Message* message) { queue_.Pop(message); } | |
| 64 | |
| 65 private: | |
| 66 MessageQueue queue_; | |
| 67 | |
| 68 MOJO_DISALLOW_COPY_AND_ASSIGN(MessageAccumulator); | |
| 69 }; | |
| 70 | |
| 71 TEST_F(ConnectorTest, Basic) { | |
| 72 internal::Connector connector0(handle0_.Pass()); | |
| 73 internal::Connector connector1(handle1_.Pass()); | |
| 74 | |
| 75 const char kText[] = "hello world"; | |
| 76 | |
| 77 Message message; | |
| 78 AllocMessage(kText, &message); | |
| 79 | |
| 80 connector0.Accept(&message); | |
| 81 | |
| 82 MessageAccumulator accumulator; | |
| 83 connector1.set_incoming_receiver(&accumulator); | |
| 84 | |
| 85 PumpMessages(); | |
| 86 | |
| 87 ASSERT_FALSE(accumulator.IsEmpty()); | |
| 88 | |
| 89 Message message_received; | |
| 90 accumulator.Pop(&message_received); | |
| 91 | |
| 92 EXPECT_EQ( | |
| 93 std::string(kText), | |
| 94 std::string(reinterpret_cast<const char*>(message_received.payload()))); | |
| 95 } | |
| 96 | |
| 97 TEST_F(ConnectorTest, Basic_Synchronous) { | |
| 98 internal::Connector connector0(handle0_.Pass()); | |
| 99 internal::Connector connector1(handle1_.Pass()); | |
| 100 | |
| 101 const char kText[] = "hello world"; | |
| 102 | |
| 103 Message message; | |
| 104 AllocMessage(kText, &message); | |
| 105 | |
| 106 connector0.Accept(&message); | |
| 107 | |
| 108 MessageAccumulator accumulator; | |
| 109 connector1.set_incoming_receiver(&accumulator); | |
| 110 | |
| 111 connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); | |
| 112 | |
| 113 ASSERT_FALSE(accumulator.IsEmpty()); | |
| 114 | |
| 115 Message message_received; | |
| 116 accumulator.Pop(&message_received); | |
| 117 | |
| 118 EXPECT_EQ( | |
| 119 std::string(kText), | |
| 120 std::string(reinterpret_cast<const char*>(message_received.payload()))); | |
| 121 } | |
| 122 | |
| 123 TEST_F(ConnectorTest, Basic_EarlyIncomingReceiver) { | |
| 124 internal::Connector connector0(handle0_.Pass()); | |
| 125 internal::Connector connector1(handle1_.Pass()); | |
| 126 | |
| 127 MessageAccumulator accumulator; | |
| 128 connector1.set_incoming_receiver(&accumulator); | |
| 129 | |
| 130 const char kText[] = "hello world"; | |
| 131 | |
| 132 Message message; | |
| 133 AllocMessage(kText, &message); | |
| 134 | |
| 135 connector0.Accept(&message); | |
| 136 | |
| 137 PumpMessages(); | |
| 138 | |
| 139 ASSERT_FALSE(accumulator.IsEmpty()); | |
| 140 | |
| 141 Message message_received; | |
| 142 accumulator.Pop(&message_received); | |
| 143 | |
| 144 EXPECT_EQ( | |
| 145 std::string(kText), | |
| 146 std::string(reinterpret_cast<const char*>(message_received.payload()))); | |
| 147 } | |
| 148 | |
| 149 TEST_F(ConnectorTest, Basic_TwoMessages) { | |
| 150 internal::Connector connector0(handle0_.Pass()); | |
| 151 internal::Connector connector1(handle1_.Pass()); | |
| 152 | |
| 153 const char* kText[] = {"hello", "world"}; | |
| 154 | |
| 155 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) { | |
| 156 Message message; | |
| 157 AllocMessage(kText[i], &message); | |
| 158 | |
| 159 connector0.Accept(&message); | |
| 160 } | |
| 161 | |
| 162 MessageAccumulator accumulator; | |
| 163 connector1.set_incoming_receiver(&accumulator); | |
| 164 | |
| 165 PumpMessages(); | |
| 166 | |
| 167 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) { | |
| 168 ASSERT_FALSE(accumulator.IsEmpty()); | |
| 169 | |
| 170 Message message_received; | |
| 171 accumulator.Pop(&message_received); | |
| 172 | |
| 173 EXPECT_EQ( | |
| 174 std::string(kText[i]), | |
| 175 std::string(reinterpret_cast<const char*>(message_received.payload()))); | |
| 176 } | |
| 177 } | |
| 178 | |
| 179 TEST_F(ConnectorTest, Basic_TwoMessages_Synchronous) { | |
| 180 internal::Connector connector0(handle0_.Pass()); | |
| 181 internal::Connector connector1(handle1_.Pass()); | |
| 182 | |
| 183 const char* kText[] = {"hello", "world"}; | |
| 184 | |
| 185 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) { | |
| 186 Message message; | |
| 187 AllocMessage(kText[i], &message); | |
| 188 | |
| 189 connector0.Accept(&message); | |
| 190 } | |
| 191 | |
| 192 MessageAccumulator accumulator; | |
| 193 connector1.set_incoming_receiver(&accumulator); | |
| 194 | |
| 195 connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); | |
| 196 | |
| 197 ASSERT_FALSE(accumulator.IsEmpty()); | |
| 198 | |
| 199 Message message_received; | |
| 200 accumulator.Pop(&message_received); | |
| 201 | |
| 202 EXPECT_EQ( | |
| 203 std::string(kText[0]), | |
| 204 std::string(reinterpret_cast<const char*>(message_received.payload()))); | |
| 205 | |
| 206 ASSERT_TRUE(accumulator.IsEmpty()); | |
| 207 } | |
| 208 | |
| 209 TEST_F(ConnectorTest, WriteToClosedPipe) { | |
| 210 internal::Connector connector0(handle0_.Pass()); | |
| 211 | |
| 212 const char kText[] = "hello world"; | |
| 213 | |
| 214 Message message; | |
| 215 AllocMessage(kText, &message); | |
| 216 | |
| 217 // Close the other end of the pipe. | |
| 218 handle1_.reset(); | |
| 219 | |
| 220 // Not observed yet because we haven't spun the RunLoop yet. | |
| 221 EXPECT_FALSE(connector0.encountered_error()); | |
| 222 | |
| 223 // Write failures are not reported. | |
| 224 bool ok = connector0.Accept(&message); | |
| 225 EXPECT_TRUE(ok); | |
| 226 | |
| 227 // Still not observed. | |
| 228 EXPECT_FALSE(connector0.encountered_error()); | |
| 229 | |
| 230 // Spin the RunLoop, and then we should start observing the closed pipe. | |
| 231 PumpMessages(); | |
| 232 | |
| 233 EXPECT_TRUE(connector0.encountered_error()); | |
| 234 } | |
| 235 | |
| 236 TEST_F(ConnectorTest, MessageWithHandles) { | |
| 237 internal::Connector connector0(handle0_.Pass()); | |
| 238 internal::Connector connector1(handle1_.Pass()); | |
| 239 | |
| 240 const char kText[] = "hello world"; | |
| 241 | |
| 242 Message message1; | |
| 243 AllocMessage(kText, &message1); | |
| 244 | |
| 245 MessagePipe pipe; | |
| 246 message1.mutable_handles()->push_back(pipe.handle0.release()); | |
| 247 | |
| 248 connector0.Accept(&message1); | |
| 249 | |
| 250 // The message should have been transferred, releasing the handles. | |
| 251 EXPECT_TRUE(message1.handles()->empty()); | |
| 252 | |
| 253 MessageAccumulator accumulator; | |
| 254 connector1.set_incoming_receiver(&accumulator); | |
| 255 | |
| 256 PumpMessages(); | |
| 257 | |
| 258 ASSERT_FALSE(accumulator.IsEmpty()); | |
| 259 | |
| 260 Message message_received; | |
| 261 accumulator.Pop(&message_received); | |
| 262 | |
| 263 EXPECT_EQ( | |
| 264 std::string(kText), | |
| 265 std::string(reinterpret_cast<const char*>(message_received.payload()))); | |
| 266 ASSERT_EQ(1U, message_received.handles()->size()); | |
| 267 | |
| 268 // Now send a message to the transferred handle and confirm it's sent through | |
| 269 // to the orginal pipe. | |
| 270 // TODO(vtl): Do we need a better way of "downcasting" the handle types? | |
| 271 ScopedMessagePipeHandle smph; | |
| 272 smph.reset(MessagePipeHandle(message_received.handles()->front().value())); | |
| 273 message_received.mutable_handles()->front() = Handle(); | |
| 274 // |smph| now owns this handle. | |
| 275 | |
| 276 internal::Connector connector_received(smph.Pass()); | |
| 277 internal::Connector connector_original(pipe.handle1.Pass()); | |
| 278 | |
| 279 Message message2; | |
| 280 AllocMessage(kText, &message2); | |
| 281 | |
| 282 connector_received.Accept(&message2); | |
| 283 connector_original.set_incoming_receiver(&accumulator); | |
| 284 PumpMessages(); | |
| 285 | |
| 286 ASSERT_FALSE(accumulator.IsEmpty()); | |
| 287 | |
| 288 accumulator.Pop(&message_received); | |
| 289 | |
| 290 EXPECT_EQ( | |
| 291 std::string(kText), | |
| 292 std::string(reinterpret_cast<const char*>(message_received.payload()))); | |
| 293 } | |
| 294 | |
| 295 TEST_F(ConnectorTest, WaitForIncomingMessageWithError) { | |
| 296 internal::Connector connector0(handle0_.Pass()); | |
| 297 // Close the other end of the pipe. | |
| 298 handle1_.reset(); | |
| 299 ASSERT_FALSE(connector0.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE)); | |
| 300 } | |
| 301 | |
| 302 class ConnectorDeletingMessageAccumulator : public MessageAccumulator { | |
| 303 public: | |
| 304 explicit ConnectorDeletingMessageAccumulator(internal::Connector** connector) | |
| 305 : connector_(connector) {} | |
| 306 | |
| 307 bool Accept(Message* message) override { | |
| 308 delete *connector_; | |
| 309 *connector_ = 0; | |
| 310 return MessageAccumulator::Accept(message); | |
| 311 } | |
| 312 | |
| 313 private: | |
| 314 internal::Connector** connector_; | |
| 315 | |
| 316 MOJO_DISALLOW_COPY_AND_ASSIGN(ConnectorDeletingMessageAccumulator); | |
| 317 }; | |
| 318 | |
| 319 TEST_F(ConnectorTest, WaitForIncomingMessageWithDeletion) { | |
| 320 internal::Connector connector0(handle0_.Pass()); | |
| 321 internal::Connector* connector1 = new internal::Connector(handle1_.Pass()); | |
| 322 | |
| 323 const char kText[] = "hello world"; | |
| 324 | |
| 325 Message message; | |
| 326 AllocMessage(kText, &message); | |
| 327 | |
| 328 connector0.Accept(&message); | |
| 329 | |
| 330 ConnectorDeletingMessageAccumulator accumulator(&connector1); | |
| 331 connector1->set_incoming_receiver(&accumulator); | |
| 332 | |
| 333 connector1->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); | |
| 334 | |
| 335 ASSERT_FALSE(connector1); | |
| 336 ASSERT_FALSE(accumulator.IsEmpty()); | |
| 337 | |
| 338 Message message_received; | |
| 339 accumulator.Pop(&message_received); | |
| 340 | |
| 341 EXPECT_EQ( | |
| 342 std::string(kText), | |
| 343 std::string(reinterpret_cast<const char*>(message_received.payload()))); | |
| 344 } | |
| 345 | |
| 346 class ReentrantMessageAccumulator : public MessageAccumulator { | |
| 347 public: | |
| 348 explicit ReentrantMessageAccumulator(internal::Connector* connector) | |
| 349 : connector_(connector), number_of_calls_(0) {} | |
| 350 | |
| 351 bool Accept(Message* message) override { | |
| 352 if (!MessageAccumulator::Accept(message)) | |
| 353 return false; | |
| 354 number_of_calls_++; | |
| 355 if (number_of_calls_ == 1) { | |
| 356 return connector_->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); | |
| 357 } | |
| 358 return true; | |
| 359 } | |
| 360 | |
| 361 int number_of_calls() { return number_of_calls_; } | |
| 362 | |
| 363 private: | |
| 364 internal::Connector* connector_; | |
| 365 int number_of_calls_; | |
| 366 | |
| 367 MOJO_DISALLOW_COPY_AND_ASSIGN(ReentrantMessageAccumulator); | |
| 368 }; | |
| 369 | |
| 370 TEST_F(ConnectorTest, WaitForIncomingMessageWithReentrancy) { | |
| 371 internal::Connector connector0(handle0_.Pass()); | |
| 372 internal::Connector connector1(handle1_.Pass()); | |
| 373 | |
| 374 const char* kText[] = {"hello", "world"}; | |
| 375 | |
| 376 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) { | |
| 377 Message message; | |
| 378 AllocMessage(kText[i], &message); | |
| 379 | |
| 380 connector0.Accept(&message); | |
| 381 } | |
| 382 | |
| 383 ReentrantMessageAccumulator accumulator(&connector1); | |
| 384 connector1.set_incoming_receiver(&accumulator); | |
| 385 | |
| 386 PumpMessages(); | |
| 387 | |
| 388 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) { | |
| 389 ASSERT_FALSE(accumulator.IsEmpty()); | |
| 390 | |
| 391 Message message_received; | |
| 392 accumulator.Pop(&message_received); | |
| 393 | |
| 394 EXPECT_EQ( | |
| 395 std::string(kText[i]), | |
| 396 std::string(reinterpret_cast<const char*>(message_received.payload()))); | |
| 397 } | |
| 398 | |
| 399 ASSERT_EQ(2, accumulator.number_of_calls()); | |
| 400 } | |
| 401 | |
| 402 // This message receiver just accepts messages, and responds (to another fixed | |
| 403 // receiver) | |
| 404 class NoTaskStarvationReplier : public MessageReceiver { | |
| 405 public: | |
| 406 explicit NoTaskStarvationReplier(MessageReceiver* reply_to) | |
| 407 : reply_to_(reply_to) { | |
| 408 MOJO_CHECK(reply_to_ != this); | |
| 409 } | |
| 410 | |
| 411 bool Accept(Message* message) override { | |
| 412 num_accepted_++; | |
| 413 | |
| 414 uint32_t name = message->name(); | |
| 415 | |
| 416 if (name >= 10u) { | |
| 417 RunLoop::current()->PostDelayedTask([]() { RunLoop::current()->Quit(); }, | |
| 418 0); | |
| 419 } | |
| 420 | |
| 421 // We don't necessarily expect the quit task to be processed immediately, | |
| 422 // but if some large number (say, ten thousand-ish) messages have been | |
| 423 // processed, we can say that starvation has occurred. | |
| 424 static const uint32_t kStarvationThreshold = 10000; | |
| 425 EXPECT_LE(name, kStarvationThreshold); | |
| 426 // We'd prefer our test not hang, so don't send the reply in the failing | |
| 427 // case. | |
| 428 if (name > kStarvationThreshold) | |
| 429 return true; | |
| 430 | |
| 431 MessageBuilder builder(name + 1u, 0u); | |
| 432 MOJO_CHECK(reply_to_->Accept(builder.message())); | |
| 433 | |
| 434 return true; | |
| 435 } | |
| 436 | |
| 437 unsigned num_accepted() const { return num_accepted_; } | |
| 438 | |
| 439 private: | |
| 440 MessageReceiver* const reply_to_; | |
| 441 unsigned num_accepted_ = 0; | |
| 442 | |
| 443 MOJO_DISALLOW_COPY_AND_ASSIGN(NoTaskStarvationReplier); | |
| 444 }; | |
| 445 | |
| 446 // TODO(vtl): This test currently fails. See the discussion on issue #604 | |
| 447 // (https://github.com/domokit/mojo/issues/604). | |
| 448 TEST_F(ConnectorTest, DISABLED_NoTaskStarvation) { | |
| 449 internal::Connector connector0(handle0_.Pass()); | |
| 450 internal::Connector connector1(handle1_.Pass()); | |
| 451 | |
| 452 // The replier will bounce messages to |connector0|, and will receiver | |
| 453 // messages from |connector1|. | |
| 454 NoTaskStarvationReplier replier(&connector0); | |
| 455 connector1.set_incoming_receiver(&replier); | |
| 456 | |
| 457 // Kick things off by sending a messagge on |connector0| (starting with a | |
| 458 // "name" of 1). | |
| 459 MessageBuilder builder(1u, 0u); | |
| 460 ASSERT_TRUE(connector0.Accept(builder.message())); | |
| 461 | |
| 462 PumpMessages(); | |
| 463 | |
| 464 EXPECT_GE(replier.num_accepted(), 10u); | |
| 465 } | |
| 466 | |
| 467 } // namespace | |
| 468 } // namespace test | |
| 469 } // namespace mojo | |
| OLD | NEW |