Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(478)

Side by Side Diff: mojo/edk/system/message_pipe_dispatcher_unittest.cc

Issue 814543006: Move //mojo/{public, edk} underneath //third_party (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebase Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/message_pipe_dispatcher.cc ('k') | mojo/edk/system/message_pipe_endpoint.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a
6 // heavily-loaded system). Sorry. |test::EpsilonTimeout()| may be increased to
7 // increase tolerance and reduce observed flakiness (though doing so reduces the
8 // meaningfulness of the test).
9
10 #include "mojo/edk/system/message_pipe_dispatcher.h"
11
12 #include <string.h>
13
14 #include <limits>
15
16 #include "base/memory/ref_counted.h"
17 #include "base/memory/scoped_vector.h"
18 #include "base/rand_util.h"
19 #include "base/threading/platform_thread.h" // For |Sleep()|.
20 #include "base/threading/simple_thread.h"
21 #include "base/time/time.h"
22 #include "mojo/edk/system/message_pipe.h"
23 #include "mojo/edk/system/test_utils.h"
24 #include "mojo/edk/system/waiter.h"
25 #include "mojo/edk/system/waiter_test_utils.h"
26 #include "testing/gtest/include/gtest/gtest.h"
27
28 namespace mojo {
29 namespace system {
30 namespace {
31
32 const MojoHandleSignals kAllSignals = MOJO_HANDLE_SIGNAL_READABLE |
33 MOJO_HANDLE_SIGNAL_WRITABLE |
34 MOJO_HANDLE_SIGNAL_PEER_CLOSED;
35
36 TEST(MessagePipeDispatcherTest, Basic) {
37 test::Stopwatch stopwatch;
38 int32_t buffer[1];
39 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
40 uint32_t buffer_size;
41
42 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
43 for (unsigned i = 0; i < 2; i++) {
44 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
45 MessagePipeDispatcher::kDefaultCreateOptions));
46 EXPECT_EQ(Dispatcher::kTypeMessagePipe, d0->GetType());
47 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
48 MessagePipeDispatcher::kDefaultCreateOptions));
49 {
50 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
51 d0->Init(mp, i); // 0, 1.
52 d1->Init(mp, i ^ 1); // 1, 0.
53 }
54 Waiter w;
55 uint32_t context = 0;
56 HandleSignalsState hss;
57
58 // Try adding a writable waiter when already writable.
59 w.Init();
60 hss = HandleSignalsState();
61 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
62 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
63 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
64 EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
65 // Shouldn't need to remove the waiter (it was not added).
66
67 // Add a readable waiter to |d0|, then make it readable (by writing to
68 // |d1|), then wait.
69 w.Init();
70 ASSERT_EQ(MOJO_RESULT_OK,
71 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, nullptr));
72 buffer[0] = 123456789;
73 EXPECT_EQ(MOJO_RESULT_OK,
74 d1->WriteMessage(UserPointer<const void>(buffer), kBufferSize,
75 nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
76 stopwatch.Start();
77 EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, &context));
78 EXPECT_EQ(1u, context);
79 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
80 hss = HandleSignalsState();
81 d0->RemoveAwakable(&w, &hss);
82 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
83 hss.satisfied_signals);
84 EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
85
86 // Try adding a readable waiter when already readable (from above).
87 w.Init();
88 hss = HandleSignalsState();
89 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
90 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss));
91 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
92 hss.satisfied_signals);
93 EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
94 // Shouldn't need to remove the waiter (it was not added).
95
96 // Make |d0| no longer readable (by reading from it).
97 buffer[0] = 0;
98 buffer_size = kBufferSize;
99 EXPECT_EQ(MOJO_RESULT_OK,
100 d0->ReadMessage(UserPointer<void>(buffer),
101 MakeUserPointer(&buffer_size), 0, nullptr,
102 MOJO_READ_MESSAGE_FLAG_NONE));
103 EXPECT_EQ(kBufferSize, buffer_size);
104 EXPECT_EQ(123456789, buffer[0]);
105
106 // Wait for zero time for readability on |d0| (will time out).
107 w.Init();
108 ASSERT_EQ(MOJO_RESULT_OK,
109 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, nullptr));
110 stopwatch.Start();
111 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, nullptr));
112 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
113 hss = HandleSignalsState();
114 d0->RemoveAwakable(&w, &hss);
115 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
116 EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
117
118 // Wait for non-zero, finite time for readability on |d0| (will time out).
119 w.Init();
120 ASSERT_EQ(MOJO_RESULT_OK,
121 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, nullptr));
122 stopwatch.Start();
123 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
124 w.Wait(2 * test::EpsilonTimeout().InMicroseconds(), nullptr));
125 base::TimeDelta elapsed = stopwatch.Elapsed();
126 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
127 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
128 hss = HandleSignalsState();
129 d0->RemoveAwakable(&w, &hss);
130 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
131 EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
132
133 // Check the peer closed signal.
134 w.Init();
135 ASSERT_EQ(MOJO_RESULT_OK,
136 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 12, nullptr));
137
138 // Close the peer.
139 EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
140
141 // It should be signaled.
142 EXPECT_EQ(MOJO_RESULT_OK, w.Wait(1000, &context));
143 EXPECT_EQ(12u, context);
144 hss = HandleSignalsState();
145 d0->RemoveAwakable(&w, &hss);
146 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
147 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
148
149 EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
150 }
151 }
152
153 TEST(MessagePipeDispatcherTest, InvalidParams) {
154 char buffer[1];
155
156 scoped_refptr<MessagePipeDispatcher> d0(
157 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
158 scoped_refptr<MessagePipeDispatcher> d1(
159 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
160 {
161 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
162 d0->Init(mp, 0);
163 d1->Init(mp, 1);
164 }
165
166 // |WriteMessage|:
167 // Huge buffer size.
168 EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
169 d0->WriteMessage(UserPointer<const void>(buffer),
170 std::numeric_limits<uint32_t>::max(), nullptr,
171 MOJO_WRITE_MESSAGE_FLAG_NONE));
172
173 EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
174 EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
175 }
176
177 // These test invalid arguments that should cause death if we're being paranoid
178 // about checking arguments (which we would want to do if, e.g., we were in a
179 // true "kernel" situation, but we might not want to do otherwise for
180 // performance reasons). Probably blatant errors like passing in null pointers
181 // (for required pointer arguments) will still cause death, but perhaps not
182 // predictably.
183 TEST(MessagePipeDispatcherTest, InvalidParamsDeath) {
184 const char kMemoryCheckFailedRegex[] = "Check failed";
185
186 scoped_refptr<MessagePipeDispatcher> d0(
187 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
188 scoped_refptr<MessagePipeDispatcher> d1(
189 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
190 {
191 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
192 d0->Init(mp, 0);
193 d1->Init(mp, 1);
194 }
195
196 // |WriteMessage|:
197 // Null buffer with nonzero buffer size.
198 EXPECT_DEATH_IF_SUPPORTED(d0->WriteMessage(NullUserPointer(), 1, nullptr,
199 MOJO_WRITE_MESSAGE_FLAG_NONE),
200 kMemoryCheckFailedRegex);
201
202 // |ReadMessage|:
203 // Null buffer with nonzero buffer size.
204 // First write something so that we actually have something to read.
205 EXPECT_EQ(MOJO_RESULT_OK,
206 d1->WriteMessage(UserPointer<const void>("x"), 1, nullptr,
207 MOJO_WRITE_MESSAGE_FLAG_NONE));
208 uint32_t buffer_size = 1;
209 EXPECT_DEATH_IF_SUPPORTED(
210 d0->ReadMessage(NullUserPointer(), MakeUserPointer(&buffer_size), 0,
211 nullptr, MOJO_READ_MESSAGE_FLAG_NONE),
212 kMemoryCheckFailedRegex);
213
214 EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
215 EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
216 }
217
218 // Test what happens when one end is closed (single-threaded test).
219 TEST(MessagePipeDispatcherTest, BasicClosed) {
220 int32_t buffer[1];
221 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
222 uint32_t buffer_size;
223
224 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
225 for (unsigned i = 0; i < 2; i++) {
226 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
227 MessagePipeDispatcher::kDefaultCreateOptions));
228 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
229 MessagePipeDispatcher::kDefaultCreateOptions));
230 {
231 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
232 d0->Init(mp, i); // 0, 1.
233 d1->Init(mp, i ^ 1); // 1, 0.
234 }
235 Waiter w;
236 HandleSignalsState hss;
237
238 // Write (twice) to |d1|.
239 buffer[0] = 123456789;
240 EXPECT_EQ(MOJO_RESULT_OK,
241 d1->WriteMessage(UserPointer<const void>(buffer), kBufferSize,
242 nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
243 buffer[0] = 234567890;
244 EXPECT_EQ(MOJO_RESULT_OK,
245 d1->WriteMessage(UserPointer<const void>(buffer), kBufferSize,
246 nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
247
248 // Try waiting for readable on |d0|; should fail (already satisfied).
249 w.Init();
250 hss = HandleSignalsState();
251 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
252 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
253 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
254 hss.satisfied_signals);
255 EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
256
257 // Try reading from |d1|; should fail (nothing to read).
258 buffer[0] = 0;
259 buffer_size = kBufferSize;
260 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
261 d1->ReadMessage(UserPointer<void>(buffer),
262 MakeUserPointer(&buffer_size), 0, nullptr,
263 MOJO_READ_MESSAGE_FLAG_NONE));
264
265 // Close |d1|.
266 EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
267
268 // Try waiting for readable on |d0|; should fail (already satisfied).
269 w.Init();
270 hss = HandleSignalsState();
271 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
272 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, &hss));
273 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
274 hss.satisfied_signals);
275 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
276 hss.satisfiable_signals);
277
278 // Read from |d0|.
279 buffer[0] = 0;
280 buffer_size = kBufferSize;
281 EXPECT_EQ(MOJO_RESULT_OK,
282 d0->ReadMessage(UserPointer<void>(buffer),
283 MakeUserPointer(&buffer_size), 0, nullptr,
284 MOJO_READ_MESSAGE_FLAG_NONE));
285 EXPECT_EQ(kBufferSize, buffer_size);
286 EXPECT_EQ(123456789, buffer[0]);
287
288 // Try waiting for readable on |d0|; should fail (already satisfied).
289 w.Init();
290 hss = HandleSignalsState();
291 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
292 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss));
293 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
294 hss.satisfied_signals);
295 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
296 hss.satisfiable_signals);
297
298 // Read again from |d0|.
299 buffer[0] = 0;
300 buffer_size = kBufferSize;
301 EXPECT_EQ(MOJO_RESULT_OK,
302 d0->ReadMessage(UserPointer<void>(buffer),
303 MakeUserPointer(&buffer_size), 0, nullptr,
304 MOJO_READ_MESSAGE_FLAG_NONE));
305 EXPECT_EQ(kBufferSize, buffer_size);
306 EXPECT_EQ(234567890, buffer[0]);
307
308 // Try waiting for readable on |d0|; should fail (unsatisfiable).
309 w.Init();
310 hss = HandleSignalsState();
311 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
312 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, &hss));
313 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
314 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
315
316 // Try waiting for writable on |d0|; should fail (unsatisfiable).
317 w.Init();
318 hss = HandleSignalsState();
319 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
320 d0->AddAwakable(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 4, &hss));
321 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
322 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
323
324 // Try reading from |d0|; should fail (nothing to read and other end
325 // closed).
326 buffer[0] = 0;
327 buffer_size = kBufferSize;
328 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
329 d0->ReadMessage(UserPointer<void>(buffer),
330 MakeUserPointer(&buffer_size), 0, nullptr,
331 MOJO_READ_MESSAGE_FLAG_NONE));
332
333 // Try writing to |d0|; should fail (other end closed).
334 buffer[0] = 345678901;
335 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
336 d0->WriteMessage(UserPointer<const void>(buffer), kBufferSize,
337 nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
338
339 EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
340 }
341 }
342
343 #if defined(OS_WIN)
344 // http://crbug.com/396386
345 #define MAYBE_BasicThreaded DISABLED_BasicThreaded
346 #else
347 #define MAYBE_BasicThreaded BasicThreaded
348 #endif
349 TEST(MessagePipeDispatcherTest, MAYBE_BasicThreaded) {
350 test::Stopwatch stopwatch;
351 int32_t buffer[1];
352 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
353 uint32_t buffer_size;
354 base::TimeDelta elapsed;
355 bool did_wait;
356 MojoResult result;
357 uint32_t context;
358 HandleSignalsState hss;
359
360 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
361 for (unsigned i = 0; i < 2; i++) {
362 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
363 MessagePipeDispatcher::kDefaultCreateOptions));
364 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
365 MessagePipeDispatcher::kDefaultCreateOptions));
366 {
367 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
368 d0->Init(mp, i); // 0, 1.
369 d1->Init(mp, i ^ 1); // 1, 0.
370 }
371
372 // Wait for readable on |d1|, which will become readable after some time.
373 {
374 test::WaiterThread thread(d1, MOJO_HANDLE_SIGNAL_READABLE,
375 MOJO_DEADLINE_INDEFINITE, 1, &did_wait, &result,
376 &context, &hss);
377 stopwatch.Start();
378 thread.Start();
379 base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
380 // Wake it up by writing to |d0|.
381 buffer[0] = 123456789;
382 EXPECT_EQ(MOJO_RESULT_OK,
383 d0->WriteMessage(UserPointer<const void>(buffer), kBufferSize,
384 nullptr, MOJO_WRITE_MESSAGE_FLAG_NONE));
385 } // Joins the thread.
386 elapsed = stopwatch.Elapsed();
387 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
388 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
389 EXPECT_TRUE(did_wait);
390 EXPECT_EQ(MOJO_RESULT_OK, result);
391 EXPECT_EQ(1u, context);
392 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
393 hss.satisfied_signals);
394 EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
395
396 // Now |d1| is already readable. Try waiting for it again.
397 {
398 test::WaiterThread thread(d1, MOJO_HANDLE_SIGNAL_READABLE,
399 MOJO_DEADLINE_INDEFINITE, 2, &did_wait, &result,
400 &context, &hss);
401 stopwatch.Start();
402 thread.Start();
403 } // Joins the thread.
404 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
405 EXPECT_FALSE(did_wait);
406 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result);
407 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
408 hss.satisfied_signals);
409 EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
410
411 // Consume what we wrote to |d0|.
412 buffer[0] = 0;
413 buffer_size = kBufferSize;
414 EXPECT_EQ(MOJO_RESULT_OK,
415 d1->ReadMessage(UserPointer<void>(buffer),
416 MakeUserPointer(&buffer_size), 0, nullptr,
417 MOJO_READ_MESSAGE_FLAG_NONE));
418 EXPECT_EQ(kBufferSize, buffer_size);
419 EXPECT_EQ(123456789, buffer[0]);
420
421 // Wait for readable on |d1| and close |d0| after some time, which should
422 // cancel that wait.
423 {
424 test::WaiterThread thread(d1, MOJO_HANDLE_SIGNAL_READABLE,
425 MOJO_DEADLINE_INDEFINITE, 3, &did_wait, &result,
426 &context, &hss);
427 stopwatch.Start();
428 thread.Start();
429 base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
430 EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
431 } // Joins the thread.
432 elapsed = stopwatch.Elapsed();
433 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
434 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
435 EXPECT_TRUE(did_wait);
436 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
437 EXPECT_EQ(3u, context);
438 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
439 EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
440
441 EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
442 }
443
444 for (unsigned i = 0; i < 2; i++) {
445 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
446 MessagePipeDispatcher::kDefaultCreateOptions));
447 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
448 MessagePipeDispatcher::kDefaultCreateOptions));
449 {
450 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
451 d0->Init(mp, i); // 0, 1.
452 d1->Init(mp, i ^ 1); // 1, 0.
453 }
454
455 // Wait for readable on |d1| and close |d1| after some time, which should
456 // cancel that wait.
457 {
458 test::WaiterThread thread(d1, MOJO_HANDLE_SIGNAL_READABLE,
459 MOJO_DEADLINE_INDEFINITE, 4, &did_wait, &result,
460 &context, &hss);
461 stopwatch.Start();
462 thread.Start();
463 base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
464 EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
465 } // Joins the thread.
466 elapsed = stopwatch.Elapsed();
467 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
468 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
469 EXPECT_TRUE(did_wait);
470 EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
471 EXPECT_EQ(4u, context);
472 EXPECT_EQ(0u, hss.satisfied_signals);
473 EXPECT_EQ(0u, hss.satisfiable_signals);
474
475 EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
476 }
477 }
478
479 // Stress test -----------------------------------------------------------------
480
481 const size_t kMaxMessageSize = 2000;
482
483 class WriterThread : public base::SimpleThread {
484 public:
485 // |*messages_written| and |*bytes_written| belong to the thread while it's
486 // alive.
487 WriterThread(scoped_refptr<Dispatcher> write_dispatcher,
488 size_t* messages_written,
489 size_t* bytes_written)
490 : base::SimpleThread("writer_thread"),
491 write_dispatcher_(write_dispatcher),
492 messages_written_(messages_written),
493 bytes_written_(bytes_written) {
494 *messages_written_ = 0;
495 *bytes_written_ = 0;
496 }
497
498 ~WriterThread() override { Join(); }
499
500 private:
501 void Run() override {
502 // Make some data to write.
503 unsigned char buffer[kMaxMessageSize];
504 for (size_t i = 0; i < kMaxMessageSize; i++)
505 buffer[i] = static_cast<unsigned char>(i);
506
507 // Number of messages to write.
508 *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000));
509
510 // Write messages.
511 for (size_t i = 0; i < *messages_written_; i++) {
512 uint32_t bytes_to_write = static_cast<uint32_t>(
513 base::RandInt(1, static_cast<int>(kMaxMessageSize)));
514 EXPECT_EQ(MOJO_RESULT_OK,
515 write_dispatcher_->WriteMessage(UserPointer<const void>(buffer),
516 bytes_to_write, nullptr,
517 MOJO_WRITE_MESSAGE_FLAG_NONE));
518 *bytes_written_ += bytes_to_write;
519 }
520
521 // Write one last "quit" message.
522 EXPECT_EQ(MOJO_RESULT_OK, write_dispatcher_->WriteMessage(
523 UserPointer<const void>("quit"), 4, nullptr,
524 MOJO_WRITE_MESSAGE_FLAG_NONE));
525 }
526
527 const scoped_refptr<Dispatcher> write_dispatcher_;
528 size_t* const messages_written_;
529 size_t* const bytes_written_;
530
531 DISALLOW_COPY_AND_ASSIGN(WriterThread);
532 };
533
534 class ReaderThread : public base::SimpleThread {
535 public:
536 // |*messages_read| and |*bytes_read| belong to the thread while it's alive.
537 ReaderThread(scoped_refptr<Dispatcher> read_dispatcher,
538 size_t* messages_read,
539 size_t* bytes_read)
540 : base::SimpleThread("reader_thread"),
541 read_dispatcher_(read_dispatcher),
542 messages_read_(messages_read),
543 bytes_read_(bytes_read) {
544 *messages_read_ = 0;
545 *bytes_read_ = 0;
546 }
547
548 ~ReaderThread() override { Join(); }
549
550 private:
551 void Run() override {
552 unsigned char buffer[kMaxMessageSize];
553 Waiter w;
554 HandleSignalsState hss;
555 MojoResult result;
556
557 // Read messages.
558 for (;;) {
559 // Wait for it to be readable.
560 w.Init();
561 hss = HandleSignalsState();
562 result = read_dispatcher_->AddAwakable(&w, MOJO_HANDLE_SIGNAL_READABLE, 0,
563 &hss);
564 EXPECT_TRUE(result == MOJO_RESULT_OK ||
565 result == MOJO_RESULT_ALREADY_EXISTS)
566 << "result: " << result;
567 if (result == MOJO_RESULT_OK) {
568 // Actually need to wait.
569 EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, nullptr));
570 read_dispatcher_->RemoveAwakable(&w, &hss);
571 }
572 // We may not actually be readable, since we're racing with other threads.
573 EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));
574
575 // Now, try to do the read.
576 // Clear the buffer so that we can check the result.
577 memset(buffer, 0, sizeof(buffer));
578 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
579 result = read_dispatcher_->ReadMessage(
580 UserPointer<void>(buffer), MakeUserPointer(&buffer_size), 0, nullptr,
581 MOJO_READ_MESSAGE_FLAG_NONE);
582 EXPECT_TRUE(result == MOJO_RESULT_OK || result == MOJO_RESULT_SHOULD_WAIT)
583 << "result: " << result;
584 // We're racing with others to read, so maybe we failed.
585 if (result == MOJO_RESULT_SHOULD_WAIT)
586 continue; // In which case, try again.
587 // Check for quit.
588 if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0)
589 return;
590 EXPECT_GE(buffer_size, 1u);
591 EXPECT_LE(buffer_size, kMaxMessageSize);
592 EXPECT_TRUE(IsValidMessage(buffer, buffer_size));
593
594 (*messages_read_)++;
595 *bytes_read_ += buffer_size;
596 }
597 }
598
599 static bool IsValidMessage(const unsigned char* buffer,
600 uint32_t message_size) {
601 size_t i;
602 for (i = 0; i < message_size; i++) {
603 if (buffer[i] != static_cast<unsigned char>(i))
604 return false;
605 }
606 // Check that the remaining bytes weren't stomped on.
607 for (; i < kMaxMessageSize; i++) {
608 if (buffer[i] != 0)
609 return false;
610 }
611 return true;
612 }
613
614 const scoped_refptr<Dispatcher> read_dispatcher_;
615 size_t* const messages_read_;
616 size_t* const bytes_read_;
617
618 DISALLOW_COPY_AND_ASSIGN(ReaderThread);
619 };
620
621 TEST(MessagePipeDispatcherTest, Stress) {
622 static const size_t kNumWriters = 30;
623 static const size_t kNumReaders = kNumWriters;
624
625 scoped_refptr<MessagePipeDispatcher> d_write(
626 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
627 scoped_refptr<MessagePipeDispatcher> d_read(
628 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
629 {
630 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
631 d_write->Init(mp, 0);
632 d_read->Init(mp, 1);
633 }
634
635 size_t messages_written[kNumWriters];
636 size_t bytes_written[kNumWriters];
637 size_t messages_read[kNumReaders];
638 size_t bytes_read[kNumReaders];
639 {
640 // Make writers.
641 ScopedVector<WriterThread> writers;
642 for (size_t i = 0; i < kNumWriters; i++) {
643 writers.push_back(
644 new WriterThread(d_write, &messages_written[i], &bytes_written[i]));
645 }
646
647 // Make readers.
648 ScopedVector<ReaderThread> readers;
649 for (size_t i = 0; i < kNumReaders; i++) {
650 readers.push_back(
651 new ReaderThread(d_read, &messages_read[i], &bytes_read[i]));
652 }
653
654 // Start writers.
655 for (size_t i = 0; i < kNumWriters; i++)
656 writers[i]->Start();
657
658 // Start readers.
659 for (size_t i = 0; i < kNumReaders; i++)
660 readers[i]->Start();
661
662 // TODO(vtl): Maybe I should have an event that triggers all the threads to
663 // start doing stuff for real (so that the first ones created/started aren't
664 // advantaged).
665 } // Joins all the threads.
666
667 size_t total_messages_written = 0;
668 size_t total_bytes_written = 0;
669 for (size_t i = 0; i < kNumWriters; i++) {
670 total_messages_written += messages_written[i];
671 total_bytes_written += bytes_written[i];
672 }
673 size_t total_messages_read = 0;
674 size_t total_bytes_read = 0;
675 for (size_t i = 0; i < kNumReaders; i++) {
676 total_messages_read += messages_read[i];
677 total_bytes_read += bytes_read[i];
678 // We'd have to be really unlucky to have read no messages on a thread.
679 EXPECT_GT(messages_read[i], 0u) << "reader: " << i;
680 EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i;
681 }
682 EXPECT_EQ(total_messages_written, total_messages_read);
683 EXPECT_EQ(total_bytes_written, total_bytes_read);
684
685 EXPECT_EQ(MOJO_RESULT_OK, d_write->Close());
686 EXPECT_EQ(MOJO_RESULT_OK, d_read->Close());
687 }
688
689 } // namespace
690 } // namespace system
691 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/message_pipe_dispatcher.cc ('k') | mojo/edk/system/message_pipe_endpoint.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698