Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(277)

Side by Side Diff: mojo/system/message_pipe_dispatcher_unittest.cc

Issue 23621056: Initial in-process implementation of some Mojo primitives. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: build fix Created 7 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a
6 // heavily-loaded system). Sorry. |kEpsilonMicros| may be increased to increase
7 // tolerance and reduce observed flakiness.
8
9 #include "mojo/system/message_pipe_dispatcher.h"
10
11 #include <string.h>
12
13 #include "base/memory/ref_counted.h"
14 #include "base/memory/scoped_vector.h"
15 #include "base/rand_util.h"
16 #include "base/threading/platform_thread.h" // For |Sleep()|.
17 #include "base/threading/simple_thread.h"
18 #include "base/time/time.h"
19 #include "mojo/system/message_pipe.h"
20 #include "mojo/system/test_utils.h"
21 #include "mojo/system/waiter.h"
22 #include "mojo/system/waiter_test_utils.h"
23 #include "testing/gtest/include/gtest/gtest.h"
24
25 namespace mojo {
26 namespace system {
27 namespace {
28
29 const int64_t kMicrosPerMs = 1000;
30 const int64_t kEpsilonMicros = 15 * kMicrosPerMs; // 15 ms.
31
32 TEST(MessagePipeDispatcherTest, Basic) {
33 test::Stopwatch stopwatch;
34 int32_t buffer[1];
35 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
36 uint32_t buffer_size;
37 int64_t elapsed_micros;
38
39 // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa.
40 for (unsigned i = 0; i < 2; i++) {
41 scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
42 scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
43 {
44 scoped_refptr<MessagePipe> mp(new MessagePipe());
45 d_0->Init(mp, i); // 0, 1.
46 d_1->Init(mp, i ^ 1); // 1, 0.
47 }
48 Waiter w;
49
50 // Try adding a writable waiter when already writable.
51 w.Init();
52 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
53 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 0));
54 // Shouldn't need to remove the waiter (it was not added).
55
56 // Add a readable waiter to |d_0|, then make it readable (by writing to
57 // |d_1|), then wait.
58 w.Init();
59 EXPECT_EQ(MOJO_RESULT_OK,
60 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 1));
61 buffer[0] = 123456789;
62 EXPECT_EQ(MOJO_RESULT_OK,
63 d_1->WriteMessage(buffer, kBufferSize,
64 NULL, 0,
65 MOJO_WRITE_MESSAGE_FLAG_NONE));
66 stopwatch.Start();
67 EXPECT_EQ(1, w.Wait(MOJO_DEADLINE_INDEFINITE));
68 elapsed_micros = stopwatch.Elapsed();
69 EXPECT_LT(elapsed_micros, kEpsilonMicros);
70 d_0->RemoveWaiter(&w);
71
72 // Try adding a readable waiter when already readable (from above).
73 w.Init();
74 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
75 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 2));
76 // Shouldn't need to remove the waiter (it was not added).
77
78 // Make |d_0| no longer readable (by reading from it).
79 buffer[0] = 0;
80 buffer_size = kBufferSize;
81 EXPECT_EQ(MOJO_RESULT_OK,
82 d_0->ReadMessage(buffer, &buffer_size,
83 NULL, NULL,
84 MOJO_READ_MESSAGE_FLAG_NONE));
85 EXPECT_EQ(kBufferSize, buffer_size);
86 EXPECT_EQ(123456789, buffer[0]);
87
88 // Wait for zero time for readability on |d_0| (will time out).
89 w.Init();
90 EXPECT_EQ(MOJO_RESULT_OK,
91 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3));
92 stopwatch.Start();
93 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0));
94 elapsed_micros = stopwatch.Elapsed();
95 EXPECT_LT(elapsed_micros, kEpsilonMicros);
96 d_0->RemoveWaiter(&w);
97
98 // Wait for non-zero, finite time for readability on |d_0| (will time out).
99 w.Init();
100 EXPECT_EQ(MOJO_RESULT_OK,
101 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3));
102 stopwatch.Start();
103 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(2 * kEpsilonMicros));
104 elapsed_micros = stopwatch.Elapsed();
105 EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
106 EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
107 d_0->RemoveWaiter(&w);
108
109 EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
110 EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
111 }
112 }
113
114 // Test what happens when one end is closed (single-threaded test).
115 TEST(MessagePipeDispatcherTest, BasicClosed) {
116 int32_t buffer[1];
117 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
118 uint32_t buffer_size;
119
120 // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa.
121 for (unsigned i = 0; i < 2; i++) {
122 scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
123 scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
124 {
125 scoped_refptr<MessagePipe> mp(new MessagePipe());
126 d_0->Init(mp, i); // 0, 1.
127 d_1->Init(mp, i ^ 1); // 1, 0.
128 }
129 Waiter w;
130
131 // Write (twice) to |d_1|.
132 buffer[0] = 123456789;
133 EXPECT_EQ(MOJO_RESULT_OK,
134 d_1->WriteMessage(buffer, kBufferSize,
135 NULL, 0,
136 MOJO_WRITE_MESSAGE_FLAG_NONE));
137 buffer[0] = 234567890;
138 EXPECT_EQ(MOJO_RESULT_OK,
139 d_1->WriteMessage(buffer, kBufferSize,
140 NULL, 0,
141 MOJO_WRITE_MESSAGE_FLAG_NONE));
142
143 // Try waiting for readable on |d_0|; should fail (already satisfied).
144 w.Init();
145 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
146 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 0));
147
148 // Close |d_1|.
149 EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
150
151 // Try waiting for readable on |d_0|; should fail (already satisfied).
152 w.Init();
153 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
154 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 1));
155
156 // Read from |d_0|.
157 buffer[0] = 0;
158 buffer_size = kBufferSize;
159 EXPECT_EQ(MOJO_RESULT_OK,
160 d_0->ReadMessage(buffer, &buffer_size,
161 NULL, NULL,
162 MOJO_READ_MESSAGE_FLAG_NONE));
163 EXPECT_EQ(kBufferSize, buffer_size);
164 EXPECT_EQ(123456789, buffer[0]);
165
166 // Try waiting for readable on |d_0|; should fail (already satisfied).
167 w.Init();
168 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
169 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 2));
170
171 // Read again from |d_0|.
172 buffer[0] = 0;
173 buffer_size = kBufferSize;
174 EXPECT_EQ(MOJO_RESULT_OK,
175 d_0->ReadMessage(buffer, &buffer_size,
176 NULL, NULL,
177 MOJO_READ_MESSAGE_FLAG_NONE));
178 EXPECT_EQ(kBufferSize, buffer_size);
179 EXPECT_EQ(234567890, buffer[0]);
180
181 // Try waiting for readable on |d_0|; should fail (unsatisfiable).
182 w.Init();
183 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
184 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3));
185
186 // Try waiting for writable on |d_0|; should fail (unsatisfiable).
187 w.Init();
188 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
189 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 4));
190
191 // Try reading from |d_0|; should fail (nothing to read).
192 buffer[0] = 0;
193 buffer_size = kBufferSize;
194 EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
195 d_0->ReadMessage(buffer, &buffer_size,
196 NULL, NULL,
197 MOJO_READ_MESSAGE_FLAG_NONE));
198
199 // Try writing to |d_0|; should fail (other end closed).
200 buffer[0] = 345678901;
201 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
202 d_0->WriteMessage(buffer, kBufferSize,
203 NULL, 0,
204 MOJO_WRITE_MESSAGE_FLAG_NONE));
205
206 EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
207 }
208 }
209
210 TEST(MessagePipeDispatcherTest, BasicThreaded) {
211 test::Stopwatch stopwatch;
212 int32_t buffer[1];
213 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
214 uint32_t buffer_size;
215 bool did_wait;
216 MojoResult result;
217 int64_t elapsed_micros;
218
219 // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa.
220 for (unsigned i = 0; i < 2; i++) {
221 scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
222 scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
223 {
224 scoped_refptr<MessagePipe> mp(new MessagePipe());
225 d_0->Init(mp, i); // 0, 1.
226 d_1->Init(mp, i ^ 1); // 1, 0.
227 }
228
229 // Wait for readable on |d_1|, which will become readable after some time.
230 {
231 test::WaiterThread thread(d_1,
232 MOJO_WAIT_FLAG_READABLE,
233 MOJO_DEADLINE_INDEFINITE,
234 0,
235 &did_wait, &result);
236 stopwatch.Start();
237 thread.Start();
238 base::PlatformThread::Sleep(
239 base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
240 // Wake it up by writing to |d_0|.
241 buffer[0] = 123456789;
242 EXPECT_EQ(MOJO_RESULT_OK,
243 d_0->WriteMessage(buffer, kBufferSize,
244 NULL, 0,
245 MOJO_WRITE_MESSAGE_FLAG_NONE));
246 } // Joins the thread.
247 elapsed_micros = stopwatch.Elapsed();
248 EXPECT_TRUE(did_wait);
249 EXPECT_EQ(0, result);
250 EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
251 EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
252
253 // Now |d_1| is already readable. Try waiting for it again.
254 {
255 test::WaiterThread thread(d_1,
256 MOJO_WAIT_FLAG_READABLE,
257 MOJO_DEADLINE_INDEFINITE,
258 1,
259 &did_wait, &result);
260 stopwatch.Start();
261 thread.Start();
262 } // Joins the thread.
263 elapsed_micros = stopwatch.Elapsed();
264 EXPECT_FALSE(did_wait);
265 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result);
266 EXPECT_LT(elapsed_micros, kEpsilonMicros);
267
268 // Consume what we wrote to |d_0|.
269 buffer[0] = 0;
270 buffer_size = kBufferSize;
271 EXPECT_EQ(MOJO_RESULT_OK,
272 d_1->ReadMessage(buffer, &buffer_size,
273 NULL, NULL,
274 MOJO_READ_MESSAGE_FLAG_NONE));
275 EXPECT_EQ(kBufferSize, buffer_size);
276 EXPECT_EQ(123456789, buffer[0]);
277
278 // Wait for readable on |d_1| and close |d_0| after some time, which should
279 // cancel that wait.
280 {
281 test::WaiterThread thread(d_1,
282 MOJO_WAIT_FLAG_READABLE,
283 MOJO_DEADLINE_INDEFINITE,
284 0,
285 &did_wait, &result);
286 stopwatch.Start();
287 thread.Start();
288 base::PlatformThread::Sleep(
289 base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
290 EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
291 } // Joins the thread.
292 elapsed_micros = stopwatch.Elapsed();
293 EXPECT_TRUE(did_wait);
294 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
295 EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
296 EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
297
298 EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
299 }
300
301 for (unsigned i = 0; i < 2; i++) {
302 scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
303 scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
304 {
305 scoped_refptr<MessagePipe> mp(new MessagePipe());
306 d_0->Init(mp, i); // 0, 1.
307 d_1->Init(mp, i ^ 1); // 1, 0.
308 }
309
310 // Wait for readable on |d_1| and close |d_1| after some time, which should
311 // cancel that wait.
312 {
313 test::WaiterThread thread(d_1,
314 MOJO_WAIT_FLAG_READABLE,
315 MOJO_DEADLINE_INDEFINITE,
316 0,
317 &did_wait, &result);
318 stopwatch.Start();
319 thread.Start();
320 base::PlatformThread::Sleep(
321 base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
322 EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
323 } // Joins the thread.
324 elapsed_micros = stopwatch.Elapsed();
325 EXPECT_TRUE(did_wait);
326 EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
327 EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
328 EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
329
330 EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
331 }
332 }
333
334 // Stress test -----------------------------------------------------------------
335
336 const size_t kMaxMessageSize = 2000;
337
338 class WriterThread : public base::SimpleThread {
339 public:
340 // |*messages_written| and |*bytes_written| belong to the thread while it's
341 // alive.
342 WriterThread(scoped_refptr<Dispatcher> write_dispatcher,
343 size_t* messages_written, size_t* bytes_written)
344 : base::SimpleThread("writer_thread"),
345 write_dispatcher_(write_dispatcher),
346 messages_written_(messages_written),
347 bytes_written_(bytes_written) {
348 *messages_written_ = 0;
349 *bytes_written_ = 0;
350 }
351
352 virtual ~WriterThread() {
353 Join();
354 }
355
356 private:
357 virtual void Run() OVERRIDE {
358 // Make some data to write.
359 unsigned char buffer[kMaxMessageSize];
360 for (size_t i = 0; i < kMaxMessageSize; i++)
361 buffer[i] = static_cast<unsigned char>(i);
362
363 // Number of messages to write.
364 *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000));
365
366 // Write messages.
367 for (size_t i = 0; i < *messages_written_; i++) {
368 uint32_t bytes_to_write = static_cast<uint32_t>(
369 base::RandInt(1, static_cast<int>(kMaxMessageSize)));
370 EXPECT_EQ(MOJO_RESULT_OK,
371 write_dispatcher_->WriteMessage(buffer, bytes_to_write,
372 NULL, 0,
373 MOJO_WRITE_MESSAGE_FLAG_NONE));
374 *bytes_written_ += bytes_to_write;
375 }
376
377 // Write one last "quit" message.
378 EXPECT_EQ(MOJO_RESULT_OK,
379 write_dispatcher_->WriteMessage("quit", 4, NULL, 0,
380 MOJO_WRITE_MESSAGE_FLAG_NONE));
381 }
382
383 const scoped_refptr<Dispatcher> write_dispatcher_;
384 size_t* const messages_written_;
385 size_t* const bytes_written_;
386
387 DISALLOW_COPY_AND_ASSIGN(WriterThread);
388 };
389
390 class ReaderThread : public base::SimpleThread {
391 public:
392 // |*messages_read| and |*bytes_read| belong to the thread while it's alive.
393 ReaderThread(scoped_refptr<Dispatcher> read_dispatcher,
394 size_t* messages_read, size_t* bytes_read)
395 : base::SimpleThread("reader_thread"),
396 read_dispatcher_(read_dispatcher),
397 messages_read_(messages_read),
398 bytes_read_(bytes_read) {
399 *messages_read_ = 0;
400 *bytes_read_ = 0;
401 }
402
403 virtual ~ReaderThread() {
404 Join();
405 }
406
407 private:
408 virtual void Run() OVERRIDE {
409 unsigned char buffer[kMaxMessageSize];
410 MojoResult result;
411 Waiter w;
412
413 // Read messages.
414 for (;;) {
415 // Wait for it to be readable.
416 w.Init();
417 result = read_dispatcher_->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 0);
418 EXPECT_TRUE(result == MOJO_RESULT_OK ||
419 result == MOJO_RESULT_ALREADY_EXISTS) << "result: " << result;
420 if (result == MOJO_RESULT_OK) {
421 // Actually need to wait.
422 EXPECT_EQ(0, w.Wait(MOJO_DEADLINE_INDEFINITE));
423 read_dispatcher_->RemoveWaiter(&w);
424 }
425
426 // Now, try to do the read.
427 // Clear the buffer so that we can check the result.
428 memset(buffer, 0, sizeof(buffer));
429 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
430 result = read_dispatcher_->ReadMessage(buffer, &buffer_size, NULL, NULL,
431 MOJO_READ_MESSAGE_FLAG_NONE);
432 EXPECT_TRUE(result == MOJO_RESULT_OK ||
433 result == MOJO_RESULT_NOT_FOUND) << "result: " << result;
434 // We're racing with others to read, so maybe we failed.
435 if (result == MOJO_RESULT_NOT_FOUND)
436 continue; // In which case, try again.
437 // Check for quit.
438 if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0)
439 return;
440 EXPECT_GE(buffer_size, 1u);
441 EXPECT_LE(buffer_size, kMaxMessageSize);
442 EXPECT_TRUE(IsValidMessage(buffer, buffer_size));
443
444 (*messages_read_)++;
445 *bytes_read_ += buffer_size;
446 }
447 }
448
449 static bool IsValidMessage(const unsigned char* buffer,
450 uint32_t message_size) {
451 size_t i;
452 for (i = 0; i < message_size; i++) {
453 if (buffer[i] != static_cast<unsigned char>(i))
454 return false;
455 }
456 // Check that the remaining bytes weren't stomped on.
457 for (; i < kMaxMessageSize; i++) {
458 if (buffer[i] != 0)
459 return false;
460 }
461 return true;
462 }
463
464 const scoped_refptr<Dispatcher> read_dispatcher_;
465 size_t* const messages_read_;
466 size_t* const bytes_read_;
467
468 DISALLOW_COPY_AND_ASSIGN(ReaderThread);
469 };
470
471 TEST(MessagePipeDispatcherTest, Stress) {
472 static const size_t kNumWriters = 30;
473 static const size_t kNumReaders = kNumWriters;
474
475 scoped_refptr<MessagePipeDispatcher> d_write(new MessagePipeDispatcher());
476 scoped_refptr<MessagePipeDispatcher> d_read(new MessagePipeDispatcher());
477 {
478 scoped_refptr<MessagePipe> mp(new MessagePipe());
479 d_write->Init(mp, 0);
480 d_read->Init(mp, 1);
481 }
482
483 size_t messages_written[kNumWriters];
484 size_t bytes_written[kNumWriters];
485 size_t messages_read[kNumReaders];
486 size_t bytes_read[kNumReaders];
487 {
488 // Make writers.
489 ScopedVector<WriterThread> writers;
490 for (size_t i = 0; i < kNumWriters; i++) {
491 writers.push_back(
492 new WriterThread(d_write, &messages_written[i], &bytes_written[i]));
493 }
494
495 // Make readers.
496 ScopedVector<ReaderThread> readers;
497 for (size_t i = 0; i < kNumReaders; i++) {
498 readers.push_back(
499 new ReaderThread(d_read, &messages_read[i], &bytes_read[i]));
500 }
501
502 // Start writers.
503 for (size_t i = 0; i < kNumWriters; i++)
504 writers[i]->Start();
505
506 // Start readers.
507 for (size_t i = 0; i < kNumReaders; i++)
508 readers[i]->Start();
509
510 // TODO(vtl): Maybe I should have an event that triggers all the threads to
511 // start doing stuff for real (so that the first ones created/started aren't
512 // advantaged).
513 } // Joins all the threads.
514
515 size_t total_messages_written = 0;
516 size_t total_bytes_written = 0;
517 for (size_t i = 0; i < kNumWriters; i++) {
518 total_messages_written += messages_written[i];
519 total_bytes_written += bytes_written[i];
520 }
521 size_t total_messages_read = 0;
522 size_t total_bytes_read = 0;
523 for (size_t i = 0; i < kNumReaders; i++) {
524 total_messages_read += messages_read[i];
525 total_bytes_read += bytes_read[i];
526 // We'd have to be really unlucky to have read no messages on a thread.
527 EXPECT_GT(messages_read[i], 0u) << "reader: " << i;
528 EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i;
529 }
530 EXPECT_EQ(total_messages_written, total_messages_read);
531 EXPECT_EQ(total_bytes_written, total_bytes_read);
532
533 EXPECT_EQ(MOJO_RESULT_OK, d_write->Close());
534 EXPECT_EQ(MOJO_RESULT_OK, d_read->Close());
535 }
536
537 } // namespace
538 } // namespace system
539 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698