OLD | NEW |
| (Empty) |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 // NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a | |
6 // heavily-loaded system). Sorry. |test::EpsilonTimeout()| may be increased to | |
7 // increase tolerance and reduce observed flakiness (though doing so reduces the | |
8 // meaningfulness of the test). | |
9 | |
10 #include "mojo/system/message_pipe_dispatcher.h" | |
11 | |
12 #include <string.h> | |
13 | |
14 #include <limits> | |
15 | |
16 #include "base/memory/ref_counted.h" | |
17 #include "base/memory/scoped_vector.h" | |
18 #include "base/rand_util.h" | |
19 #include "base/threading/platform_thread.h" // For |Sleep()|. | |
20 #include "base/threading/simple_thread.h" | |
21 #include "base/time/time.h" | |
22 #include "mojo/system/message_pipe.h" | |
23 #include "mojo/system/test_utils.h" | |
24 #include "mojo/system/waiter.h" | |
25 #include "mojo/system/waiter_test_utils.h" | |
26 #include "testing/gtest/include/gtest/gtest.h" | |
27 | |
28 namespace mojo { | |
29 namespace system { | |
30 namespace { | |
31 | |
32 TEST(MessagePipeDispatcherTest, Basic) { | |
33 test::Stopwatch stopwatch; | |
34 int32_t buffer[1]; | |
35 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); | |
36 uint32_t buffer_size; | |
37 | |
38 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. | |
39 for (unsigned i = 0; i < 2; i++) { | |
40 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( | |
41 MessagePipeDispatcher::kDefaultCreateOptions)); | |
42 EXPECT_EQ(Dispatcher::kTypeMessagePipe, d0->GetType()); | |
43 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( | |
44 MessagePipeDispatcher::kDefaultCreateOptions)); | |
45 { | |
46 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); | |
47 d0->Init(mp, i); // 0, 1. | |
48 d1->Init(mp, i ^ 1); // 1, 0. | |
49 } | |
50 Waiter w; | |
51 uint32_t context = 0; | |
52 HandleSignalsState hss; | |
53 | |
54 // Try adding a writable waiter when already writable. | |
55 w.Init(); | |
56 hss = HandleSignalsState(); | |
57 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, | |
58 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss)); | |
59 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); | |
60 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, | |
61 hss.satisfiable_signals); | |
62 // Shouldn't need to remove the waiter (it was not added). | |
63 | |
64 // Add a readable waiter to |d0|, then make it readable (by writing to | |
65 // |d1|), then wait. | |
66 w.Init(); | |
67 ASSERT_EQ(MOJO_RESULT_OK, | |
68 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, nullptr)); | |
69 buffer[0] = 123456789; | |
70 EXPECT_EQ(MOJO_RESULT_OK, | |
71 d1->WriteMessage(UserPointer<const void>(buffer), | |
72 kBufferSize, | |
73 nullptr, | |
74 MOJO_WRITE_MESSAGE_FLAG_NONE)); | |
75 stopwatch.Start(); | |
76 EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, &context)); | |
77 EXPECT_EQ(1u, context); | |
78 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); | |
79 hss = HandleSignalsState(); | |
80 d0->RemoveWaiter(&w, &hss); | |
81 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, | |
82 hss.satisfied_signals); | |
83 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, | |
84 hss.satisfiable_signals); | |
85 | |
86 // Try adding a readable waiter when already readable (from above). | |
87 w.Init(); | |
88 hss = HandleSignalsState(); | |
89 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, | |
90 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss)); | |
91 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, | |
92 hss.satisfied_signals); | |
93 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, | |
94 hss.satisfiable_signals); | |
95 // Shouldn't need to remove the waiter (it was not added). | |
96 | |
97 // Make |d0| no longer readable (by reading from it). | |
98 buffer[0] = 0; | |
99 buffer_size = kBufferSize; | |
100 EXPECT_EQ(MOJO_RESULT_OK, | |
101 d0->ReadMessage(UserPointer<void>(buffer), | |
102 MakeUserPointer(&buffer_size), | |
103 0, | |
104 nullptr, | |
105 MOJO_READ_MESSAGE_FLAG_NONE)); | |
106 EXPECT_EQ(kBufferSize, buffer_size); | |
107 EXPECT_EQ(123456789, buffer[0]); | |
108 | |
109 // Wait for zero time for readability on |d0| (will time out). | |
110 w.Init(); | |
111 ASSERT_EQ(MOJO_RESULT_OK, | |
112 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, nullptr)); | |
113 stopwatch.Start(); | |
114 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, nullptr)); | |
115 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); | |
116 hss = HandleSignalsState(); | |
117 d0->RemoveWaiter(&w, &hss); | |
118 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); | |
119 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, | |
120 hss.satisfiable_signals); | |
121 | |
122 // Wait for non-zero, finite time for readability on |d0| (will time out). | |
123 w.Init(); | |
124 ASSERT_EQ(MOJO_RESULT_OK, | |
125 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, nullptr)); | |
126 stopwatch.Start(); | |
127 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, | |
128 w.Wait(2 * test::EpsilonTimeout().InMicroseconds(), nullptr)); | |
129 base::TimeDelta elapsed = stopwatch.Elapsed(); | |
130 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); | |
131 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); | |
132 hss = HandleSignalsState(); | |
133 d0->RemoveWaiter(&w, &hss); | |
134 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); | |
135 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, | |
136 hss.satisfiable_signals); | |
137 | |
138 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); | |
139 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); | |
140 } | |
141 } | |
142 | |
143 TEST(MessagePipeDispatcherTest, InvalidParams) { | |
144 char buffer[1]; | |
145 | |
146 scoped_refptr<MessagePipeDispatcher> d0( | |
147 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); | |
148 scoped_refptr<MessagePipeDispatcher> d1( | |
149 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); | |
150 { | |
151 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); | |
152 d0->Init(mp, 0); | |
153 d1->Init(mp, 1); | |
154 } | |
155 | |
156 // |WriteMessage|: | |
157 // Huge buffer size. | |
158 EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED, | |
159 d0->WriteMessage(UserPointer<const void>(buffer), | |
160 std::numeric_limits<uint32_t>::max(), | |
161 nullptr, | |
162 MOJO_WRITE_MESSAGE_FLAG_NONE)); | |
163 | |
164 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); | |
165 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); | |
166 } | |
167 | |
168 // These test invalid arguments that should cause death if we're being paranoid | |
169 // about checking arguments (which we would want to do if, e.g., we were in a | |
170 // true "kernel" situation, but we might not want to do otherwise for | |
171 // performance reasons). Probably blatant errors like passing in null pointers | |
172 // (for required pointer arguments) will still cause death, but perhaps not | |
173 // predictably. | |
174 TEST(MessagePipeDispatcherTest, InvalidParamsDeath) { | |
175 const char kMemoryCheckFailedRegex[] = "Check failed"; | |
176 | |
177 scoped_refptr<MessagePipeDispatcher> d0( | |
178 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); | |
179 scoped_refptr<MessagePipeDispatcher> d1( | |
180 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); | |
181 { | |
182 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); | |
183 d0->Init(mp, 0); | |
184 d1->Init(mp, 1); | |
185 } | |
186 | |
187 // |WriteMessage|: | |
188 // Null buffer with nonzero buffer size. | |
189 EXPECT_DEATH_IF_SUPPORTED( | |
190 d0->WriteMessage( | |
191 NullUserPointer(), 1, nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE), | |
192 kMemoryCheckFailedRegex); | |
193 | |
194 // |ReadMessage|: | |
195 // Null buffer with nonzero buffer size. | |
196 // First write something so that we actually have something to read. | |
197 EXPECT_EQ(MOJO_RESULT_OK, | |
198 d1->WriteMessage(UserPointer<const void>("x"), | |
199 1, | |
200 nullptr, | |
201 MOJO_WRITE_MESSAGE_FLAG_NONE)); | |
202 uint32_t buffer_size = 1; | |
203 EXPECT_DEATH_IF_SUPPORTED(d0->ReadMessage(NullUserPointer(), | |
204 MakeUserPointer(&buffer_size), | |
205 0, | |
206 nullptr, | |
207 MOJO_READ_MESSAGE_FLAG_NONE), | |
208 kMemoryCheckFailedRegex); | |
209 | |
210 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); | |
211 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); | |
212 } | |
213 | |
214 // Test what happens when one end is closed (single-threaded test). | |
215 TEST(MessagePipeDispatcherTest, BasicClosed) { | |
216 int32_t buffer[1]; | |
217 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); | |
218 uint32_t buffer_size; | |
219 | |
220 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. | |
221 for (unsigned i = 0; i < 2; i++) { | |
222 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( | |
223 MessagePipeDispatcher::kDefaultCreateOptions)); | |
224 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( | |
225 MessagePipeDispatcher::kDefaultCreateOptions)); | |
226 { | |
227 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); | |
228 d0->Init(mp, i); // 0, 1. | |
229 d1->Init(mp, i ^ 1); // 1, 0. | |
230 } | |
231 Waiter w; | |
232 HandleSignalsState hss; | |
233 | |
234 // Write (twice) to |d1|. | |
235 buffer[0] = 123456789; | |
236 EXPECT_EQ(MOJO_RESULT_OK, | |
237 d1->WriteMessage(UserPointer<const void>(buffer), | |
238 kBufferSize, | |
239 nullptr, | |
240 MOJO_WRITE_MESSAGE_FLAG_NONE)); | |
241 buffer[0] = 234567890; | |
242 EXPECT_EQ(MOJO_RESULT_OK, | |
243 d1->WriteMessage(UserPointer<const void>(buffer), | |
244 kBufferSize, | |
245 nullptr, | |
246 MOJO_WRITE_MESSAGE_FLAG_NONE)); | |
247 | |
248 // Try waiting for readable on |d0|; should fail (already satisfied). | |
249 w.Init(); | |
250 hss = HandleSignalsState(); | |
251 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, | |
252 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss)); | |
253 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, | |
254 hss.satisfied_signals); | |
255 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, | |
256 hss.satisfiable_signals); | |
257 | |
258 // Try reading from |d1|; should fail (nothing to read). | |
259 buffer[0] = 0; | |
260 buffer_size = kBufferSize; | |
261 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, | |
262 d1->ReadMessage(UserPointer<void>(buffer), | |
263 MakeUserPointer(&buffer_size), | |
264 0, | |
265 nullptr, | |
266 MOJO_READ_MESSAGE_FLAG_NONE)); | |
267 | |
268 // Close |d1|. | |
269 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); | |
270 | |
271 // Try waiting for readable on |d0|; should fail (already satisfied). | |
272 w.Init(); | |
273 hss = HandleSignalsState(); | |
274 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, | |
275 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, &hss)); | |
276 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); | |
277 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals); | |
278 | |
279 // Read from |d0|. | |
280 buffer[0] = 0; | |
281 buffer_size = kBufferSize; | |
282 EXPECT_EQ(MOJO_RESULT_OK, | |
283 d0->ReadMessage(UserPointer<void>(buffer), | |
284 MakeUserPointer(&buffer_size), | |
285 0, | |
286 nullptr, | |
287 MOJO_READ_MESSAGE_FLAG_NONE)); | |
288 EXPECT_EQ(kBufferSize, buffer_size); | |
289 EXPECT_EQ(123456789, buffer[0]); | |
290 | |
291 // Try waiting for readable on |d0|; should fail (already satisfied). | |
292 w.Init(); | |
293 hss = HandleSignalsState(); | |
294 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, | |
295 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss)); | |
296 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); | |
297 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals); | |
298 | |
299 // Read again from |d0|. | |
300 buffer[0] = 0; | |
301 buffer_size = kBufferSize; | |
302 EXPECT_EQ(MOJO_RESULT_OK, | |
303 d0->ReadMessage(UserPointer<void>(buffer), | |
304 MakeUserPointer(&buffer_size), | |
305 0, | |
306 nullptr, | |
307 MOJO_READ_MESSAGE_FLAG_NONE)); | |
308 EXPECT_EQ(kBufferSize, buffer_size); | |
309 EXPECT_EQ(234567890, buffer[0]); | |
310 | |
311 // Try waiting for readable on |d0|; should fail (unsatisfiable). | |
312 w.Init(); | |
313 hss = HandleSignalsState(); | |
314 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, | |
315 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, &hss)); | |
316 EXPECT_EQ(0u, hss.satisfied_signals); | |
317 EXPECT_EQ(0u, hss.satisfiable_signals); | |
318 | |
319 // Try waiting for writable on |d0|; should fail (unsatisfiable). | |
320 w.Init(); | |
321 hss = HandleSignalsState(); | |
322 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, | |
323 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 4, &hss)); | |
324 EXPECT_EQ(0u, hss.satisfied_signals); | |
325 EXPECT_EQ(0u, hss.satisfiable_signals); | |
326 | |
327 // Try reading from |d0|; should fail (nothing to read and other end | |
328 // closed). | |
329 buffer[0] = 0; | |
330 buffer_size = kBufferSize; | |
331 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, | |
332 d0->ReadMessage(UserPointer<void>(buffer), | |
333 MakeUserPointer(&buffer_size), | |
334 0, | |
335 nullptr, | |
336 MOJO_READ_MESSAGE_FLAG_NONE)); | |
337 | |
338 // Try writing to |d0|; should fail (other end closed). | |
339 buffer[0] = 345678901; | |
340 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, | |
341 d0->WriteMessage(UserPointer<const void>(buffer), | |
342 kBufferSize, | |
343 nullptr, | |
344 MOJO_WRITE_MESSAGE_FLAG_NONE)); | |
345 | |
346 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); | |
347 } | |
348 } | |
349 | |
350 #if defined(OS_WIN) | |
351 // http://crbug.com/396386 | |
352 #define MAYBE_BasicThreaded DISABLED_BasicThreaded | |
353 #else | |
354 #define MAYBE_BasicThreaded BasicThreaded | |
355 #endif | |
356 TEST(MessagePipeDispatcherTest, MAYBE_BasicThreaded) { | |
357 test::Stopwatch stopwatch; | |
358 int32_t buffer[1]; | |
359 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); | |
360 uint32_t buffer_size; | |
361 base::TimeDelta elapsed; | |
362 bool did_wait; | |
363 MojoResult result; | |
364 uint32_t context; | |
365 HandleSignalsState hss; | |
366 | |
367 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. | |
368 for (unsigned i = 0; i < 2; i++) { | |
369 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( | |
370 MessagePipeDispatcher::kDefaultCreateOptions)); | |
371 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( | |
372 MessagePipeDispatcher::kDefaultCreateOptions)); | |
373 { | |
374 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); | |
375 d0->Init(mp, i); // 0, 1. | |
376 d1->Init(mp, i ^ 1); // 1, 0. | |
377 } | |
378 | |
379 // Wait for readable on |d1|, which will become readable after some time. | |
380 { | |
381 test::WaiterThread thread(d1, | |
382 MOJO_HANDLE_SIGNAL_READABLE, | |
383 MOJO_DEADLINE_INDEFINITE, | |
384 1, | |
385 &did_wait, | |
386 &result, | |
387 &context, | |
388 &hss); | |
389 stopwatch.Start(); | |
390 thread.Start(); | |
391 base::PlatformThread::Sleep(2 * test::EpsilonTimeout()); | |
392 // Wake it up by writing to |d0|. | |
393 buffer[0] = 123456789; | |
394 EXPECT_EQ(MOJO_RESULT_OK, | |
395 d0->WriteMessage(UserPointer<const void>(buffer), | |
396 kBufferSize, | |
397 nullptr, | |
398 MOJO_WRITE_MESSAGE_FLAG_NONE)); | |
399 } // Joins the thread. | |
400 elapsed = stopwatch.Elapsed(); | |
401 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); | |
402 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); | |
403 EXPECT_TRUE(did_wait); | |
404 EXPECT_EQ(MOJO_RESULT_OK, result); | |
405 EXPECT_EQ(1u, context); | |
406 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, | |
407 hss.satisfied_signals); | |
408 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, | |
409 hss.satisfiable_signals); | |
410 | |
411 // Now |d1| is already readable. Try waiting for it again. | |
412 { | |
413 test::WaiterThread thread(d1, | |
414 MOJO_HANDLE_SIGNAL_READABLE, | |
415 MOJO_DEADLINE_INDEFINITE, | |
416 2, | |
417 &did_wait, | |
418 &result, | |
419 &context, | |
420 &hss); | |
421 stopwatch.Start(); | |
422 thread.Start(); | |
423 } // Joins the thread. | |
424 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); | |
425 EXPECT_FALSE(did_wait); | |
426 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result); | |
427 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, | |
428 hss.satisfied_signals); | |
429 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, | |
430 hss.satisfiable_signals); | |
431 | |
432 // Consume what we wrote to |d0|. | |
433 buffer[0] = 0; | |
434 buffer_size = kBufferSize; | |
435 EXPECT_EQ(MOJO_RESULT_OK, | |
436 d1->ReadMessage(UserPointer<void>(buffer), | |
437 MakeUserPointer(&buffer_size), | |
438 0, | |
439 nullptr, | |
440 MOJO_READ_MESSAGE_FLAG_NONE)); | |
441 EXPECT_EQ(kBufferSize, buffer_size); | |
442 EXPECT_EQ(123456789, buffer[0]); | |
443 | |
444 // Wait for readable on |d1| and close |d0| after some time, which should | |
445 // cancel that wait. | |
446 { | |
447 test::WaiterThread thread(d1, | |
448 MOJO_HANDLE_SIGNAL_READABLE, | |
449 MOJO_DEADLINE_INDEFINITE, | |
450 3, | |
451 &did_wait, | |
452 &result, | |
453 &context, | |
454 &hss); | |
455 stopwatch.Start(); | |
456 thread.Start(); | |
457 base::PlatformThread::Sleep(2 * test::EpsilonTimeout()); | |
458 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); | |
459 } // Joins the thread. | |
460 elapsed = stopwatch.Elapsed(); | |
461 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); | |
462 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); | |
463 EXPECT_TRUE(did_wait); | |
464 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); | |
465 EXPECT_EQ(3u, context); | |
466 EXPECT_EQ(0u, hss.satisfied_signals); | |
467 EXPECT_EQ(0u, hss.satisfiable_signals); | |
468 | |
469 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); | |
470 } | |
471 | |
472 for (unsigned i = 0; i < 2; i++) { | |
473 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( | |
474 MessagePipeDispatcher::kDefaultCreateOptions)); | |
475 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( | |
476 MessagePipeDispatcher::kDefaultCreateOptions)); | |
477 { | |
478 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); | |
479 d0->Init(mp, i); // 0, 1. | |
480 d1->Init(mp, i ^ 1); // 1, 0. | |
481 } | |
482 | |
483 // Wait for readable on |d1| and close |d1| after some time, which should | |
484 // cancel that wait. | |
485 { | |
486 test::WaiterThread thread(d1, | |
487 MOJO_HANDLE_SIGNAL_READABLE, | |
488 MOJO_DEADLINE_INDEFINITE, | |
489 4, | |
490 &did_wait, | |
491 &result, | |
492 &context, | |
493 &hss); | |
494 stopwatch.Start(); | |
495 thread.Start(); | |
496 base::PlatformThread::Sleep(2 * test::EpsilonTimeout()); | |
497 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); | |
498 } // Joins the thread. | |
499 elapsed = stopwatch.Elapsed(); | |
500 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); | |
501 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); | |
502 EXPECT_TRUE(did_wait); | |
503 EXPECT_EQ(MOJO_RESULT_CANCELLED, result); | |
504 EXPECT_EQ(4u, context); | |
505 EXPECT_EQ(0u, hss.satisfied_signals); | |
506 EXPECT_EQ(0u, hss.satisfiable_signals); | |
507 | |
508 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); | |
509 } | |
510 } | |
511 | |
512 // Stress test ----------------------------------------------------------------- | |
513 | |
514 const size_t kMaxMessageSize = 2000; | |
515 | |
516 class WriterThread : public base::SimpleThread { | |
517 public: | |
518 // |*messages_written| and |*bytes_written| belong to the thread while it's | |
519 // alive. | |
520 WriterThread(scoped_refptr<Dispatcher> write_dispatcher, | |
521 size_t* messages_written, | |
522 size_t* bytes_written) | |
523 : base::SimpleThread("writer_thread"), | |
524 write_dispatcher_(write_dispatcher), | |
525 messages_written_(messages_written), | |
526 bytes_written_(bytes_written) { | |
527 *messages_written_ = 0; | |
528 *bytes_written_ = 0; | |
529 } | |
530 | |
531 virtual ~WriterThread() { Join(); } | |
532 | |
533 private: | |
534 virtual void Run() override { | |
535 // Make some data to write. | |
536 unsigned char buffer[kMaxMessageSize]; | |
537 for (size_t i = 0; i < kMaxMessageSize; i++) | |
538 buffer[i] = static_cast<unsigned char>(i); | |
539 | |
540 // Number of messages to write. | |
541 *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000)); | |
542 | |
543 // Write messages. | |
544 for (size_t i = 0; i < *messages_written_; i++) { | |
545 uint32_t bytes_to_write = static_cast<uint32_t>( | |
546 base::RandInt(1, static_cast<int>(kMaxMessageSize))); | |
547 EXPECT_EQ(MOJO_RESULT_OK, | |
548 write_dispatcher_->WriteMessage(UserPointer<const void>(buffer), | |
549 bytes_to_write, | |
550 nullptr, | |
551 MOJO_WRITE_MESSAGE_FLAG_NONE)); | |
552 *bytes_written_ += bytes_to_write; | |
553 } | |
554 | |
555 // Write one last "quit" message. | |
556 EXPECT_EQ(MOJO_RESULT_OK, | |
557 write_dispatcher_->WriteMessage(UserPointer<const void>("quit"), | |
558 4, | |
559 nullptr, | |
560 MOJO_WRITE_MESSAGE_FLAG_NONE)); | |
561 } | |
562 | |
563 const scoped_refptr<Dispatcher> write_dispatcher_; | |
564 size_t* const messages_written_; | |
565 size_t* const bytes_written_; | |
566 | |
567 DISALLOW_COPY_AND_ASSIGN(WriterThread); | |
568 }; | |
569 | |
570 class ReaderThread : public base::SimpleThread { | |
571 public: | |
572 // |*messages_read| and |*bytes_read| belong to the thread while it's alive. | |
573 ReaderThread(scoped_refptr<Dispatcher> read_dispatcher, | |
574 size_t* messages_read, | |
575 size_t* bytes_read) | |
576 : base::SimpleThread("reader_thread"), | |
577 read_dispatcher_(read_dispatcher), | |
578 messages_read_(messages_read), | |
579 bytes_read_(bytes_read) { | |
580 *messages_read_ = 0; | |
581 *bytes_read_ = 0; | |
582 } | |
583 | |
584 virtual ~ReaderThread() { Join(); } | |
585 | |
586 private: | |
587 virtual void Run() override { | |
588 unsigned char buffer[kMaxMessageSize]; | |
589 Waiter w; | |
590 HandleSignalsState hss; | |
591 MojoResult result; | |
592 | |
593 // Read messages. | |
594 for (;;) { | |
595 // Wait for it to be readable. | |
596 w.Init(); | |
597 hss = HandleSignalsState(); | |
598 result = | |
599 read_dispatcher_->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss); | |
600 EXPECT_TRUE(result == MOJO_RESULT_OK || | |
601 result == MOJO_RESULT_ALREADY_EXISTS) | |
602 << "result: " << result; | |
603 if (result == MOJO_RESULT_OK) { | |
604 // Actually need to wait. | |
605 EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, nullptr)); | |
606 read_dispatcher_->RemoveWaiter(&w, &hss); | |
607 } | |
608 // We may not actually be readable, since we're racing with other threads. | |
609 EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE)); | |
610 | |
611 // Now, try to do the read. | |
612 // Clear the buffer so that we can check the result. | |
613 memset(buffer, 0, sizeof(buffer)); | |
614 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer)); | |
615 result = read_dispatcher_->ReadMessage(UserPointer<void>(buffer), | |
616 MakeUserPointer(&buffer_size), | |
617 0, | |
618 nullptr, | |
619 MOJO_READ_MESSAGE_FLAG_NONE); | |
620 EXPECT_TRUE(result == MOJO_RESULT_OK || result == MOJO_RESULT_SHOULD_WAIT) | |
621 << "result: " << result; | |
622 // We're racing with others to read, so maybe we failed. | |
623 if (result == MOJO_RESULT_SHOULD_WAIT) | |
624 continue; // In which case, try again. | |
625 // Check for quit. | |
626 if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0) | |
627 return; | |
628 EXPECT_GE(buffer_size, 1u); | |
629 EXPECT_LE(buffer_size, kMaxMessageSize); | |
630 EXPECT_TRUE(IsValidMessage(buffer, buffer_size)); | |
631 | |
632 (*messages_read_)++; | |
633 *bytes_read_ += buffer_size; | |
634 } | |
635 } | |
636 | |
637 static bool IsValidMessage(const unsigned char* buffer, | |
638 uint32_t message_size) { | |
639 size_t i; | |
640 for (i = 0; i < message_size; i++) { | |
641 if (buffer[i] != static_cast<unsigned char>(i)) | |
642 return false; | |
643 } | |
644 // Check that the remaining bytes weren't stomped on. | |
645 for (; i < kMaxMessageSize; i++) { | |
646 if (buffer[i] != 0) | |
647 return false; | |
648 } | |
649 return true; | |
650 } | |
651 | |
652 const scoped_refptr<Dispatcher> read_dispatcher_; | |
653 size_t* const messages_read_; | |
654 size_t* const bytes_read_; | |
655 | |
656 DISALLOW_COPY_AND_ASSIGN(ReaderThread); | |
657 }; | |
658 | |
659 TEST(MessagePipeDispatcherTest, Stress) { | |
660 static const size_t kNumWriters = 30; | |
661 static const size_t kNumReaders = kNumWriters; | |
662 | |
663 scoped_refptr<MessagePipeDispatcher> d_write( | |
664 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); | |
665 scoped_refptr<MessagePipeDispatcher> d_read( | |
666 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); | |
667 { | |
668 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); | |
669 d_write->Init(mp, 0); | |
670 d_read->Init(mp, 1); | |
671 } | |
672 | |
673 size_t messages_written[kNumWriters]; | |
674 size_t bytes_written[kNumWriters]; | |
675 size_t messages_read[kNumReaders]; | |
676 size_t bytes_read[kNumReaders]; | |
677 { | |
678 // Make writers. | |
679 ScopedVector<WriterThread> writers; | |
680 for (size_t i = 0; i < kNumWriters; i++) { | |
681 writers.push_back( | |
682 new WriterThread(d_write, &messages_written[i], &bytes_written[i])); | |
683 } | |
684 | |
685 // Make readers. | |
686 ScopedVector<ReaderThread> readers; | |
687 for (size_t i = 0; i < kNumReaders; i++) { | |
688 readers.push_back( | |
689 new ReaderThread(d_read, &messages_read[i], &bytes_read[i])); | |
690 } | |
691 | |
692 // Start writers. | |
693 for (size_t i = 0; i < kNumWriters; i++) | |
694 writers[i]->Start(); | |
695 | |
696 // Start readers. | |
697 for (size_t i = 0; i < kNumReaders; i++) | |
698 readers[i]->Start(); | |
699 | |
700 // TODO(vtl): Maybe I should have an event that triggers all the threads to | |
701 // start doing stuff for real (so that the first ones created/started aren't | |
702 // advantaged). | |
703 } // Joins all the threads. | |
704 | |
705 size_t total_messages_written = 0; | |
706 size_t total_bytes_written = 0; | |
707 for (size_t i = 0; i < kNumWriters; i++) { | |
708 total_messages_written += messages_written[i]; | |
709 total_bytes_written += bytes_written[i]; | |
710 } | |
711 size_t total_messages_read = 0; | |
712 size_t total_bytes_read = 0; | |
713 for (size_t i = 0; i < kNumReaders; i++) { | |
714 total_messages_read += messages_read[i]; | |
715 total_bytes_read += bytes_read[i]; | |
716 // We'd have to be really unlucky to have read no messages on a thread. | |
717 EXPECT_GT(messages_read[i], 0u) << "reader: " << i; | |
718 EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i; | |
719 } | |
720 EXPECT_EQ(total_messages_written, total_messages_read); | |
721 EXPECT_EQ(total_bytes_written, total_bytes_read); | |
722 | |
723 EXPECT_EQ(MOJO_RESULT_OK, d_write->Close()); | |
724 EXPECT_EQ(MOJO_RESULT_OK, d_read->Close()); | |
725 } | |
726 | |
727 } // namespace | |
728 } // namespace system | |
729 } // namespace mojo | |
OLD | NEW |