Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(324)

Side by Side Diff: mojo/public/cpp/bindings/tests/connector_unittest.cc

Issue 1881243002: Add a (disabled) test to check against starvation caused by Connector::ReadAllAvailableMessages(). (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include <stdlib.h> 5 #include <stdlib.h>
6 #include <string.h> 6 #include <string.h>
7 7
8 #include <string>
9
8 #include "mojo/public/cpp/bindings/lib/connector.h" 10 #include "mojo/public/cpp/bindings/lib/connector.h"
9 #include "mojo/public/cpp/bindings/lib/message_builder.h" 11 #include "mojo/public/cpp/bindings/lib/message_builder.h"
10 #include "mojo/public/cpp/bindings/tests/message_queue.h" 12 #include "mojo/public/cpp/bindings/tests/message_queue.h"
11 #include "mojo/public/cpp/environment/environment.h" 13 #include "mojo/public/cpp/environment/environment.h"
14 #include "mojo/public/cpp/environment/logging.h"
12 #include "mojo/public/cpp/system/macros.h" 15 #include "mojo/public/cpp/system/macros.h"
13 #include "mojo/public/cpp/utility/run_loop.h" 16 #include "mojo/public/cpp/utility/run_loop.h"
14 #include "testing/gtest/include/gtest/gtest.h" 17 #include "testing/gtest/include/gtest/gtest.h"
15 18
16 namespace mojo { 19 namespace mojo {
17 namespace test { 20 namespace test {
18 namespace { 21 namespace {
19 22
20 class MessageAccumulator : public MessageReceiver {
21 public:
22 MessageAccumulator() {}
23
24 bool Accept(Message* message) override {
25 queue_.Push(message);
26 return true;
27 }
28
29 bool IsEmpty() const { return queue_.IsEmpty(); }
30
31 void Pop(Message* message) { queue_.Pop(message); }
32
33 private:
34 MessageQueue queue_;
35 };
36
37 class ConnectorDeletingMessageAccumulator : public MessageAccumulator {
38 public:
39 explicit ConnectorDeletingMessageAccumulator(internal::Connector** connector)
40 : connector_(connector) {}
41
42 bool Accept(Message* message) override {
43 delete *connector_;
44 *connector_ = 0;
45 return MessageAccumulator::Accept(message);
46 }
47
48 private:
49 internal::Connector** connector_;
50 };
51
52 class ReentrantMessageAccumulator : public MessageAccumulator {
53 public:
54 explicit ReentrantMessageAccumulator(internal::Connector* connector)
55 : connector_(connector), number_of_calls_(0) {}
56
57 bool Accept(Message* message) override {
58 if (!MessageAccumulator::Accept(message))
59 return false;
60 number_of_calls_++;
61 if (number_of_calls_ == 1) {
62 return connector_->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
63 }
64 return true;
65 }
66
67 int number_of_calls() { return number_of_calls_; }
68
69 private:
70 internal::Connector* connector_;
71 int number_of_calls_;
72 };
73
74 class ConnectorTest : public testing::Test { 23 class ConnectorTest : public testing::Test {
75 public: 24 public:
76 ConnectorTest() {} 25 ConnectorTest() {}
77 26
78 void SetUp() override { 27 void SetUp() override {
79 CreateMessagePipe(nullptr, &handle0_, &handle1_); 28 CreateMessagePipe(nullptr, &handle0_, &handle1_);
80 } 29 }
81 30
82 void TearDown() override {} 31 void TearDown() override {}
83 32
84 void AllocMessage(const char* text, Message* message) { 33 void AllocMessage(const char* text, Message* message) {
85 size_t payload_size = strlen(text) + 1; // Plus null terminator. 34 size_t payload_size = strlen(text) + 1; // Plus null terminator.
86 MessageBuilder builder(1, payload_size); 35 MessageBuilder builder(1, payload_size);
87 memcpy(builder.buffer()->Allocate(payload_size), text, payload_size); 36 memcpy(builder.buffer()->Allocate(payload_size), text, payload_size);
88 37
89 builder.message()->MoveTo(message); 38 builder.message()->MoveTo(message);
90 } 39 }
91 40
92 void PumpMessages() { loop_.RunUntilIdle(); } 41 void PumpMessages() { loop_.RunUntilIdle(); }
93 42
94 protected: 43 protected:
95 ScopedMessagePipeHandle handle0_; 44 ScopedMessagePipeHandle handle0_;
96 ScopedMessagePipeHandle handle1_; 45 ScopedMessagePipeHandle handle1_;
97 46
98 private: 47 private:
99 Environment env_; 48 Environment env_;
100 RunLoop loop_; 49 RunLoop loop_;
50
51 MOJO_DISALLOW_COPY_AND_ASSIGN(ConnectorTest);
52 };
53
54 class MessageAccumulator : public MessageReceiver {
55 public:
56 MessageAccumulator() {}
57
58 bool Accept(Message* message) override {
59 queue_.Push(message);
60 return true;
61 }
62
63 bool IsEmpty() const { return queue_.IsEmpty(); }
64
65 void Pop(Message* message) { queue_.Pop(message); }
66
67 private:
68 MessageQueue queue_;
69
70 MOJO_DISALLOW_COPY_AND_ASSIGN(MessageAccumulator);
101 }; 71 };
102 72
103 TEST_F(ConnectorTest, Basic) { 73 TEST_F(ConnectorTest, Basic) {
104 internal::Connector connector0(handle0_.Pass()); 74 internal::Connector connector0(handle0_.Pass());
105 internal::Connector connector1(handle1_.Pass()); 75 internal::Connector connector1(handle1_.Pass());
106 76
107 const char kText[] = "hello world"; 77 const char kText[] = "hello world";
108 78
109 Message message; 79 Message message;
110 AllocMessage(kText, &message); 80 AllocMessage(kText, &message);
(...skipping 213 matching lines...) Expand 10 before | Expand all | Expand 10 after
324 std::string(reinterpret_cast<const char*>(message_received.payload()))); 294 std::string(reinterpret_cast<const char*>(message_received.payload())));
325 } 295 }
326 296
327 TEST_F(ConnectorTest, WaitForIncomingMessageWithError) { 297 TEST_F(ConnectorTest, WaitForIncomingMessageWithError) {
328 internal::Connector connector0(handle0_.Pass()); 298 internal::Connector connector0(handle0_.Pass());
329 // Close the other end of the pipe. 299 // Close the other end of the pipe.
330 handle1_.reset(); 300 handle1_.reset();
331 ASSERT_FALSE(connector0.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE)); 301 ASSERT_FALSE(connector0.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE));
332 } 302 }
333 303
304 class ConnectorDeletingMessageAccumulator : public MessageAccumulator {
305 public:
306 explicit ConnectorDeletingMessageAccumulator(internal::Connector** connector)
307 : connector_(connector) {}
308
309 bool Accept(Message* message) override {
310 delete *connector_;
311 *connector_ = 0;
312 return MessageAccumulator::Accept(message);
313 }
314
315 private:
316 internal::Connector** connector_;
317
318 MOJO_DISALLOW_COPY_AND_ASSIGN(ConnectorDeletingMessageAccumulator);
319 };
320
334 TEST_F(ConnectorTest, WaitForIncomingMessageWithDeletion) { 321 TEST_F(ConnectorTest, WaitForIncomingMessageWithDeletion) {
335 internal::Connector connector0(handle0_.Pass()); 322 internal::Connector connector0(handle0_.Pass());
336 internal::Connector* connector1 = new internal::Connector(handle1_.Pass()); 323 internal::Connector* connector1 = new internal::Connector(handle1_.Pass());
337 324
338 const char kText[] = "hello world"; 325 const char kText[] = "hello world";
339 326
340 Message message; 327 Message message;
341 AllocMessage(kText, &message); 328 AllocMessage(kText, &message);
342 329
343 connector0.Accept(&message); 330 connector0.Accept(&message);
344 331
345 ConnectorDeletingMessageAccumulator accumulator(&connector1); 332 ConnectorDeletingMessageAccumulator accumulator(&connector1);
346 connector1->set_incoming_receiver(&accumulator); 333 connector1->set_incoming_receiver(&accumulator);
347 334
348 connector1->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); 335 connector1->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
349 336
350 ASSERT_FALSE(connector1); 337 ASSERT_FALSE(connector1);
351 ASSERT_FALSE(accumulator.IsEmpty()); 338 ASSERT_FALSE(accumulator.IsEmpty());
352 339
353 Message message_received; 340 Message message_received;
354 accumulator.Pop(&message_received); 341 accumulator.Pop(&message_received);
355 342
356 EXPECT_EQ( 343 EXPECT_EQ(
357 std::string(kText), 344 std::string(kText),
358 std::string(reinterpret_cast<const char*>(message_received.payload()))); 345 std::string(reinterpret_cast<const char*>(message_received.payload())));
359 } 346 }
360 347
348 class ReentrantMessageAccumulator : public MessageAccumulator {
349 public:
350 explicit ReentrantMessageAccumulator(internal::Connector* connector)
351 : connector_(connector), number_of_calls_(0) {}
352
353 bool Accept(Message* message) override {
354 if (!MessageAccumulator::Accept(message))
355 return false;
356 number_of_calls_++;
357 if (number_of_calls_ == 1) {
358 return connector_->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
359 }
360 return true;
361 }
362
363 int number_of_calls() { return number_of_calls_; }
364
365 private:
366 internal::Connector* connector_;
367 int number_of_calls_;
368
369 MOJO_DISALLOW_COPY_AND_ASSIGN(ReentrantMessageAccumulator);
370 };
371
361 TEST_F(ConnectorTest, WaitForIncomingMessageWithReentrancy) { 372 TEST_F(ConnectorTest, WaitForIncomingMessageWithReentrancy) {
362 internal::Connector connector0(handle0_.Pass()); 373 internal::Connector connector0(handle0_.Pass());
363 internal::Connector connector1(handle1_.Pass()); 374 internal::Connector connector1(handle1_.Pass());
364 375
365 const char* kText[] = {"hello", "world"}; 376 const char* kText[] = {"hello", "world"};
366 377
367 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) { 378 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) {
368 Message message; 379 Message message;
369 AllocMessage(kText[i], &message); 380 AllocMessage(kText[i], &message);
370 381
(...skipping 12 matching lines...) Expand all
383 accumulator.Pop(&message_received); 394 accumulator.Pop(&message_received);
384 395
385 EXPECT_EQ( 396 EXPECT_EQ(
386 std::string(kText[i]), 397 std::string(kText[i]),
387 std::string(reinterpret_cast<const char*>(message_received.payload()))); 398 std::string(reinterpret_cast<const char*>(message_received.payload())));
388 } 399 }
389 400
390 ASSERT_EQ(2, accumulator.number_of_calls()); 401 ASSERT_EQ(2, accumulator.number_of_calls());
391 } 402 }
392 403
404 // This message receiver just accepts messages, and responds (to another fixed
405 // receiver)
406 class NoTaskStarvationReplier : public MessageReceiver {
407 public:
408 explicit NoTaskStarvationReplier(MessageReceiver* reply_to)
409 : reply_to_(reply_to) {
410 MOJO_CHECK(reply_to_ != this);
411 }
412
413 bool Accept(Message* message) override {
414 num_accepted_++;
415
416 uint32_t name = message->name();
417
418 if (name >= 10u) {
419 RunLoop::current()->PostDelayedTask([]() { RunLoop::current()->Quit(); },
420 0);
421 }
422
423 // We don't necessarily expect the quit task to be processed immediately,
424 // but if some large number (say, ten thousand-ish) messages have been
425 // processed, we can say that starvation has occurred.
426 static const uint32_t kStarvationThreshold = 10000;
427 EXPECT_LE(name, kStarvationThreshold);
428 // We'd prefer our test not hang, so don't send the reply in the failing
429 // case.
430 if (name > kStarvationThreshold)
431 return true;
432
433 MessageBuilder builder(name + 1u, 0u);
434 MOJO_CHECK(reply_to_->Accept(builder.message()));
435
436 return true;
437 }
438
439 unsigned num_accepted() const { return num_accepted_; }
440
441 private:
442 MessageReceiver* const reply_to_;
443 unsigned num_accepted_ = 0;
444
445 MOJO_DISALLOW_COPY_AND_ASSIGN(NoTaskStarvationReplier);
446 };
447
448 // TODO(vtl): This test currently fails. See the discussion on issue #604
449 // (https://github.com/domokit/mojo/issues/604).
450 TEST_F(ConnectorTest, DISABLED_NoTaskStarvation) {
451 internal::Connector connector0(handle0_.Pass());
452 internal::Connector connector1(handle1_.Pass());
453
454 // The replier will bounce messages to |connector0|, and will receiver
455 // messages from |connector1|.
456 NoTaskStarvationReplier replier(&connector0);
457 connector1.set_incoming_receiver(&replier);
458
459 // Kick things off by sending a messagge on |connector0| (starting with a
460 // "name" of 1).
461 MessageBuilder builder(1u, 0u);
462 ASSERT_TRUE(connector0.Accept(builder.message()));
463
464 PumpMessages();
465
466 EXPECT_GE(replier.num_accepted(), 10u);
467 }
468
393 } // namespace 469 } // namespace
394 } // namespace test 470 } // namespace test
395 } // namespace mojo 471 } // namespace mojo
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698