Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(238)

Side by Side Diff: mojo/public/cpp/bindings/tests/connector_unittest.cc

Issue 2250183003: Make the fuchsia mojo/public repo the source of truth. (Closed) Base URL: https://github.com/domokit/mojo.git@master
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <stdlib.h>
6 #include <string.h>
7
8 #include <string>
9
10 #include "gtest/gtest.h"
11 #include "mojo/public/cpp/bindings/lib/connector.h"
12 #include "mojo/public/cpp/bindings/lib/message_builder.h"
13 #include "mojo/public/cpp/bindings/tests/message_queue.h"
14 #include "mojo/public/cpp/environment/logging.h"
15 #include "mojo/public/cpp/system/macros.h"
16 #include "mojo/public/cpp/utility/run_loop.h"
17
18 namespace mojo {
19 namespace test {
20 namespace {
21
22 class ConnectorTest : public testing::Test {
23 public:
24 ConnectorTest() {}
25
26 void SetUp() override {
27 CreateMessagePipe(nullptr, &handle0_, &handle1_);
28 }
29
30 void TearDown() override {}
31
32 void AllocMessage(const char* text, Message* message) {
33 size_t payload_size = strlen(text) + 1; // Plus null terminator.
34 MessageBuilder builder(1, payload_size);
35 memcpy(builder.buffer()->Allocate(payload_size), text, payload_size);
36
37 builder.message()->MoveTo(message);
38 }
39
40 void PumpMessages() { loop_.RunUntilIdle(); }
41
42 protected:
43 ScopedMessagePipeHandle handle0_;
44 ScopedMessagePipeHandle handle1_;
45
46 private:
47 RunLoop loop_;
48
49 MOJO_DISALLOW_COPY_AND_ASSIGN(ConnectorTest);
50 };
51
52 class MessageAccumulator : public MessageReceiver {
53 public:
54 MessageAccumulator() {}
55
56 bool Accept(Message* message) override {
57 queue_.Push(message);
58 return true;
59 }
60
61 bool IsEmpty() const { return queue_.IsEmpty(); }
62
63 void Pop(Message* message) { queue_.Pop(message); }
64
65 private:
66 MessageQueue queue_;
67
68 MOJO_DISALLOW_COPY_AND_ASSIGN(MessageAccumulator);
69 };
70
71 TEST_F(ConnectorTest, Basic) {
72 internal::Connector connector0(handle0_.Pass());
73 internal::Connector connector1(handle1_.Pass());
74
75 const char kText[] = "hello world";
76
77 Message message;
78 AllocMessage(kText, &message);
79
80 connector0.Accept(&message);
81
82 MessageAccumulator accumulator;
83 connector1.set_incoming_receiver(&accumulator);
84
85 PumpMessages();
86
87 ASSERT_FALSE(accumulator.IsEmpty());
88
89 Message message_received;
90 accumulator.Pop(&message_received);
91
92 EXPECT_EQ(
93 std::string(kText),
94 std::string(reinterpret_cast<const char*>(message_received.payload())));
95 }
96
97 TEST_F(ConnectorTest, Basic_Synchronous) {
98 internal::Connector connector0(handle0_.Pass());
99 internal::Connector connector1(handle1_.Pass());
100
101 const char kText[] = "hello world";
102
103 Message message;
104 AllocMessage(kText, &message);
105
106 connector0.Accept(&message);
107
108 MessageAccumulator accumulator;
109 connector1.set_incoming_receiver(&accumulator);
110
111 connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
112
113 ASSERT_FALSE(accumulator.IsEmpty());
114
115 Message message_received;
116 accumulator.Pop(&message_received);
117
118 EXPECT_EQ(
119 std::string(kText),
120 std::string(reinterpret_cast<const char*>(message_received.payload())));
121 }
122
123 TEST_F(ConnectorTest, Basic_EarlyIncomingReceiver) {
124 internal::Connector connector0(handle0_.Pass());
125 internal::Connector connector1(handle1_.Pass());
126
127 MessageAccumulator accumulator;
128 connector1.set_incoming_receiver(&accumulator);
129
130 const char kText[] = "hello world";
131
132 Message message;
133 AllocMessage(kText, &message);
134
135 connector0.Accept(&message);
136
137 PumpMessages();
138
139 ASSERT_FALSE(accumulator.IsEmpty());
140
141 Message message_received;
142 accumulator.Pop(&message_received);
143
144 EXPECT_EQ(
145 std::string(kText),
146 std::string(reinterpret_cast<const char*>(message_received.payload())));
147 }
148
149 TEST_F(ConnectorTest, Basic_TwoMessages) {
150 internal::Connector connector0(handle0_.Pass());
151 internal::Connector connector1(handle1_.Pass());
152
153 const char* kText[] = {"hello", "world"};
154
155 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) {
156 Message message;
157 AllocMessage(kText[i], &message);
158
159 connector0.Accept(&message);
160 }
161
162 MessageAccumulator accumulator;
163 connector1.set_incoming_receiver(&accumulator);
164
165 PumpMessages();
166
167 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) {
168 ASSERT_FALSE(accumulator.IsEmpty());
169
170 Message message_received;
171 accumulator.Pop(&message_received);
172
173 EXPECT_EQ(
174 std::string(kText[i]),
175 std::string(reinterpret_cast<const char*>(message_received.payload())));
176 }
177 }
178
179 TEST_F(ConnectorTest, Basic_TwoMessages_Synchronous) {
180 internal::Connector connector0(handle0_.Pass());
181 internal::Connector connector1(handle1_.Pass());
182
183 const char* kText[] = {"hello", "world"};
184
185 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) {
186 Message message;
187 AllocMessage(kText[i], &message);
188
189 connector0.Accept(&message);
190 }
191
192 MessageAccumulator accumulator;
193 connector1.set_incoming_receiver(&accumulator);
194
195 connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
196
197 ASSERT_FALSE(accumulator.IsEmpty());
198
199 Message message_received;
200 accumulator.Pop(&message_received);
201
202 EXPECT_EQ(
203 std::string(kText[0]),
204 std::string(reinterpret_cast<const char*>(message_received.payload())));
205
206 ASSERT_TRUE(accumulator.IsEmpty());
207 }
208
209 TEST_F(ConnectorTest, WriteToClosedPipe) {
210 internal::Connector connector0(handle0_.Pass());
211
212 const char kText[] = "hello world";
213
214 Message message;
215 AllocMessage(kText, &message);
216
217 // Close the other end of the pipe.
218 handle1_.reset();
219
220 // Not observed yet because we haven't spun the RunLoop yet.
221 EXPECT_FALSE(connector0.encountered_error());
222
223 // Write failures are not reported.
224 bool ok = connector0.Accept(&message);
225 EXPECT_TRUE(ok);
226
227 // Still not observed.
228 EXPECT_FALSE(connector0.encountered_error());
229
230 // Spin the RunLoop, and then we should start observing the closed pipe.
231 PumpMessages();
232
233 EXPECT_TRUE(connector0.encountered_error());
234 }
235
236 TEST_F(ConnectorTest, MessageWithHandles) {
237 internal::Connector connector0(handle0_.Pass());
238 internal::Connector connector1(handle1_.Pass());
239
240 const char kText[] = "hello world";
241
242 Message message1;
243 AllocMessage(kText, &message1);
244
245 MessagePipe pipe;
246 message1.mutable_handles()->push_back(pipe.handle0.release());
247
248 connector0.Accept(&message1);
249
250 // The message should have been transferred, releasing the handles.
251 EXPECT_TRUE(message1.handles()->empty());
252
253 MessageAccumulator accumulator;
254 connector1.set_incoming_receiver(&accumulator);
255
256 PumpMessages();
257
258 ASSERT_FALSE(accumulator.IsEmpty());
259
260 Message message_received;
261 accumulator.Pop(&message_received);
262
263 EXPECT_EQ(
264 std::string(kText),
265 std::string(reinterpret_cast<const char*>(message_received.payload())));
266 ASSERT_EQ(1U, message_received.handles()->size());
267
268 // Now send a message to the transferred handle and confirm it's sent through
269 // to the orginal pipe.
270 // TODO(vtl): Do we need a better way of "downcasting" the handle types?
271 ScopedMessagePipeHandle smph;
272 smph.reset(MessagePipeHandle(message_received.handles()->front().value()));
273 message_received.mutable_handles()->front() = Handle();
274 // |smph| now owns this handle.
275
276 internal::Connector connector_received(smph.Pass());
277 internal::Connector connector_original(pipe.handle1.Pass());
278
279 Message message2;
280 AllocMessage(kText, &message2);
281
282 connector_received.Accept(&message2);
283 connector_original.set_incoming_receiver(&accumulator);
284 PumpMessages();
285
286 ASSERT_FALSE(accumulator.IsEmpty());
287
288 accumulator.Pop(&message_received);
289
290 EXPECT_EQ(
291 std::string(kText),
292 std::string(reinterpret_cast<const char*>(message_received.payload())));
293 }
294
295 TEST_F(ConnectorTest, WaitForIncomingMessageWithError) {
296 internal::Connector connector0(handle0_.Pass());
297 // Close the other end of the pipe.
298 handle1_.reset();
299 ASSERT_FALSE(connector0.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE));
300 }
301
302 class ConnectorDeletingMessageAccumulator : public MessageAccumulator {
303 public:
304 explicit ConnectorDeletingMessageAccumulator(internal::Connector** connector)
305 : connector_(connector) {}
306
307 bool Accept(Message* message) override {
308 delete *connector_;
309 *connector_ = 0;
310 return MessageAccumulator::Accept(message);
311 }
312
313 private:
314 internal::Connector** connector_;
315
316 MOJO_DISALLOW_COPY_AND_ASSIGN(ConnectorDeletingMessageAccumulator);
317 };
318
319 TEST_F(ConnectorTest, WaitForIncomingMessageWithDeletion) {
320 internal::Connector connector0(handle0_.Pass());
321 internal::Connector* connector1 = new internal::Connector(handle1_.Pass());
322
323 const char kText[] = "hello world";
324
325 Message message;
326 AllocMessage(kText, &message);
327
328 connector0.Accept(&message);
329
330 ConnectorDeletingMessageAccumulator accumulator(&connector1);
331 connector1->set_incoming_receiver(&accumulator);
332
333 connector1->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
334
335 ASSERT_FALSE(connector1);
336 ASSERT_FALSE(accumulator.IsEmpty());
337
338 Message message_received;
339 accumulator.Pop(&message_received);
340
341 EXPECT_EQ(
342 std::string(kText),
343 std::string(reinterpret_cast<const char*>(message_received.payload())));
344 }
345
346 class ReentrantMessageAccumulator : public MessageAccumulator {
347 public:
348 explicit ReentrantMessageAccumulator(internal::Connector* connector)
349 : connector_(connector), number_of_calls_(0) {}
350
351 bool Accept(Message* message) override {
352 if (!MessageAccumulator::Accept(message))
353 return false;
354 number_of_calls_++;
355 if (number_of_calls_ == 1) {
356 return connector_->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
357 }
358 return true;
359 }
360
361 int number_of_calls() { return number_of_calls_; }
362
363 private:
364 internal::Connector* connector_;
365 int number_of_calls_;
366
367 MOJO_DISALLOW_COPY_AND_ASSIGN(ReentrantMessageAccumulator);
368 };
369
370 TEST_F(ConnectorTest, WaitForIncomingMessageWithReentrancy) {
371 internal::Connector connector0(handle0_.Pass());
372 internal::Connector connector1(handle1_.Pass());
373
374 const char* kText[] = {"hello", "world"};
375
376 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) {
377 Message message;
378 AllocMessage(kText[i], &message);
379
380 connector0.Accept(&message);
381 }
382
383 ReentrantMessageAccumulator accumulator(&connector1);
384 connector1.set_incoming_receiver(&accumulator);
385
386 PumpMessages();
387
388 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) {
389 ASSERT_FALSE(accumulator.IsEmpty());
390
391 Message message_received;
392 accumulator.Pop(&message_received);
393
394 EXPECT_EQ(
395 std::string(kText[i]),
396 std::string(reinterpret_cast<const char*>(message_received.payload())));
397 }
398
399 ASSERT_EQ(2, accumulator.number_of_calls());
400 }
401
402 // This message receiver just accepts messages, and responds (to another fixed
403 // receiver)
404 class NoTaskStarvationReplier : public MessageReceiver {
405 public:
406 explicit NoTaskStarvationReplier(MessageReceiver* reply_to)
407 : reply_to_(reply_to) {
408 MOJO_CHECK(reply_to_ != this);
409 }
410
411 bool Accept(Message* message) override {
412 num_accepted_++;
413
414 uint32_t name = message->name();
415
416 if (name >= 10u) {
417 RunLoop::current()->PostDelayedTask([]() { RunLoop::current()->Quit(); },
418 0);
419 }
420
421 // We don't necessarily expect the quit task to be processed immediately,
422 // but if some large number (say, ten thousand-ish) messages have been
423 // processed, we can say that starvation has occurred.
424 static const uint32_t kStarvationThreshold = 10000;
425 EXPECT_LE(name, kStarvationThreshold);
426 // We'd prefer our test not hang, so don't send the reply in the failing
427 // case.
428 if (name > kStarvationThreshold)
429 return true;
430
431 MessageBuilder builder(name + 1u, 0u);
432 MOJO_CHECK(reply_to_->Accept(builder.message()));
433
434 return true;
435 }
436
437 unsigned num_accepted() const { return num_accepted_; }
438
439 private:
440 MessageReceiver* const reply_to_;
441 unsigned num_accepted_ = 0;
442
443 MOJO_DISALLOW_COPY_AND_ASSIGN(NoTaskStarvationReplier);
444 };
445
446 // TODO(vtl): This test currently fails. See the discussion on issue #604
447 // (https://github.com/domokit/mojo/issues/604).
448 TEST_F(ConnectorTest, DISABLED_NoTaskStarvation) {
449 internal::Connector connector0(handle0_.Pass());
450 internal::Connector connector1(handle1_.Pass());
451
452 // The replier will bounce messages to |connector0|, and will receiver
453 // messages from |connector1|.
454 NoTaskStarvationReplier replier(&connector0);
455 connector1.set_incoming_receiver(&replier);
456
457 // Kick things off by sending a messagge on |connector0| (starting with a
458 // "name" of 1).
459 MessageBuilder builder(1u, 0u);
460 ASSERT_TRUE(connector0.Accept(builder.message()));
461
462 PumpMessages();
463
464 EXPECT_GE(replier.num_accepted(), 10u);
465 }
466
467 } // namespace
468 } // namespace test
469 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/tests/callback_unittest.cc ('k') | mojo/public/cpp/bindings/tests/constant_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698