OLD | NEW |
| (Empty) |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include <stdlib.h> | |
6 #include <string.h> | |
7 | |
8 #include <string> | |
9 | |
10 #include "gtest/gtest.h" | |
11 #include "mojo/public/cpp/bindings/lib/connector.h" | |
12 #include "mojo/public/cpp/bindings/lib/message_builder.h" | |
13 #include "mojo/public/cpp/bindings/tests/message_queue.h" | |
14 #include "mojo/public/cpp/environment/logging.h" | |
15 #include "mojo/public/cpp/system/macros.h" | |
16 #include "mojo/public/cpp/utility/run_loop.h" | |
17 | |
18 namespace mojo { | |
19 namespace test { | |
20 namespace { | |
21 | |
22 class ConnectorTest : public testing::Test { | |
23 public: | |
24 ConnectorTest() {} | |
25 | |
26 void SetUp() override { | |
27 CreateMessagePipe(nullptr, &handle0_, &handle1_); | |
28 } | |
29 | |
30 void TearDown() override {} | |
31 | |
32 void AllocMessage(const char* text, Message* message) { | |
33 size_t payload_size = strlen(text) + 1; // Plus null terminator. | |
34 MessageBuilder builder(1, payload_size); | |
35 memcpy(builder.buffer()->Allocate(payload_size), text, payload_size); | |
36 | |
37 builder.message()->MoveTo(message); | |
38 } | |
39 | |
40 void PumpMessages() { loop_.RunUntilIdle(); } | |
41 | |
42 protected: | |
43 ScopedMessagePipeHandle handle0_; | |
44 ScopedMessagePipeHandle handle1_; | |
45 | |
46 private: | |
47 RunLoop loop_; | |
48 | |
49 MOJO_DISALLOW_COPY_AND_ASSIGN(ConnectorTest); | |
50 }; | |
51 | |
52 class MessageAccumulator : public MessageReceiver { | |
53 public: | |
54 MessageAccumulator() {} | |
55 | |
56 bool Accept(Message* message) override { | |
57 queue_.Push(message); | |
58 return true; | |
59 } | |
60 | |
61 bool IsEmpty() const { return queue_.IsEmpty(); } | |
62 | |
63 void Pop(Message* message) { queue_.Pop(message); } | |
64 | |
65 private: | |
66 MessageQueue queue_; | |
67 | |
68 MOJO_DISALLOW_COPY_AND_ASSIGN(MessageAccumulator); | |
69 }; | |
70 | |
71 TEST_F(ConnectorTest, Basic) { | |
72 internal::Connector connector0(handle0_.Pass()); | |
73 internal::Connector connector1(handle1_.Pass()); | |
74 | |
75 const char kText[] = "hello world"; | |
76 | |
77 Message message; | |
78 AllocMessage(kText, &message); | |
79 | |
80 connector0.Accept(&message); | |
81 | |
82 MessageAccumulator accumulator; | |
83 connector1.set_incoming_receiver(&accumulator); | |
84 | |
85 PumpMessages(); | |
86 | |
87 ASSERT_FALSE(accumulator.IsEmpty()); | |
88 | |
89 Message message_received; | |
90 accumulator.Pop(&message_received); | |
91 | |
92 EXPECT_EQ( | |
93 std::string(kText), | |
94 std::string(reinterpret_cast<const char*>(message_received.payload()))); | |
95 } | |
96 | |
97 TEST_F(ConnectorTest, Basic_Synchronous) { | |
98 internal::Connector connector0(handle0_.Pass()); | |
99 internal::Connector connector1(handle1_.Pass()); | |
100 | |
101 const char kText[] = "hello world"; | |
102 | |
103 Message message; | |
104 AllocMessage(kText, &message); | |
105 | |
106 connector0.Accept(&message); | |
107 | |
108 MessageAccumulator accumulator; | |
109 connector1.set_incoming_receiver(&accumulator); | |
110 | |
111 connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); | |
112 | |
113 ASSERT_FALSE(accumulator.IsEmpty()); | |
114 | |
115 Message message_received; | |
116 accumulator.Pop(&message_received); | |
117 | |
118 EXPECT_EQ( | |
119 std::string(kText), | |
120 std::string(reinterpret_cast<const char*>(message_received.payload()))); | |
121 } | |
122 | |
123 TEST_F(ConnectorTest, Basic_EarlyIncomingReceiver) { | |
124 internal::Connector connector0(handle0_.Pass()); | |
125 internal::Connector connector1(handle1_.Pass()); | |
126 | |
127 MessageAccumulator accumulator; | |
128 connector1.set_incoming_receiver(&accumulator); | |
129 | |
130 const char kText[] = "hello world"; | |
131 | |
132 Message message; | |
133 AllocMessage(kText, &message); | |
134 | |
135 connector0.Accept(&message); | |
136 | |
137 PumpMessages(); | |
138 | |
139 ASSERT_FALSE(accumulator.IsEmpty()); | |
140 | |
141 Message message_received; | |
142 accumulator.Pop(&message_received); | |
143 | |
144 EXPECT_EQ( | |
145 std::string(kText), | |
146 std::string(reinterpret_cast<const char*>(message_received.payload()))); | |
147 } | |
148 | |
149 TEST_F(ConnectorTest, Basic_TwoMessages) { | |
150 internal::Connector connector0(handle0_.Pass()); | |
151 internal::Connector connector1(handle1_.Pass()); | |
152 | |
153 const char* kText[] = {"hello", "world"}; | |
154 | |
155 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) { | |
156 Message message; | |
157 AllocMessage(kText[i], &message); | |
158 | |
159 connector0.Accept(&message); | |
160 } | |
161 | |
162 MessageAccumulator accumulator; | |
163 connector1.set_incoming_receiver(&accumulator); | |
164 | |
165 PumpMessages(); | |
166 | |
167 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) { | |
168 ASSERT_FALSE(accumulator.IsEmpty()); | |
169 | |
170 Message message_received; | |
171 accumulator.Pop(&message_received); | |
172 | |
173 EXPECT_EQ( | |
174 std::string(kText[i]), | |
175 std::string(reinterpret_cast<const char*>(message_received.payload()))); | |
176 } | |
177 } | |
178 | |
179 TEST_F(ConnectorTest, Basic_TwoMessages_Synchronous) { | |
180 internal::Connector connector0(handle0_.Pass()); | |
181 internal::Connector connector1(handle1_.Pass()); | |
182 | |
183 const char* kText[] = {"hello", "world"}; | |
184 | |
185 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) { | |
186 Message message; | |
187 AllocMessage(kText[i], &message); | |
188 | |
189 connector0.Accept(&message); | |
190 } | |
191 | |
192 MessageAccumulator accumulator; | |
193 connector1.set_incoming_receiver(&accumulator); | |
194 | |
195 connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); | |
196 | |
197 ASSERT_FALSE(accumulator.IsEmpty()); | |
198 | |
199 Message message_received; | |
200 accumulator.Pop(&message_received); | |
201 | |
202 EXPECT_EQ( | |
203 std::string(kText[0]), | |
204 std::string(reinterpret_cast<const char*>(message_received.payload()))); | |
205 | |
206 ASSERT_TRUE(accumulator.IsEmpty()); | |
207 } | |
208 | |
209 TEST_F(ConnectorTest, WriteToClosedPipe) { | |
210 internal::Connector connector0(handle0_.Pass()); | |
211 | |
212 const char kText[] = "hello world"; | |
213 | |
214 Message message; | |
215 AllocMessage(kText, &message); | |
216 | |
217 // Close the other end of the pipe. | |
218 handle1_.reset(); | |
219 | |
220 // Not observed yet because we haven't spun the RunLoop yet. | |
221 EXPECT_FALSE(connector0.encountered_error()); | |
222 | |
223 // Write failures are not reported. | |
224 bool ok = connector0.Accept(&message); | |
225 EXPECT_TRUE(ok); | |
226 | |
227 // Still not observed. | |
228 EXPECT_FALSE(connector0.encountered_error()); | |
229 | |
230 // Spin the RunLoop, and then we should start observing the closed pipe. | |
231 PumpMessages(); | |
232 | |
233 EXPECT_TRUE(connector0.encountered_error()); | |
234 } | |
235 | |
236 TEST_F(ConnectorTest, MessageWithHandles) { | |
237 internal::Connector connector0(handle0_.Pass()); | |
238 internal::Connector connector1(handle1_.Pass()); | |
239 | |
240 const char kText[] = "hello world"; | |
241 | |
242 Message message1; | |
243 AllocMessage(kText, &message1); | |
244 | |
245 MessagePipe pipe; | |
246 message1.mutable_handles()->push_back(pipe.handle0.release()); | |
247 | |
248 connector0.Accept(&message1); | |
249 | |
250 // The message should have been transferred, releasing the handles. | |
251 EXPECT_TRUE(message1.handles()->empty()); | |
252 | |
253 MessageAccumulator accumulator; | |
254 connector1.set_incoming_receiver(&accumulator); | |
255 | |
256 PumpMessages(); | |
257 | |
258 ASSERT_FALSE(accumulator.IsEmpty()); | |
259 | |
260 Message message_received; | |
261 accumulator.Pop(&message_received); | |
262 | |
263 EXPECT_EQ( | |
264 std::string(kText), | |
265 std::string(reinterpret_cast<const char*>(message_received.payload()))); | |
266 ASSERT_EQ(1U, message_received.handles()->size()); | |
267 | |
268 // Now send a message to the transferred handle and confirm it's sent through | |
269 // to the orginal pipe. | |
270 // TODO(vtl): Do we need a better way of "downcasting" the handle types? | |
271 ScopedMessagePipeHandle smph; | |
272 smph.reset(MessagePipeHandle(message_received.handles()->front().value())); | |
273 message_received.mutable_handles()->front() = Handle(); | |
274 // |smph| now owns this handle. | |
275 | |
276 internal::Connector connector_received(smph.Pass()); | |
277 internal::Connector connector_original(pipe.handle1.Pass()); | |
278 | |
279 Message message2; | |
280 AllocMessage(kText, &message2); | |
281 | |
282 connector_received.Accept(&message2); | |
283 connector_original.set_incoming_receiver(&accumulator); | |
284 PumpMessages(); | |
285 | |
286 ASSERT_FALSE(accumulator.IsEmpty()); | |
287 | |
288 accumulator.Pop(&message_received); | |
289 | |
290 EXPECT_EQ( | |
291 std::string(kText), | |
292 std::string(reinterpret_cast<const char*>(message_received.payload()))); | |
293 } | |
294 | |
295 TEST_F(ConnectorTest, WaitForIncomingMessageWithError) { | |
296 internal::Connector connector0(handle0_.Pass()); | |
297 // Close the other end of the pipe. | |
298 handle1_.reset(); | |
299 ASSERT_FALSE(connector0.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE)); | |
300 } | |
301 | |
302 class ConnectorDeletingMessageAccumulator : public MessageAccumulator { | |
303 public: | |
304 explicit ConnectorDeletingMessageAccumulator(internal::Connector** connector) | |
305 : connector_(connector) {} | |
306 | |
307 bool Accept(Message* message) override { | |
308 delete *connector_; | |
309 *connector_ = 0; | |
310 return MessageAccumulator::Accept(message); | |
311 } | |
312 | |
313 private: | |
314 internal::Connector** connector_; | |
315 | |
316 MOJO_DISALLOW_COPY_AND_ASSIGN(ConnectorDeletingMessageAccumulator); | |
317 }; | |
318 | |
319 TEST_F(ConnectorTest, WaitForIncomingMessageWithDeletion) { | |
320 internal::Connector connector0(handle0_.Pass()); | |
321 internal::Connector* connector1 = new internal::Connector(handle1_.Pass()); | |
322 | |
323 const char kText[] = "hello world"; | |
324 | |
325 Message message; | |
326 AllocMessage(kText, &message); | |
327 | |
328 connector0.Accept(&message); | |
329 | |
330 ConnectorDeletingMessageAccumulator accumulator(&connector1); | |
331 connector1->set_incoming_receiver(&accumulator); | |
332 | |
333 connector1->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); | |
334 | |
335 ASSERT_FALSE(connector1); | |
336 ASSERT_FALSE(accumulator.IsEmpty()); | |
337 | |
338 Message message_received; | |
339 accumulator.Pop(&message_received); | |
340 | |
341 EXPECT_EQ( | |
342 std::string(kText), | |
343 std::string(reinterpret_cast<const char*>(message_received.payload()))); | |
344 } | |
345 | |
346 class ReentrantMessageAccumulator : public MessageAccumulator { | |
347 public: | |
348 explicit ReentrantMessageAccumulator(internal::Connector* connector) | |
349 : connector_(connector), number_of_calls_(0) {} | |
350 | |
351 bool Accept(Message* message) override { | |
352 if (!MessageAccumulator::Accept(message)) | |
353 return false; | |
354 number_of_calls_++; | |
355 if (number_of_calls_ == 1) { | |
356 return connector_->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); | |
357 } | |
358 return true; | |
359 } | |
360 | |
361 int number_of_calls() { return number_of_calls_; } | |
362 | |
363 private: | |
364 internal::Connector* connector_; | |
365 int number_of_calls_; | |
366 | |
367 MOJO_DISALLOW_COPY_AND_ASSIGN(ReentrantMessageAccumulator); | |
368 }; | |
369 | |
370 TEST_F(ConnectorTest, WaitForIncomingMessageWithReentrancy) { | |
371 internal::Connector connector0(handle0_.Pass()); | |
372 internal::Connector connector1(handle1_.Pass()); | |
373 | |
374 const char* kText[] = {"hello", "world"}; | |
375 | |
376 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) { | |
377 Message message; | |
378 AllocMessage(kText[i], &message); | |
379 | |
380 connector0.Accept(&message); | |
381 } | |
382 | |
383 ReentrantMessageAccumulator accumulator(&connector1); | |
384 connector1.set_incoming_receiver(&accumulator); | |
385 | |
386 PumpMessages(); | |
387 | |
388 for (size_t i = 0; i < MOJO_ARRAYSIZE(kText); ++i) { | |
389 ASSERT_FALSE(accumulator.IsEmpty()); | |
390 | |
391 Message message_received; | |
392 accumulator.Pop(&message_received); | |
393 | |
394 EXPECT_EQ( | |
395 std::string(kText[i]), | |
396 std::string(reinterpret_cast<const char*>(message_received.payload()))); | |
397 } | |
398 | |
399 ASSERT_EQ(2, accumulator.number_of_calls()); | |
400 } | |
401 | |
402 // This message receiver just accepts messages, and responds (to another fixed | |
403 // receiver) | |
404 class NoTaskStarvationReplier : public MessageReceiver { | |
405 public: | |
406 explicit NoTaskStarvationReplier(MessageReceiver* reply_to) | |
407 : reply_to_(reply_to) { | |
408 MOJO_CHECK(reply_to_ != this); | |
409 } | |
410 | |
411 bool Accept(Message* message) override { | |
412 num_accepted_++; | |
413 | |
414 uint32_t name = message->name(); | |
415 | |
416 if (name >= 10u) { | |
417 RunLoop::current()->PostDelayedTask([]() { RunLoop::current()->Quit(); }, | |
418 0); | |
419 } | |
420 | |
421 // We don't necessarily expect the quit task to be processed immediately, | |
422 // but if some large number (say, ten thousand-ish) messages have been | |
423 // processed, we can say that starvation has occurred. | |
424 static const uint32_t kStarvationThreshold = 10000; | |
425 EXPECT_LE(name, kStarvationThreshold); | |
426 // We'd prefer our test not hang, so don't send the reply in the failing | |
427 // case. | |
428 if (name > kStarvationThreshold) | |
429 return true; | |
430 | |
431 MessageBuilder builder(name + 1u, 0u); | |
432 MOJO_CHECK(reply_to_->Accept(builder.message())); | |
433 | |
434 return true; | |
435 } | |
436 | |
437 unsigned num_accepted() const { return num_accepted_; } | |
438 | |
439 private: | |
440 MessageReceiver* const reply_to_; | |
441 unsigned num_accepted_ = 0; | |
442 | |
443 MOJO_DISALLOW_COPY_AND_ASSIGN(NoTaskStarvationReplier); | |
444 }; | |
445 | |
446 // TODO(vtl): This test currently fails. See the discussion on issue #604 | |
447 // (https://github.com/domokit/mojo/issues/604). | |
448 TEST_F(ConnectorTest, DISABLED_NoTaskStarvation) { | |
449 internal::Connector connector0(handle0_.Pass()); | |
450 internal::Connector connector1(handle1_.Pass()); | |
451 | |
452 // The replier will bounce messages to |connector0|, and will receiver | |
453 // messages from |connector1|. | |
454 NoTaskStarvationReplier replier(&connector0); | |
455 connector1.set_incoming_receiver(&replier); | |
456 | |
457 // Kick things off by sending a messagge on |connector0| (starting with a | |
458 // "name" of 1). | |
459 MessageBuilder builder(1u, 0u); | |
460 ASSERT_TRUE(connector0.Accept(builder.message())); | |
461 | |
462 PumpMessages(); | |
463 | |
464 EXPECT_GE(replier.num_accepted(), 10u); | |
465 } | |
466 | |
467 } // namespace | |
468 } // namespace test | |
469 } // namespace mojo | |
OLD | NEW |