Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(20)

Side by Side Diff: mojo/system/message_pipe_dispatcher_unittest.cc

Issue 621153003: Move mojo edk into mojo/edk (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: fix checkdeps Created 6 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/system/message_pipe_dispatcher.cc ('k') | mojo/system/message_pipe_endpoint.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a
6 // heavily-loaded system). Sorry. |test::EpsilonTimeout()| may be increased to
7 // increase tolerance and reduce observed flakiness (though doing so reduces the
8 // meaningfulness of the test).
9
10 #include "mojo/system/message_pipe_dispatcher.h"
11
12 #include <string.h>
13
14 #include <limits>
15
16 #include "base/memory/ref_counted.h"
17 #include "base/memory/scoped_vector.h"
18 #include "base/rand_util.h"
19 #include "base/threading/platform_thread.h" // For |Sleep()|.
20 #include "base/threading/simple_thread.h"
21 #include "base/time/time.h"
22 #include "mojo/system/message_pipe.h"
23 #include "mojo/system/test_utils.h"
24 #include "mojo/system/waiter.h"
25 #include "mojo/system/waiter_test_utils.h"
26 #include "testing/gtest/include/gtest/gtest.h"
27
28 namespace mojo {
29 namespace system {
30 namespace {
31
32 TEST(MessagePipeDispatcherTest, Basic) {
33 test::Stopwatch stopwatch;
34 int32_t buffer[1];
35 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
36 uint32_t buffer_size;
37
38 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
39 for (unsigned i = 0; i < 2; i++) {
40 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
41 MessagePipeDispatcher::kDefaultCreateOptions));
42 EXPECT_EQ(Dispatcher::kTypeMessagePipe, d0->GetType());
43 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
44 MessagePipeDispatcher::kDefaultCreateOptions));
45 {
46 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
47 d0->Init(mp, i); // 0, 1.
48 d1->Init(mp, i ^ 1); // 1, 0.
49 }
50 Waiter w;
51 uint32_t context = 0;
52 HandleSignalsState hss;
53
54 // Try adding a writable waiter when already writable.
55 w.Init();
56 hss = HandleSignalsState();
57 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
58 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
59 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
60 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
61 hss.satisfiable_signals);
62 // Shouldn't need to remove the waiter (it was not added).
63
64 // Add a readable waiter to |d0|, then make it readable (by writing to
65 // |d1|), then wait.
66 w.Init();
67 ASSERT_EQ(MOJO_RESULT_OK,
68 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, nullptr));
69 buffer[0] = 123456789;
70 EXPECT_EQ(MOJO_RESULT_OK,
71 d1->WriteMessage(UserPointer<const void>(buffer),
72 kBufferSize,
73 nullptr,
74 MOJO_WRITE_MESSAGE_FLAG_NONE));
75 stopwatch.Start();
76 EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, &context));
77 EXPECT_EQ(1u, context);
78 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
79 hss = HandleSignalsState();
80 d0->RemoveWaiter(&w, &hss);
81 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
82 hss.satisfied_signals);
83 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
84 hss.satisfiable_signals);
85
86 // Try adding a readable waiter when already readable (from above).
87 w.Init();
88 hss = HandleSignalsState();
89 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
90 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss));
91 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
92 hss.satisfied_signals);
93 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
94 hss.satisfiable_signals);
95 // Shouldn't need to remove the waiter (it was not added).
96
97 // Make |d0| no longer readable (by reading from it).
98 buffer[0] = 0;
99 buffer_size = kBufferSize;
100 EXPECT_EQ(MOJO_RESULT_OK,
101 d0->ReadMessage(UserPointer<void>(buffer),
102 MakeUserPointer(&buffer_size),
103 0,
104 nullptr,
105 MOJO_READ_MESSAGE_FLAG_NONE));
106 EXPECT_EQ(kBufferSize, buffer_size);
107 EXPECT_EQ(123456789, buffer[0]);
108
109 // Wait for zero time for readability on |d0| (will time out).
110 w.Init();
111 ASSERT_EQ(MOJO_RESULT_OK,
112 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, nullptr));
113 stopwatch.Start();
114 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, nullptr));
115 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
116 hss = HandleSignalsState();
117 d0->RemoveWaiter(&w, &hss);
118 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
119 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
120 hss.satisfiable_signals);
121
122 // Wait for non-zero, finite time for readability on |d0| (will time out).
123 w.Init();
124 ASSERT_EQ(MOJO_RESULT_OK,
125 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, nullptr));
126 stopwatch.Start();
127 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
128 w.Wait(2 * test::EpsilonTimeout().InMicroseconds(), nullptr));
129 base::TimeDelta elapsed = stopwatch.Elapsed();
130 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
131 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
132 hss = HandleSignalsState();
133 d0->RemoveWaiter(&w, &hss);
134 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
135 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
136 hss.satisfiable_signals);
137
138 EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
139 EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
140 }
141 }
142
143 TEST(MessagePipeDispatcherTest, InvalidParams) {
144 char buffer[1];
145
146 scoped_refptr<MessagePipeDispatcher> d0(
147 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
148 scoped_refptr<MessagePipeDispatcher> d1(
149 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
150 {
151 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
152 d0->Init(mp, 0);
153 d1->Init(mp, 1);
154 }
155
156 // |WriteMessage|:
157 // Huge buffer size.
158 EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
159 d0->WriteMessage(UserPointer<const void>(buffer),
160 std::numeric_limits<uint32_t>::max(),
161 nullptr,
162 MOJO_WRITE_MESSAGE_FLAG_NONE));
163
164 EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
165 EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
166 }
167
168 // These test invalid arguments that should cause death if we're being paranoid
169 // about checking arguments (which we would want to do if, e.g., we were in a
170 // true "kernel" situation, but we might not want to do otherwise for
171 // performance reasons). Probably blatant errors like passing in null pointers
172 // (for required pointer arguments) will still cause death, but perhaps not
173 // predictably.
174 TEST(MessagePipeDispatcherTest, InvalidParamsDeath) {
175 const char kMemoryCheckFailedRegex[] = "Check failed";
176
177 scoped_refptr<MessagePipeDispatcher> d0(
178 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
179 scoped_refptr<MessagePipeDispatcher> d1(
180 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
181 {
182 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
183 d0->Init(mp, 0);
184 d1->Init(mp, 1);
185 }
186
187 // |WriteMessage|:
188 // Null buffer with nonzero buffer size.
189 EXPECT_DEATH_IF_SUPPORTED(
190 d0->WriteMessage(
191 NullUserPointer(), 1, nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE),
192 kMemoryCheckFailedRegex);
193
194 // |ReadMessage|:
195 // Null buffer with nonzero buffer size.
196 // First write something so that we actually have something to read.
197 EXPECT_EQ(MOJO_RESULT_OK,
198 d1->WriteMessage(UserPointer<const void>("x"),
199 1,
200 nullptr,
201 MOJO_WRITE_MESSAGE_FLAG_NONE));
202 uint32_t buffer_size = 1;
203 EXPECT_DEATH_IF_SUPPORTED(d0->ReadMessage(NullUserPointer(),
204 MakeUserPointer(&buffer_size),
205 0,
206 nullptr,
207 MOJO_READ_MESSAGE_FLAG_NONE),
208 kMemoryCheckFailedRegex);
209
210 EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
211 EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
212 }
213
214 // Test what happens when one end is closed (single-threaded test).
215 TEST(MessagePipeDispatcherTest, BasicClosed) {
216 int32_t buffer[1];
217 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
218 uint32_t buffer_size;
219
220 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
221 for (unsigned i = 0; i < 2; i++) {
222 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
223 MessagePipeDispatcher::kDefaultCreateOptions));
224 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
225 MessagePipeDispatcher::kDefaultCreateOptions));
226 {
227 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
228 d0->Init(mp, i); // 0, 1.
229 d1->Init(mp, i ^ 1); // 1, 0.
230 }
231 Waiter w;
232 HandleSignalsState hss;
233
234 // Write (twice) to |d1|.
235 buffer[0] = 123456789;
236 EXPECT_EQ(MOJO_RESULT_OK,
237 d1->WriteMessage(UserPointer<const void>(buffer),
238 kBufferSize,
239 nullptr,
240 MOJO_WRITE_MESSAGE_FLAG_NONE));
241 buffer[0] = 234567890;
242 EXPECT_EQ(MOJO_RESULT_OK,
243 d1->WriteMessage(UserPointer<const void>(buffer),
244 kBufferSize,
245 nullptr,
246 MOJO_WRITE_MESSAGE_FLAG_NONE));
247
248 // Try waiting for readable on |d0|; should fail (already satisfied).
249 w.Init();
250 hss = HandleSignalsState();
251 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
252 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
253 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
254 hss.satisfied_signals);
255 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
256 hss.satisfiable_signals);
257
258 // Try reading from |d1|; should fail (nothing to read).
259 buffer[0] = 0;
260 buffer_size = kBufferSize;
261 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
262 d1->ReadMessage(UserPointer<void>(buffer),
263 MakeUserPointer(&buffer_size),
264 0,
265 nullptr,
266 MOJO_READ_MESSAGE_FLAG_NONE));
267
268 // Close |d1|.
269 EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
270
271 // Try waiting for readable on |d0|; should fail (already satisfied).
272 w.Init();
273 hss = HandleSignalsState();
274 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
275 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, &hss));
276 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
277 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
278
279 // Read from |d0|.
280 buffer[0] = 0;
281 buffer_size = kBufferSize;
282 EXPECT_EQ(MOJO_RESULT_OK,
283 d0->ReadMessage(UserPointer<void>(buffer),
284 MakeUserPointer(&buffer_size),
285 0,
286 nullptr,
287 MOJO_READ_MESSAGE_FLAG_NONE));
288 EXPECT_EQ(kBufferSize, buffer_size);
289 EXPECT_EQ(123456789, buffer[0]);
290
291 // Try waiting for readable on |d0|; should fail (already satisfied).
292 w.Init();
293 hss = HandleSignalsState();
294 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
295 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss));
296 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
297 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
298
299 // Read again from |d0|.
300 buffer[0] = 0;
301 buffer_size = kBufferSize;
302 EXPECT_EQ(MOJO_RESULT_OK,
303 d0->ReadMessage(UserPointer<void>(buffer),
304 MakeUserPointer(&buffer_size),
305 0,
306 nullptr,
307 MOJO_READ_MESSAGE_FLAG_NONE));
308 EXPECT_EQ(kBufferSize, buffer_size);
309 EXPECT_EQ(234567890, buffer[0]);
310
311 // Try waiting for readable on |d0|; should fail (unsatisfiable).
312 w.Init();
313 hss = HandleSignalsState();
314 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
315 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, &hss));
316 EXPECT_EQ(0u, hss.satisfied_signals);
317 EXPECT_EQ(0u, hss.satisfiable_signals);
318
319 // Try waiting for writable on |d0|; should fail (unsatisfiable).
320 w.Init();
321 hss = HandleSignalsState();
322 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
323 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 4, &hss));
324 EXPECT_EQ(0u, hss.satisfied_signals);
325 EXPECT_EQ(0u, hss.satisfiable_signals);
326
327 // Try reading from |d0|; should fail (nothing to read and other end
328 // closed).
329 buffer[0] = 0;
330 buffer_size = kBufferSize;
331 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
332 d0->ReadMessage(UserPointer<void>(buffer),
333 MakeUserPointer(&buffer_size),
334 0,
335 nullptr,
336 MOJO_READ_MESSAGE_FLAG_NONE));
337
338 // Try writing to |d0|; should fail (other end closed).
339 buffer[0] = 345678901;
340 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
341 d0->WriteMessage(UserPointer<const void>(buffer),
342 kBufferSize,
343 nullptr,
344 MOJO_WRITE_MESSAGE_FLAG_NONE));
345
346 EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
347 }
348 }
349
350 #if defined(OS_WIN)
351 // http://crbug.com/396386
352 #define MAYBE_BasicThreaded DISABLED_BasicThreaded
353 #else
354 #define MAYBE_BasicThreaded BasicThreaded
355 #endif
356 TEST(MessagePipeDispatcherTest, MAYBE_BasicThreaded) {
357 test::Stopwatch stopwatch;
358 int32_t buffer[1];
359 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
360 uint32_t buffer_size;
361 base::TimeDelta elapsed;
362 bool did_wait;
363 MojoResult result;
364 uint32_t context;
365 HandleSignalsState hss;
366
367 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
368 for (unsigned i = 0; i < 2; i++) {
369 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
370 MessagePipeDispatcher::kDefaultCreateOptions));
371 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
372 MessagePipeDispatcher::kDefaultCreateOptions));
373 {
374 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
375 d0->Init(mp, i); // 0, 1.
376 d1->Init(mp, i ^ 1); // 1, 0.
377 }
378
379 // Wait for readable on |d1|, which will become readable after some time.
380 {
381 test::WaiterThread thread(d1,
382 MOJO_HANDLE_SIGNAL_READABLE,
383 MOJO_DEADLINE_INDEFINITE,
384 1,
385 &did_wait,
386 &result,
387 &context,
388 &hss);
389 stopwatch.Start();
390 thread.Start();
391 base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
392 // Wake it up by writing to |d0|.
393 buffer[0] = 123456789;
394 EXPECT_EQ(MOJO_RESULT_OK,
395 d0->WriteMessage(UserPointer<const void>(buffer),
396 kBufferSize,
397 nullptr,
398 MOJO_WRITE_MESSAGE_FLAG_NONE));
399 } // Joins the thread.
400 elapsed = stopwatch.Elapsed();
401 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
402 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
403 EXPECT_TRUE(did_wait);
404 EXPECT_EQ(MOJO_RESULT_OK, result);
405 EXPECT_EQ(1u, context);
406 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
407 hss.satisfied_signals);
408 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
409 hss.satisfiable_signals);
410
411 // Now |d1| is already readable. Try waiting for it again.
412 {
413 test::WaiterThread thread(d1,
414 MOJO_HANDLE_SIGNAL_READABLE,
415 MOJO_DEADLINE_INDEFINITE,
416 2,
417 &did_wait,
418 &result,
419 &context,
420 &hss);
421 stopwatch.Start();
422 thread.Start();
423 } // Joins the thread.
424 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
425 EXPECT_FALSE(did_wait);
426 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result);
427 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
428 hss.satisfied_signals);
429 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
430 hss.satisfiable_signals);
431
432 // Consume what we wrote to |d0|.
433 buffer[0] = 0;
434 buffer_size = kBufferSize;
435 EXPECT_EQ(MOJO_RESULT_OK,
436 d1->ReadMessage(UserPointer<void>(buffer),
437 MakeUserPointer(&buffer_size),
438 0,
439 nullptr,
440 MOJO_READ_MESSAGE_FLAG_NONE));
441 EXPECT_EQ(kBufferSize, buffer_size);
442 EXPECT_EQ(123456789, buffer[0]);
443
444 // Wait for readable on |d1| and close |d0| after some time, which should
445 // cancel that wait.
446 {
447 test::WaiterThread thread(d1,
448 MOJO_HANDLE_SIGNAL_READABLE,
449 MOJO_DEADLINE_INDEFINITE,
450 3,
451 &did_wait,
452 &result,
453 &context,
454 &hss);
455 stopwatch.Start();
456 thread.Start();
457 base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
458 EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
459 } // Joins the thread.
460 elapsed = stopwatch.Elapsed();
461 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
462 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
463 EXPECT_TRUE(did_wait);
464 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
465 EXPECT_EQ(3u, context);
466 EXPECT_EQ(0u, hss.satisfied_signals);
467 EXPECT_EQ(0u, hss.satisfiable_signals);
468
469 EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
470 }
471
472 for (unsigned i = 0; i < 2; i++) {
473 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
474 MessagePipeDispatcher::kDefaultCreateOptions));
475 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
476 MessagePipeDispatcher::kDefaultCreateOptions));
477 {
478 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
479 d0->Init(mp, i); // 0, 1.
480 d1->Init(mp, i ^ 1); // 1, 0.
481 }
482
483 // Wait for readable on |d1| and close |d1| after some time, which should
484 // cancel that wait.
485 {
486 test::WaiterThread thread(d1,
487 MOJO_HANDLE_SIGNAL_READABLE,
488 MOJO_DEADLINE_INDEFINITE,
489 4,
490 &did_wait,
491 &result,
492 &context,
493 &hss);
494 stopwatch.Start();
495 thread.Start();
496 base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
497 EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
498 } // Joins the thread.
499 elapsed = stopwatch.Elapsed();
500 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
501 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
502 EXPECT_TRUE(did_wait);
503 EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
504 EXPECT_EQ(4u, context);
505 EXPECT_EQ(0u, hss.satisfied_signals);
506 EXPECT_EQ(0u, hss.satisfiable_signals);
507
508 EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
509 }
510 }
511
512 // Stress test -----------------------------------------------------------------
513
514 const size_t kMaxMessageSize = 2000;
515
516 class WriterThread : public base::SimpleThread {
517 public:
518 // |*messages_written| and |*bytes_written| belong to the thread while it's
519 // alive.
520 WriterThread(scoped_refptr<Dispatcher> write_dispatcher,
521 size_t* messages_written,
522 size_t* bytes_written)
523 : base::SimpleThread("writer_thread"),
524 write_dispatcher_(write_dispatcher),
525 messages_written_(messages_written),
526 bytes_written_(bytes_written) {
527 *messages_written_ = 0;
528 *bytes_written_ = 0;
529 }
530
531 virtual ~WriterThread() { Join(); }
532
533 private:
534 virtual void Run() override {
535 // Make some data to write.
536 unsigned char buffer[kMaxMessageSize];
537 for (size_t i = 0; i < kMaxMessageSize; i++)
538 buffer[i] = static_cast<unsigned char>(i);
539
540 // Number of messages to write.
541 *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000));
542
543 // Write messages.
544 for (size_t i = 0; i < *messages_written_; i++) {
545 uint32_t bytes_to_write = static_cast<uint32_t>(
546 base::RandInt(1, static_cast<int>(kMaxMessageSize)));
547 EXPECT_EQ(MOJO_RESULT_OK,
548 write_dispatcher_->WriteMessage(UserPointer<const void>(buffer),
549 bytes_to_write,
550 nullptr,
551 MOJO_WRITE_MESSAGE_FLAG_NONE));
552 *bytes_written_ += bytes_to_write;
553 }
554
555 // Write one last "quit" message.
556 EXPECT_EQ(MOJO_RESULT_OK,
557 write_dispatcher_->WriteMessage(UserPointer<const void>("quit"),
558 4,
559 nullptr,
560 MOJO_WRITE_MESSAGE_FLAG_NONE));
561 }
562
563 const scoped_refptr<Dispatcher> write_dispatcher_;
564 size_t* const messages_written_;
565 size_t* const bytes_written_;
566
567 DISALLOW_COPY_AND_ASSIGN(WriterThread);
568 };
569
570 class ReaderThread : public base::SimpleThread {
571 public:
572 // |*messages_read| and |*bytes_read| belong to the thread while it's alive.
573 ReaderThread(scoped_refptr<Dispatcher> read_dispatcher,
574 size_t* messages_read,
575 size_t* bytes_read)
576 : base::SimpleThread("reader_thread"),
577 read_dispatcher_(read_dispatcher),
578 messages_read_(messages_read),
579 bytes_read_(bytes_read) {
580 *messages_read_ = 0;
581 *bytes_read_ = 0;
582 }
583
584 virtual ~ReaderThread() { Join(); }
585
586 private:
587 virtual void Run() override {
588 unsigned char buffer[kMaxMessageSize];
589 Waiter w;
590 HandleSignalsState hss;
591 MojoResult result;
592
593 // Read messages.
594 for (;;) {
595 // Wait for it to be readable.
596 w.Init();
597 hss = HandleSignalsState();
598 result =
599 read_dispatcher_->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss);
600 EXPECT_TRUE(result == MOJO_RESULT_OK ||
601 result == MOJO_RESULT_ALREADY_EXISTS)
602 << "result: " << result;
603 if (result == MOJO_RESULT_OK) {
604 // Actually need to wait.
605 EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, nullptr));
606 read_dispatcher_->RemoveWaiter(&w, &hss);
607 }
608 // We may not actually be readable, since we're racing with other threads.
609 EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));
610
611 // Now, try to do the read.
612 // Clear the buffer so that we can check the result.
613 memset(buffer, 0, sizeof(buffer));
614 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
615 result = read_dispatcher_->ReadMessage(UserPointer<void>(buffer),
616 MakeUserPointer(&buffer_size),
617 0,
618 nullptr,
619 MOJO_READ_MESSAGE_FLAG_NONE);
620 EXPECT_TRUE(result == MOJO_RESULT_OK || result == MOJO_RESULT_SHOULD_WAIT)
621 << "result: " << result;
622 // We're racing with others to read, so maybe we failed.
623 if (result == MOJO_RESULT_SHOULD_WAIT)
624 continue; // In which case, try again.
625 // Check for quit.
626 if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0)
627 return;
628 EXPECT_GE(buffer_size, 1u);
629 EXPECT_LE(buffer_size, kMaxMessageSize);
630 EXPECT_TRUE(IsValidMessage(buffer, buffer_size));
631
632 (*messages_read_)++;
633 *bytes_read_ += buffer_size;
634 }
635 }
636
637 static bool IsValidMessage(const unsigned char* buffer,
638 uint32_t message_size) {
639 size_t i;
640 for (i = 0; i < message_size; i++) {
641 if (buffer[i] != static_cast<unsigned char>(i))
642 return false;
643 }
644 // Check that the remaining bytes weren't stomped on.
645 for (; i < kMaxMessageSize; i++) {
646 if (buffer[i] != 0)
647 return false;
648 }
649 return true;
650 }
651
652 const scoped_refptr<Dispatcher> read_dispatcher_;
653 size_t* const messages_read_;
654 size_t* const bytes_read_;
655
656 DISALLOW_COPY_AND_ASSIGN(ReaderThread);
657 };
658
659 TEST(MessagePipeDispatcherTest, Stress) {
660 static const size_t kNumWriters = 30;
661 static const size_t kNumReaders = kNumWriters;
662
663 scoped_refptr<MessagePipeDispatcher> d_write(
664 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
665 scoped_refptr<MessagePipeDispatcher> d_read(
666 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
667 {
668 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
669 d_write->Init(mp, 0);
670 d_read->Init(mp, 1);
671 }
672
673 size_t messages_written[kNumWriters];
674 size_t bytes_written[kNumWriters];
675 size_t messages_read[kNumReaders];
676 size_t bytes_read[kNumReaders];
677 {
678 // Make writers.
679 ScopedVector<WriterThread> writers;
680 for (size_t i = 0; i < kNumWriters; i++) {
681 writers.push_back(
682 new WriterThread(d_write, &messages_written[i], &bytes_written[i]));
683 }
684
685 // Make readers.
686 ScopedVector<ReaderThread> readers;
687 for (size_t i = 0; i < kNumReaders; i++) {
688 readers.push_back(
689 new ReaderThread(d_read, &messages_read[i], &bytes_read[i]));
690 }
691
692 // Start writers.
693 for (size_t i = 0; i < kNumWriters; i++)
694 writers[i]->Start();
695
696 // Start readers.
697 for (size_t i = 0; i < kNumReaders; i++)
698 readers[i]->Start();
699
700 // TODO(vtl): Maybe I should have an event that triggers all the threads to
701 // start doing stuff for real (so that the first ones created/started aren't
702 // advantaged).
703 } // Joins all the threads.
704
705 size_t total_messages_written = 0;
706 size_t total_bytes_written = 0;
707 for (size_t i = 0; i < kNumWriters; i++) {
708 total_messages_written += messages_written[i];
709 total_bytes_written += bytes_written[i];
710 }
711 size_t total_messages_read = 0;
712 size_t total_bytes_read = 0;
713 for (size_t i = 0; i < kNumReaders; i++) {
714 total_messages_read += messages_read[i];
715 total_bytes_read += bytes_read[i];
716 // We'd have to be really unlucky to have read no messages on a thread.
717 EXPECT_GT(messages_read[i], 0u) << "reader: " << i;
718 EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i;
719 }
720 EXPECT_EQ(total_messages_written, total_messages_read);
721 EXPECT_EQ(total_bytes_written, total_bytes_read);
722
723 EXPECT_EQ(MOJO_RESULT_OK, d_write->Close());
724 EXPECT_EQ(MOJO_RESULT_OK, d_read->Close());
725 }
726
727 } // namespace
728 } // namespace system
729 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/system/message_pipe_dispatcher.cc ('k') | mojo/system/message_pipe_endpoint.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698