OLD | NEW |
| (Empty) |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 // NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a | |
6 // heavily-loaded system). Sorry. |test::EpsilonTimeout()| may be increased to | |
7 // increase tolerance and reduce observed flakiness (though doing so reduces the | |
8 // meaningfulness of the test). | |
9 | |
10 #include "mojo/edk/system/message_pipe_dispatcher.h" | |
11 | |
12 #include <string.h> | |
13 | |
14 #include <limits> | |
15 | |
16 #include "base/memory/ref_counted.h" | |
17 #include "base/memory/scoped_vector.h" | |
18 #include "base/rand_util.h" | |
19 #include "base/threading/platform_thread.h" // For |Sleep()|. | |
20 #include "base/threading/simple_thread.h" | |
21 #include "base/time/time.h" | |
22 #include "mojo/edk/system/message_pipe.h" | |
23 #include "mojo/edk/system/test_utils.h" | |
24 #include "mojo/edk/system/waiter.h" | |
25 #include "mojo/edk/system/waiter_test_utils.h" | |
26 #include "testing/gtest/include/gtest/gtest.h" | |
27 | |
28 namespace mojo { | |
29 namespace system { | |
30 namespace { | |
31 | |
32 const MojoHandleSignals kAllSignals = MOJO_HANDLE_SIGNAL_READABLE | | |
33 MOJO_HANDLE_SIGNAL_WRITABLE | | |
34 MOJO_HANDLE_SIGNAL_PEER_CLOSED; | |
35 | |
36 TEST(MessagePipeDispatcherTest, Basic) { | |
37 test::Stopwatch stopwatch; | |
38 int32_t buffer[1]; | |
39 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); | |
40 uint32_t buffer_size; | |
41 | |
42 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. | |
43 for (unsigned i = 0; i < 2; i++) { | |
44 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( | |
45 MessagePipeDispatcher::kDefaultCreateOptions)); | |
46 EXPECT_EQ(Dispatcher::kTypeMessagePipe, d0->GetType()); | |
47 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( | |
48 MessagePipeDispatcher::kDefaultCreateOptions)); | |
49 { | |
50 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); | |
51 d0->Init(mp, i); // 0, 1. | |
52 d1->Init(mp, i ^ 1); // 1, 0. | |
53 } | |
54 Waiter w; | |
55 uint32_t context = 0; | |
56 HandleSignalsState hss; | |
57 | |
58 // Try adding a writable waiter when already writable. | |
59 w.Init(); | |
60 hss = HandleSignalsState(); | |
61 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, | |
62 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss)); | |
63 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); | |
64 EXPECT_EQ(kAllSignals, hss.satisfiable_signals); | |
65 // Shouldn't need to remove the waiter (it was not added). | |
66 | |
67 // Add a readable waiter to |d0|, then make it readable (by writing to | |
68 // |d1|), then wait. | |
69 w.Init(); | |
70 ASSERT_EQ(MOJO_RESULT_OK, | |
71 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, nullptr)); | |
72 buffer[0] = 123456789; | |
73 EXPECT_EQ(MOJO_RESULT_OK, | |
74 d1->WriteMessage(UserPointer<const void>(buffer), kBufferSize, | |
75 nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE)); | |
76 stopwatch.Start(); | |
77 EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, &context)); | |
78 EXPECT_EQ(1u, context); | |
79 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); | |
80 hss = HandleSignalsState(); | |
81 d0->RemoveAwakable(&w, &hss); | |
82 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, | |
83 hss.satisfied_signals); | |
84 EXPECT_EQ(kAllSignals, hss.satisfiable_signals); | |
85 | |
86 // Try adding a readable waiter when already readable (from above). | |
87 w.Init(); | |
88 hss = HandleSignalsState(); | |
89 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, | |
90 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss)); | |
91 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, | |
92 hss.satisfied_signals); | |
93 EXPECT_EQ(kAllSignals, hss.satisfiable_signals); | |
94 // Shouldn't need to remove the waiter (it was not added). | |
95 | |
96 // Make |d0| no longer readable (by reading from it). | |
97 buffer[0] = 0; | |
98 buffer_size = kBufferSize; | |
99 EXPECT_EQ(MOJO_RESULT_OK, | |
100 d0->ReadMessage(UserPointer<void>(buffer), | |
101 MakeUserPointer(&buffer_size), 0, nullptr, | |
102 MOJO_READ_MESSAGE_FLAG_NONE)); | |
103 EXPECT_EQ(kBufferSize, buffer_size); | |
104 EXPECT_EQ(123456789, buffer[0]); | |
105 | |
106 // Wait for zero time for readability on |d0| (will time out). | |
107 w.Init(); | |
108 ASSERT_EQ(MOJO_RESULT_OK, | |
109 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, nullptr)); | |
110 stopwatch.Start(); | |
111 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, nullptr)); | |
112 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); | |
113 hss = HandleSignalsState(); | |
114 d0->RemoveAwakable(&w, &hss); | |
115 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); | |
116 EXPECT_EQ(kAllSignals, hss.satisfiable_signals); | |
117 | |
118 // Wait for non-zero, finite time for readability on |d0| (will time out). | |
119 w.Init(); | |
120 ASSERT_EQ(MOJO_RESULT_OK, | |
121 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, nullptr)); | |
122 stopwatch.Start(); | |
123 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, | |
124 w.Wait(2 * test::EpsilonTimeout().InMicroseconds(), nullptr)); | |
125 base::TimeDelta elapsed = stopwatch.Elapsed(); | |
126 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); | |
127 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); | |
128 hss = HandleSignalsState(); | |
129 d0->RemoveAwakable(&w, &hss); | |
130 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); | |
131 EXPECT_EQ(kAllSignals, hss.satisfiable_signals); | |
132 | |
133 // Check the peer closed signal. | |
134 w.Init(); | |
135 ASSERT_EQ(MOJO_RESULT_OK, | |
136 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 12, nullptr)); | |
137 | |
138 // Close the peer. | |
139 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); | |
140 | |
141 // It should be signaled. | |
142 EXPECT_EQ(MOJO_RESULT_OK, w.Wait(1000, &context)); | |
143 EXPECT_EQ(12u, context); | |
144 hss = HandleSignalsState(); | |
145 d0->RemoveAwakable(&w, &hss); | |
146 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); | |
147 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); | |
148 | |
149 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); | |
150 } | |
151 } | |
152 | |
153 TEST(MessagePipeDispatcherTest, InvalidParams) { | |
154 char buffer[1]; | |
155 | |
156 scoped_refptr<MessagePipeDispatcher> d0( | |
157 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); | |
158 scoped_refptr<MessagePipeDispatcher> d1( | |
159 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); | |
160 { | |
161 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); | |
162 d0->Init(mp, 0); | |
163 d1->Init(mp, 1); | |
164 } | |
165 | |
166 // |WriteMessage|: | |
167 // Huge buffer size. | |
168 EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED, | |
169 d0->WriteMessage(UserPointer<const void>(buffer), | |
170 std::numeric_limits<uint32_t>::max(), nullptr, | |
171 MOJO_WRITE_MESSAGE_FLAG_NONE)); | |
172 | |
173 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); | |
174 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); | |
175 } | |
176 | |
177 // These test invalid arguments that should cause death if we're being paranoid | |
178 // about checking arguments (which we would want to do if, e.g., we were in a | |
179 // true "kernel" situation, but we might not want to do otherwise for | |
180 // performance reasons). Probably blatant errors like passing in null pointers | |
181 // (for required pointer arguments) will still cause death, but perhaps not | |
182 // predictably. | |
183 TEST(MessagePipeDispatcherTest, InvalidParamsDeath) { | |
184 const char kMemoryCheckFailedRegex[] = "Check failed"; | |
185 | |
186 scoped_refptr<MessagePipeDispatcher> d0( | |
187 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); | |
188 scoped_refptr<MessagePipeDispatcher> d1( | |
189 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); | |
190 { | |
191 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); | |
192 d0->Init(mp, 0); | |
193 d1->Init(mp, 1); | |
194 } | |
195 | |
196 // |WriteMessage|: | |
197 // Null buffer with nonzero buffer size. | |
198 EXPECT_DEATH_IF_SUPPORTED(d0->WriteMessage(NullUserPointer(), 1, nullptr, | |
199 MOJO_WRITE_MESSAGE_FLAG_NONE), | |
200 kMemoryCheckFailedRegex); | |
201 | |
202 // |ReadMessage|: | |
203 // Null buffer with nonzero buffer size. | |
204 // First write something so that we actually have something to read. | |
205 EXPECT_EQ(MOJO_RESULT_OK, | |
206 d1->WriteMessage(UserPointer<const void>("x"), 1, nullptr, | |
207 MOJO_WRITE_MESSAGE_FLAG_NONE)); | |
208 uint32_t buffer_size = 1; | |
209 EXPECT_DEATH_IF_SUPPORTED( | |
210 d0->ReadMessage(NullUserPointer(), MakeUserPointer(&buffer_size), 0, | |
211 nullptr, MOJO_READ_MESSAGE_FLAG_NONE), | |
212 kMemoryCheckFailedRegex); | |
213 | |
214 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); | |
215 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); | |
216 } | |
217 | |
218 // Test what happens when one end is closed (single-threaded test). | |
219 TEST(MessagePipeDispatcherTest, BasicClosed) { | |
220 int32_t buffer[1]; | |
221 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); | |
222 uint32_t buffer_size; | |
223 | |
224 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. | |
225 for (unsigned i = 0; i < 2; i++) { | |
226 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( | |
227 MessagePipeDispatcher::kDefaultCreateOptions)); | |
228 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( | |
229 MessagePipeDispatcher::kDefaultCreateOptions)); | |
230 { | |
231 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); | |
232 d0->Init(mp, i); // 0, 1. | |
233 d1->Init(mp, i ^ 1); // 1, 0. | |
234 } | |
235 Waiter w; | |
236 HandleSignalsState hss; | |
237 | |
238 // Write (twice) to |d1|. | |
239 buffer[0] = 123456789; | |
240 EXPECT_EQ(MOJO_RESULT_OK, | |
241 d1->WriteMessage(UserPointer<const void>(buffer), kBufferSize, | |
242 nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE)); | |
243 buffer[0] = 234567890; | |
244 EXPECT_EQ(MOJO_RESULT_OK, | |
245 d1->WriteMessage(UserPointer<const void>(buffer), kBufferSize, | |
246 nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE)); | |
247 | |
248 // Try waiting for readable on |d0|; should fail (already satisfied). | |
249 w.Init(); | |
250 hss = HandleSignalsState(); | |
251 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, | |
252 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss)); | |
253 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, | |
254 hss.satisfied_signals); | |
255 EXPECT_EQ(kAllSignals, hss.satisfiable_signals); | |
256 | |
257 // Try reading from |d1|; should fail (nothing to read). | |
258 buffer[0] = 0; | |
259 buffer_size = kBufferSize; | |
260 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, | |
261 d1->ReadMessage(UserPointer<void>(buffer), | |
262 MakeUserPointer(&buffer_size), 0, nullptr, | |
263 MOJO_READ_MESSAGE_FLAG_NONE)); | |
264 | |
265 // Close |d1|. | |
266 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); | |
267 | |
268 // Try waiting for readable on |d0|; should fail (already satisfied). | |
269 w.Init(); | |
270 hss = HandleSignalsState(); | |
271 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, | |
272 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, &hss)); | |
273 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, | |
274 hss.satisfied_signals); | |
275 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, | |
276 hss.satisfiable_signals); | |
277 | |
278 // Read from |d0|. | |
279 buffer[0] = 0; | |
280 buffer_size = kBufferSize; | |
281 EXPECT_EQ(MOJO_RESULT_OK, | |
282 d0->ReadMessage(UserPointer<void>(buffer), | |
283 MakeUserPointer(&buffer_size), 0, nullptr, | |
284 MOJO_READ_MESSAGE_FLAG_NONE)); | |
285 EXPECT_EQ(kBufferSize, buffer_size); | |
286 EXPECT_EQ(123456789, buffer[0]); | |
287 | |
288 // Try waiting for readable on |d0|; should fail (already satisfied). | |
289 w.Init(); | |
290 hss = HandleSignalsState(); | |
291 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, | |
292 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss)); | |
293 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, | |
294 hss.satisfied_signals); | |
295 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, | |
296 hss.satisfiable_signals); | |
297 | |
298 // Read again from |d0|. | |
299 buffer[0] = 0; | |
300 buffer_size = kBufferSize; | |
301 EXPECT_EQ(MOJO_RESULT_OK, | |
302 d0->ReadMessage(UserPointer<void>(buffer), | |
303 MakeUserPointer(&buffer_size), 0, nullptr, | |
304 MOJO_READ_MESSAGE_FLAG_NONE)); | |
305 EXPECT_EQ(kBufferSize, buffer_size); | |
306 EXPECT_EQ(234567890, buffer[0]); | |
307 | |
308 // Try waiting for readable on |d0|; should fail (unsatisfiable). | |
309 w.Init(); | |
310 hss = HandleSignalsState(); | |
311 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, | |
312 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, &hss)); | |
313 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); | |
314 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); | |
315 | |
316 // Try waiting for writable on |d0|; should fail (unsatisfiable). | |
317 w.Init(); | |
318 hss = HandleSignalsState(); | |
319 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, | |
320 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 4, &hss)); | |
321 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); | |
322 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); | |
323 | |
324 // Try reading from |d0|; should fail (nothing to read and other end | |
325 // closed). | |
326 buffer[0] = 0; | |
327 buffer_size = kBufferSize; | |
328 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, | |
329 d0->ReadMessage(UserPointer<void>(buffer), | |
330 MakeUserPointer(&buffer_size), 0, nullptr, | |
331 MOJO_READ_MESSAGE_FLAG_NONE)); | |
332 | |
333 // Try writing to |d0|; should fail (other end closed). | |
334 buffer[0] = 345678901; | |
335 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, | |
336 d0->WriteMessage(UserPointer<const void>(buffer), kBufferSize, | |
337 nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE)); | |
338 | |
339 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); | |
340 } | |
341 } | |
342 | |
343 #if defined(OS_WIN) | |
344 // http://crbug.com/396386 | |
345 #define MAYBE_BasicThreaded DISABLED_BasicThreaded | |
346 #else | |
347 #define MAYBE_BasicThreaded BasicThreaded | |
348 #endif | |
349 TEST(MessagePipeDispatcherTest, MAYBE_BasicThreaded) { | |
350 test::Stopwatch stopwatch; | |
351 int32_t buffer[1]; | |
352 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer)); | |
353 uint32_t buffer_size; | |
354 base::TimeDelta elapsed; | |
355 bool did_wait; | |
356 MojoResult result; | |
357 uint32_t context; | |
358 HandleSignalsState hss; | |
359 | |
360 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa. | |
361 for (unsigned i = 0; i < 2; i++) { | |
362 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( | |
363 MessagePipeDispatcher::kDefaultCreateOptions)); | |
364 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( | |
365 MessagePipeDispatcher::kDefaultCreateOptions)); | |
366 { | |
367 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); | |
368 d0->Init(mp, i); // 0, 1. | |
369 d1->Init(mp, i ^ 1); // 1, 0. | |
370 } | |
371 | |
372 // Wait for readable on |d1|, which will become readable after some time. | |
373 { | |
374 test::WaiterThread thread(d1, MOJO_HANDLE_SIGNAL_READABLE, | |
375 MOJO_DEADLINE_INDEFINITE, 1, &did_wait, &result, | |
376 &context, &hss); | |
377 stopwatch.Start(); | |
378 thread.Start(); | |
379 base::PlatformThread::Sleep(2 * test::EpsilonTimeout()); | |
380 // Wake it up by writing to |d0|. | |
381 buffer[0] = 123456789; | |
382 EXPECT_EQ(MOJO_RESULT_OK, | |
383 d0->WriteMessage(UserPointer<const void>(buffer), kBufferSize, | |
384 nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE)); | |
385 } // Joins the thread. | |
386 elapsed = stopwatch.Elapsed(); | |
387 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); | |
388 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); | |
389 EXPECT_TRUE(did_wait); | |
390 EXPECT_EQ(MOJO_RESULT_OK, result); | |
391 EXPECT_EQ(1u, context); | |
392 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, | |
393 hss.satisfied_signals); | |
394 EXPECT_EQ(kAllSignals, hss.satisfiable_signals); | |
395 | |
396 // Now |d1| is already readable. Try waiting for it again. | |
397 { | |
398 test::WaiterThread thread(d1, MOJO_HANDLE_SIGNAL_READABLE, | |
399 MOJO_DEADLINE_INDEFINITE, 2, &did_wait, &result, | |
400 &context, &hss); | |
401 stopwatch.Start(); | |
402 thread.Start(); | |
403 } // Joins the thread. | |
404 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout()); | |
405 EXPECT_FALSE(did_wait); | |
406 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result); | |
407 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, | |
408 hss.satisfied_signals); | |
409 EXPECT_EQ(kAllSignals, hss.satisfiable_signals); | |
410 | |
411 // Consume what we wrote to |d0|. | |
412 buffer[0] = 0; | |
413 buffer_size = kBufferSize; | |
414 EXPECT_EQ(MOJO_RESULT_OK, | |
415 d1->ReadMessage(UserPointer<void>(buffer), | |
416 MakeUserPointer(&buffer_size), 0, nullptr, | |
417 MOJO_READ_MESSAGE_FLAG_NONE)); | |
418 EXPECT_EQ(kBufferSize, buffer_size); | |
419 EXPECT_EQ(123456789, buffer[0]); | |
420 | |
421 // Wait for readable on |d1| and close |d0| after some time, which should | |
422 // cancel that wait. | |
423 { | |
424 test::WaiterThread thread(d1, MOJO_HANDLE_SIGNAL_READABLE, | |
425 MOJO_DEADLINE_INDEFINITE, 3, &did_wait, &result, | |
426 &context, &hss); | |
427 stopwatch.Start(); | |
428 thread.Start(); | |
429 base::PlatformThread::Sleep(2 * test::EpsilonTimeout()); | |
430 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); | |
431 } // Joins the thread. | |
432 elapsed = stopwatch.Elapsed(); | |
433 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); | |
434 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); | |
435 EXPECT_TRUE(did_wait); | |
436 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); | |
437 EXPECT_EQ(3u, context); | |
438 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); | |
439 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); | |
440 | |
441 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); | |
442 } | |
443 | |
444 for (unsigned i = 0; i < 2; i++) { | |
445 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher( | |
446 MessagePipeDispatcher::kDefaultCreateOptions)); | |
447 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher( | |
448 MessagePipeDispatcher::kDefaultCreateOptions)); | |
449 { | |
450 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); | |
451 d0->Init(mp, i); // 0, 1. | |
452 d1->Init(mp, i ^ 1); // 1, 0. | |
453 } | |
454 | |
455 // Wait for readable on |d1| and close |d1| after some time, which should | |
456 // cancel that wait. | |
457 { | |
458 test::WaiterThread thread(d1, MOJO_HANDLE_SIGNAL_READABLE, | |
459 MOJO_DEADLINE_INDEFINITE, 4, &did_wait, &result, | |
460 &context, &hss); | |
461 stopwatch.Start(); | |
462 thread.Start(); | |
463 base::PlatformThread::Sleep(2 * test::EpsilonTimeout()); | |
464 EXPECT_EQ(MOJO_RESULT_OK, d1->Close()); | |
465 } // Joins the thread. | |
466 elapsed = stopwatch.Elapsed(); | |
467 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout()); | |
468 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout()); | |
469 EXPECT_TRUE(did_wait); | |
470 EXPECT_EQ(MOJO_RESULT_CANCELLED, result); | |
471 EXPECT_EQ(4u, context); | |
472 EXPECT_EQ(0u, hss.satisfied_signals); | |
473 EXPECT_EQ(0u, hss.satisfiable_signals); | |
474 | |
475 EXPECT_EQ(MOJO_RESULT_OK, d0->Close()); | |
476 } | |
477 } | |
478 | |
479 // Stress test ----------------------------------------------------------------- | |
480 | |
481 const size_t kMaxMessageSize = 2000; | |
482 | |
483 class WriterThread : public base::SimpleThread { | |
484 public: | |
485 // |*messages_written| and |*bytes_written| belong to the thread while it's | |
486 // alive. | |
487 WriterThread(scoped_refptr<Dispatcher> write_dispatcher, | |
488 size_t* messages_written, | |
489 size_t* bytes_written) | |
490 : base::SimpleThread("writer_thread"), | |
491 write_dispatcher_(write_dispatcher), | |
492 messages_written_(messages_written), | |
493 bytes_written_(bytes_written) { | |
494 *messages_written_ = 0; | |
495 *bytes_written_ = 0; | |
496 } | |
497 | |
498 ~WriterThread() override { Join(); } | |
499 | |
500 private: | |
501 void Run() override { | |
502 // Make some data to write. | |
503 unsigned char buffer[kMaxMessageSize]; | |
504 for (size_t i = 0; i < kMaxMessageSize; i++) | |
505 buffer[i] = static_cast<unsigned char>(i); | |
506 | |
507 // Number of messages to write. | |
508 *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000)); | |
509 | |
510 // Write messages. | |
511 for (size_t i = 0; i < *messages_written_; i++) { | |
512 uint32_t bytes_to_write = static_cast<uint32_t>( | |
513 base::RandInt(1, static_cast<int>(kMaxMessageSize))); | |
514 EXPECT_EQ(MOJO_RESULT_OK, | |
515 write_dispatcher_->WriteMessage(UserPointer<const void>(buffer), | |
516 bytes_to_write, nullptr, | |
517 MOJO_WRITE_MESSAGE_FLAG_NONE)); | |
518 *bytes_written_ += bytes_to_write; | |
519 } | |
520 | |
521 // Write one last "quit" message. | |
522 EXPECT_EQ(MOJO_RESULT_OK, write_dispatcher_->WriteMessage( | |
523 UserPointer<const void>("quit"), 4, nullptr, | |
524 MOJO_WRITE_MESSAGE_FLAG_NONE)); | |
525 } | |
526 | |
527 const scoped_refptr<Dispatcher> write_dispatcher_; | |
528 size_t* const messages_written_; | |
529 size_t* const bytes_written_; | |
530 | |
531 DISALLOW_COPY_AND_ASSIGN(WriterThread); | |
532 }; | |
533 | |
534 class ReaderThread : public base::SimpleThread { | |
535 public: | |
536 // |*messages_read| and |*bytes_read| belong to the thread while it's alive. | |
537 ReaderThread(scoped_refptr<Dispatcher> read_dispatcher, | |
538 size_t* messages_read, | |
539 size_t* bytes_read) | |
540 : base::SimpleThread("reader_thread"), | |
541 read_dispatcher_(read_dispatcher), | |
542 messages_read_(messages_read), | |
543 bytes_read_(bytes_read) { | |
544 *messages_read_ = 0; | |
545 *bytes_read_ = 0; | |
546 } | |
547 | |
548 ~ReaderThread() override { Join(); } | |
549 | |
550 private: | |
551 void Run() override { | |
552 unsigned char buffer[kMaxMessageSize]; | |
553 Waiter w; | |
554 HandleSignalsState hss; | |
555 MojoResult result; | |
556 | |
557 // Read messages. | |
558 for (;;) { | |
559 // Wait for it to be readable. | |
560 w.Init(); | |
561 hss = HandleSignalsState(); | |
562 result = read_dispatcher_->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, | |
563 &hss); | |
564 EXPECT_TRUE(result == MOJO_RESULT_OK || | |
565 result == MOJO_RESULT_ALREADY_EXISTS) | |
566 << "result: " << result; | |
567 if (result == MOJO_RESULT_OK) { | |
568 // Actually need to wait. | |
569 EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, nullptr)); | |
570 read_dispatcher_->RemoveAwakable(&w, &hss); | |
571 } | |
572 // We may not actually be readable, since we're racing with other threads. | |
573 EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE)); | |
574 | |
575 // Now, try to do the read. | |
576 // Clear the buffer so that we can check the result. | |
577 memset(buffer, 0, sizeof(buffer)); | |
578 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer)); | |
579 result = read_dispatcher_->ReadMessage( | |
580 UserPointer<void>(buffer), MakeUserPointer(&buffer_size), 0, nullptr, | |
581 MOJO_READ_MESSAGE_FLAG_NONE); | |
582 EXPECT_TRUE(result == MOJO_RESULT_OK || result == MOJO_RESULT_SHOULD_WAIT) | |
583 << "result: " << result; | |
584 // We're racing with others to read, so maybe we failed. | |
585 if (result == MOJO_RESULT_SHOULD_WAIT) | |
586 continue; // In which case, try again. | |
587 // Check for quit. | |
588 if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0) | |
589 return; | |
590 EXPECT_GE(buffer_size, 1u); | |
591 EXPECT_LE(buffer_size, kMaxMessageSize); | |
592 EXPECT_TRUE(IsValidMessage(buffer, buffer_size)); | |
593 | |
594 (*messages_read_)++; | |
595 *bytes_read_ += buffer_size; | |
596 } | |
597 } | |
598 | |
599 static bool IsValidMessage(const unsigned char* buffer, | |
600 uint32_t message_size) { | |
601 size_t i; | |
602 for (i = 0; i < message_size; i++) { | |
603 if (buffer[i] != static_cast<unsigned char>(i)) | |
604 return false; | |
605 } | |
606 // Check that the remaining bytes weren't stomped on. | |
607 for (; i < kMaxMessageSize; i++) { | |
608 if (buffer[i] != 0) | |
609 return false; | |
610 } | |
611 return true; | |
612 } | |
613 | |
614 const scoped_refptr<Dispatcher> read_dispatcher_; | |
615 size_t* const messages_read_; | |
616 size_t* const bytes_read_; | |
617 | |
618 DISALLOW_COPY_AND_ASSIGN(ReaderThread); | |
619 }; | |
620 | |
621 TEST(MessagePipeDispatcherTest, Stress) { | |
622 static const size_t kNumWriters = 30; | |
623 static const size_t kNumReaders = kNumWriters; | |
624 | |
625 scoped_refptr<MessagePipeDispatcher> d_write( | |
626 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); | |
627 scoped_refptr<MessagePipeDispatcher> d_read( | |
628 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions)); | |
629 { | |
630 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal()); | |
631 d_write->Init(mp, 0); | |
632 d_read->Init(mp, 1); | |
633 } | |
634 | |
635 size_t messages_written[kNumWriters]; | |
636 size_t bytes_written[kNumWriters]; | |
637 size_t messages_read[kNumReaders]; | |
638 size_t bytes_read[kNumReaders]; | |
639 { | |
640 // Make writers. | |
641 ScopedVector<WriterThread> writers; | |
642 for (size_t i = 0; i < kNumWriters; i++) { | |
643 writers.push_back( | |
644 new WriterThread(d_write, &messages_written[i], &bytes_written[i])); | |
645 } | |
646 | |
647 // Make readers. | |
648 ScopedVector<ReaderThread> readers; | |
649 for (size_t i = 0; i < kNumReaders; i++) { | |
650 readers.push_back( | |
651 new ReaderThread(d_read, &messages_read[i], &bytes_read[i])); | |
652 } | |
653 | |
654 // Start writers. | |
655 for (size_t i = 0; i < kNumWriters; i++) | |
656 writers[i]->Start(); | |
657 | |
658 // Start readers. | |
659 for (size_t i = 0; i < kNumReaders; i++) | |
660 readers[i]->Start(); | |
661 | |
662 // TODO(vtl): Maybe I should have an event that triggers all the threads to | |
663 // start doing stuff for real (so that the first ones created/started aren't | |
664 // advantaged). | |
665 } // Joins all the threads. | |
666 | |
667 size_t total_messages_written = 0; | |
668 size_t total_bytes_written = 0; | |
669 for (size_t i = 0; i < kNumWriters; i++) { | |
670 total_messages_written += messages_written[i]; | |
671 total_bytes_written += bytes_written[i]; | |
672 } | |
673 size_t total_messages_read = 0; | |
674 size_t total_bytes_read = 0; | |
675 for (size_t i = 0; i < kNumReaders; i++) { | |
676 total_messages_read += messages_read[i]; | |
677 total_bytes_read += bytes_read[i]; | |
678 // We'd have to be really unlucky to have read no messages on a thread. | |
679 EXPECT_GT(messages_read[i], 0u) << "reader: " << i; | |
680 EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i; | |
681 } | |
682 EXPECT_EQ(total_messages_written, total_messages_read); | |
683 EXPECT_EQ(total_bytes_written, total_bytes_read); | |
684 | |
685 EXPECT_EQ(MOJO_RESULT_OK, d_write->Close()); | |
686 EXPECT_EQ(MOJO_RESULT_OK, d_read->Close()); | |
687 } | |
688 | |
689 } // namespace | |
690 } // namespace system | |
691 } // namespace mojo | |
OLD | NEW |