| Index: mojo/system/message_pipe_dispatcher_unittest.cc
|
| diff --git a/mojo/system/message_pipe_dispatcher_unittest.cc b/mojo/system/message_pipe_dispatcher_unittest.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..acafcdaca2d9071267e8ed11fa12a6d7d1ba881e
|
| --- /dev/null
|
| +++ b/mojo/system/message_pipe_dispatcher_unittest.cc
|
| @@ -0,0 +1,539 @@
|
| +// Copyright 2013 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +// NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a
|
| +// heavily-loaded system). Sorry. |kEpsilonMicros| may be increased to increase
|
| +// tolerance and reduce observed flakiness.
|
| +
|
| +#include "mojo/system/message_pipe_dispatcher.h"
|
| +
|
| +#include <string.h>
|
| +
|
| +#include "base/memory/ref_counted.h"
|
| +#include "base/memory/scoped_vector.h"
|
| +#include "base/rand_util.h"
|
| +#include "base/threading/platform_thread.h" // For |Sleep()|.
|
| +#include "base/threading/simple_thread.h"
|
| +#include "base/time/time.h"
|
| +#include "mojo/system/message_pipe.h"
|
| +#include "mojo/system/test_utils.h"
|
| +#include "mojo/system/waiter.h"
|
| +#include "mojo/system/waiter_test_utils.h"
|
| +#include "testing/gtest/include/gtest/gtest.h"
|
| +
|
| +namespace mojo {
|
| +namespace system {
|
| +namespace {
|
| +
|
| +const int64_t kMicrosPerMs = 1000;
|
| +const int64_t kEpsilonMicros = 15 * kMicrosPerMs; // 15 ms.
|
| +
|
| +TEST(MessagePipeDispatcherTest, Basic) {
|
| + test::Stopwatch stopwatch;
|
| + int32_t buffer[1];
|
| + const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
|
| + uint32_t buffer_size;
|
| + int64_t elapsed_micros;
|
| +
|
| + // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa.
|
| + for (unsigned i = 0; i < 2; i++) {
|
| + scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
|
| + scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
|
| + {
|
| + scoped_refptr<MessagePipe> mp(new MessagePipe());
|
| + d_0->Init(mp, i); // 0, 1.
|
| + d_1->Init(mp, i ^ 1); // 1, 0.
|
| + }
|
| + Waiter w;
|
| +
|
| + // Try adding a writable waiter when already writable.
|
| + w.Init();
|
| + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 0));
|
| + // Shouldn't need to remove the waiter (it was not added).
|
| +
|
| + // Add a readable waiter to |d_0|, then make it readable (by writing to
|
| + // |d_1|), then wait.
|
| + w.Init();
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 1));
|
| + buffer[0] = 123456789;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + d_1->WriteMessage(buffer, kBufferSize,
|
| + NULL, 0,
|
| + MOJO_WRITE_MESSAGE_FLAG_NONE));
|
| + stopwatch.Start();
|
| + EXPECT_EQ(1, w.Wait(MOJO_DEADLINE_INDEFINITE));
|
| + elapsed_micros = stopwatch.Elapsed();
|
| + EXPECT_LT(elapsed_micros, kEpsilonMicros);
|
| + d_0->RemoveWaiter(&w);
|
| +
|
| + // Try adding a readable waiter when already readable (from above).
|
| + w.Init();
|
| + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 2));
|
| + // Shouldn't need to remove the waiter (it was not added).
|
| +
|
| + // Make |d_0| no longer readable (by reading from it).
|
| + buffer[0] = 0;
|
| + buffer_size = kBufferSize;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + d_0->ReadMessage(buffer, &buffer_size,
|
| + NULL, NULL,
|
| + MOJO_READ_MESSAGE_FLAG_NONE));
|
| + EXPECT_EQ(kBufferSize, buffer_size);
|
| + EXPECT_EQ(123456789, buffer[0]);
|
| +
|
| + // Wait for zero time for readability on |d_0| (will time out).
|
| + w.Init();
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3));
|
| + stopwatch.Start();
|
| + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0));
|
| + elapsed_micros = stopwatch.Elapsed();
|
| + EXPECT_LT(elapsed_micros, kEpsilonMicros);
|
| + d_0->RemoveWaiter(&w);
|
| +
|
| + // Wait for non-zero, finite time for readability on |d_0| (will time out).
|
| + w.Init();
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3));
|
| + stopwatch.Start();
|
| + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(2 * kEpsilonMicros));
|
| + elapsed_micros = stopwatch.Elapsed();
|
| + EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
|
| + EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
|
| + d_0->RemoveWaiter(&w);
|
| +
|
| + EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
|
| + EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
|
| + }
|
| +}
|
| +
|
| +// Test what happens when one end is closed (single-threaded test).
|
| +TEST(MessagePipeDispatcherTest, BasicClosed) {
|
| + int32_t buffer[1];
|
| + const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
|
| + uint32_t buffer_size;
|
| +
|
| + // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa.
|
| + for (unsigned i = 0; i < 2; i++) {
|
| + scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
|
| + scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
|
| + {
|
| + scoped_refptr<MessagePipe> mp(new MessagePipe());
|
| + d_0->Init(mp, i); // 0, 1.
|
| + d_1->Init(mp, i ^ 1); // 1, 0.
|
| + }
|
| + Waiter w;
|
| +
|
| + // Write (twice) to |d_1|.
|
| + buffer[0] = 123456789;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + d_1->WriteMessage(buffer, kBufferSize,
|
| + NULL, 0,
|
| + MOJO_WRITE_MESSAGE_FLAG_NONE));
|
| + buffer[0] = 234567890;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + d_1->WriteMessage(buffer, kBufferSize,
|
| + NULL, 0,
|
| + MOJO_WRITE_MESSAGE_FLAG_NONE));
|
| +
|
| + // Try waiting for readable on |d_0|; should fail (already satisfied).
|
| + w.Init();
|
| + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 0));
|
| +
|
| + // Close |d_1|.
|
| + EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
|
| +
|
| + // Try waiting for readable on |d_0|; should fail (already satisfied).
|
| + w.Init();
|
| + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 1));
|
| +
|
| + // Read from |d_0|.
|
| + buffer[0] = 0;
|
| + buffer_size = kBufferSize;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + d_0->ReadMessage(buffer, &buffer_size,
|
| + NULL, NULL,
|
| + MOJO_READ_MESSAGE_FLAG_NONE));
|
| + EXPECT_EQ(kBufferSize, buffer_size);
|
| + EXPECT_EQ(123456789, buffer[0]);
|
| +
|
| + // Try waiting for readable on |d_0|; should fail (already satisfied).
|
| + w.Init();
|
| + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 2));
|
| +
|
| + // Read again from |d_0|.
|
| + buffer[0] = 0;
|
| + buffer_size = kBufferSize;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + d_0->ReadMessage(buffer, &buffer_size,
|
| + NULL, NULL,
|
| + MOJO_READ_MESSAGE_FLAG_NONE));
|
| + EXPECT_EQ(kBufferSize, buffer_size);
|
| + EXPECT_EQ(234567890, buffer[0]);
|
| +
|
| + // Try waiting for readable on |d_0|; should fail (unsatisfiable).
|
| + w.Init();
|
| + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
|
| + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3));
|
| +
|
| + // Try waiting for writable on |d_0|; should fail (unsatisfiable).
|
| + w.Init();
|
| + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
|
| + d_0->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 4));
|
| +
|
| + // Try reading from |d_0|; should fail (nothing to read).
|
| + buffer[0] = 0;
|
| + buffer_size = kBufferSize;
|
| + EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
|
| + d_0->ReadMessage(buffer, &buffer_size,
|
| + NULL, NULL,
|
| + MOJO_READ_MESSAGE_FLAG_NONE));
|
| +
|
| + // Try writing to |d_0|; should fail (other end closed).
|
| + buffer[0] = 345678901;
|
| + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
|
| + d_0->WriteMessage(buffer, kBufferSize,
|
| + NULL, 0,
|
| + MOJO_WRITE_MESSAGE_FLAG_NONE));
|
| +
|
| + EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
|
| + }
|
| +}
|
| +
|
| +TEST(MessagePipeDispatcherTest, BasicThreaded) {
|
| + test::Stopwatch stopwatch;
|
| + int32_t buffer[1];
|
| + const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
|
| + uint32_t buffer_size;
|
| + bool did_wait;
|
| + MojoResult result;
|
| + int64_t elapsed_micros;
|
| +
|
| + // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa.
|
| + for (unsigned i = 0; i < 2; i++) {
|
| + scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
|
| + scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
|
| + {
|
| + scoped_refptr<MessagePipe> mp(new MessagePipe());
|
| + d_0->Init(mp, i); // 0, 1.
|
| + d_1->Init(mp, i ^ 1); // 1, 0.
|
| + }
|
| +
|
| + // Wait for readable on |d_1|, which will become readable after some time.
|
| + {
|
| + test::WaiterThread thread(d_1,
|
| + MOJO_WAIT_FLAG_READABLE,
|
| + MOJO_DEADLINE_INDEFINITE,
|
| + 0,
|
| + &did_wait, &result);
|
| + stopwatch.Start();
|
| + thread.Start();
|
| + base::PlatformThread::Sleep(
|
| + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
|
| + // Wake it up by writing to |d_0|.
|
| + buffer[0] = 123456789;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + d_0->WriteMessage(buffer, kBufferSize,
|
| + NULL, 0,
|
| + MOJO_WRITE_MESSAGE_FLAG_NONE));
|
| + } // Joins the thread.
|
| + elapsed_micros = stopwatch.Elapsed();
|
| + EXPECT_TRUE(did_wait);
|
| + EXPECT_EQ(0, result);
|
| + EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
|
| + EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
|
| +
|
| + // Now |d_1| is already readable. Try waiting for it again.
|
| + {
|
| + test::WaiterThread thread(d_1,
|
| + MOJO_WAIT_FLAG_READABLE,
|
| + MOJO_DEADLINE_INDEFINITE,
|
| + 1,
|
| + &did_wait, &result);
|
| + stopwatch.Start();
|
| + thread.Start();
|
| + } // Joins the thread.
|
| + elapsed_micros = stopwatch.Elapsed();
|
| + EXPECT_FALSE(did_wait);
|
| + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result);
|
| + EXPECT_LT(elapsed_micros, kEpsilonMicros);
|
| +
|
| + // Consume what we wrote to |d_0|.
|
| + buffer[0] = 0;
|
| + buffer_size = kBufferSize;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + d_1->ReadMessage(buffer, &buffer_size,
|
| + NULL, NULL,
|
| + MOJO_READ_MESSAGE_FLAG_NONE));
|
| + EXPECT_EQ(kBufferSize, buffer_size);
|
| + EXPECT_EQ(123456789, buffer[0]);
|
| +
|
| + // Wait for readable on |d_1| and close |d_0| after some time, which should
|
| + // cancel that wait.
|
| + {
|
| + test::WaiterThread thread(d_1,
|
| + MOJO_WAIT_FLAG_READABLE,
|
| + MOJO_DEADLINE_INDEFINITE,
|
| + 0,
|
| + &did_wait, &result);
|
| + stopwatch.Start();
|
| + thread.Start();
|
| + base::PlatformThread::Sleep(
|
| + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
|
| + EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
|
| + } // Joins the thread.
|
| + elapsed_micros = stopwatch.Elapsed();
|
| + EXPECT_TRUE(did_wait);
|
| + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
|
| + EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
|
| + EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
|
| +
|
| + EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
|
| + }
|
| +
|
| + for (unsigned i = 0; i < 2; i++) {
|
| + scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
|
| + scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
|
| + {
|
| + scoped_refptr<MessagePipe> mp(new MessagePipe());
|
| + d_0->Init(mp, i); // 0, 1.
|
| + d_1->Init(mp, i ^ 1); // 1, 0.
|
| + }
|
| +
|
| + // Wait for readable on |d_1| and close |d_1| after some time, which should
|
| + // cancel that wait.
|
| + {
|
| + test::WaiterThread thread(d_1,
|
| + MOJO_WAIT_FLAG_READABLE,
|
| + MOJO_DEADLINE_INDEFINITE,
|
| + 0,
|
| + &did_wait, &result);
|
| + stopwatch.Start();
|
| + thread.Start();
|
| + base::PlatformThread::Sleep(
|
| + base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
|
| + EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
|
| + } // Joins the thread.
|
| + elapsed_micros = stopwatch.Elapsed();
|
| + EXPECT_TRUE(did_wait);
|
| + EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
|
| + EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
|
| + EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
|
| +
|
| + EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
|
| + }
|
| +}
|
| +
|
| +// Stress test -----------------------------------------------------------------
|
| +
|
| +const size_t kMaxMessageSize = 2000;
|
| +
|
| +class WriterThread : public base::SimpleThread {
|
| + public:
|
| + // |*messages_written| and |*bytes_written| belong to the thread while it's
|
| + // alive.
|
| + WriterThread(scoped_refptr<Dispatcher> write_dispatcher,
|
| + size_t* messages_written, size_t* bytes_written)
|
| + : base::SimpleThread("writer_thread"),
|
| + write_dispatcher_(write_dispatcher),
|
| + messages_written_(messages_written),
|
| + bytes_written_(bytes_written) {
|
| + *messages_written_ = 0;
|
| + *bytes_written_ = 0;
|
| + }
|
| +
|
| + virtual ~WriterThread() {
|
| + Join();
|
| + }
|
| +
|
| + private:
|
| + virtual void Run() OVERRIDE {
|
| + // Make some data to write.
|
| + unsigned char buffer[kMaxMessageSize];
|
| + for (size_t i = 0; i < kMaxMessageSize; i++)
|
| + buffer[i] = static_cast<unsigned char>(i);
|
| +
|
| + // Number of messages to write.
|
| + *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000));
|
| +
|
| + // Write messages.
|
| + for (size_t i = 0; i < *messages_written_; i++) {
|
| + size_t bytes_to_write = base::RandInt(1,
|
| + static_cast<int>(kMaxMessageSize));
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + write_dispatcher_->WriteMessage(buffer, bytes_to_write,
|
| + NULL, 0,
|
| + MOJO_WRITE_MESSAGE_FLAG_NONE));
|
| + *bytes_written_ += bytes_to_write;
|
| + }
|
| +
|
| + // Write one last "quit" message.
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + write_dispatcher_->WriteMessage("quit", 4, NULL, 0,
|
| + MOJO_WRITE_MESSAGE_FLAG_NONE));
|
| + }
|
| +
|
| + const scoped_refptr<Dispatcher> write_dispatcher_;
|
| + size_t* const messages_written_;
|
| + size_t* const bytes_written_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(WriterThread);
|
| +};
|
| +
|
| +class ReaderThread : public base::SimpleThread {
|
| + public:
|
| + // |*messages_read| and |*bytes_read| belong to the thread while it's alive.
|
| + ReaderThread(scoped_refptr<Dispatcher> read_dispatcher,
|
| + size_t* messages_read, size_t* bytes_read)
|
| + : base::SimpleThread("reader_thread"),
|
| + read_dispatcher_(read_dispatcher),
|
| + messages_read_(messages_read),
|
| + bytes_read_(bytes_read) {
|
| + *messages_read_ = 0;
|
| + *bytes_read_ = 0;
|
| + }
|
| +
|
| + virtual ~ReaderThread() {
|
| + Join();
|
| + }
|
| +
|
| + private:
|
| + virtual void Run() OVERRIDE {
|
| + unsigned char buffer[kMaxMessageSize];
|
| + MojoResult result;
|
| + Waiter w;
|
| +
|
| + // Read messages.
|
| + for (;;) {
|
| + // Wait for it to be readable.
|
| + w.Init();
|
| + result = read_dispatcher_->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 0);
|
| + EXPECT_TRUE(result == MOJO_RESULT_OK ||
|
| + result == MOJO_RESULT_ALREADY_EXISTS) << "result: " << result;
|
| + if (result == MOJO_RESULT_OK) {
|
| + // Actually need to wait.
|
| + EXPECT_EQ(0, w.Wait(MOJO_DEADLINE_INDEFINITE));
|
| + read_dispatcher_->RemoveWaiter(&w);
|
| + }
|
| +
|
| + // Now, try to do the read.
|
| + // Clear the buffer so that we can check the result.
|
| + memset(buffer, 0, sizeof(buffer));
|
| + uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
|
| + result = read_dispatcher_->ReadMessage(buffer, &buffer_size, NULL, NULL,
|
| + MOJO_READ_MESSAGE_FLAG_NONE);
|
| + EXPECT_TRUE(result == MOJO_RESULT_OK ||
|
| + result == MOJO_RESULT_NOT_FOUND) << "result: " << result;
|
| + // We're racing with others to read, so maybe we failed.
|
| + if (result == MOJO_RESULT_NOT_FOUND)
|
| + continue; // In which case, try again.
|
| + // Check for quit.
|
| + if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0)
|
| + return;
|
| + EXPECT_GE(buffer_size, 1u);
|
| + EXPECT_LE(buffer_size, kMaxMessageSize);
|
| + EXPECT_TRUE(IsValidMessage(buffer, buffer_size));
|
| +
|
| + (*messages_read_)++;
|
| + *bytes_read_ += buffer_size;
|
| + }
|
| + }
|
| +
|
| + static bool IsValidMessage(const unsigned char* buffer,
|
| + uint32_t message_size) {
|
| + size_t i;
|
| + for (i = 0; i < message_size; i++) {
|
| + if (buffer[i] != static_cast<unsigned char>(i))
|
| + return false;
|
| + }
|
| + // Check that the remaining bytes weren't stomped on.
|
| + for (; i < kMaxMessageSize; i++) {
|
| + if (buffer[i] != 0)
|
| + return false;
|
| + }
|
| + return true;
|
| + }
|
| +
|
| + const scoped_refptr<Dispatcher> read_dispatcher_;
|
| + size_t* const messages_read_;
|
| + size_t* const bytes_read_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(ReaderThread);
|
| +};
|
| +
|
| +TEST(MessagePipeDispatcherTest, Stress) {
|
| + static const size_t kNumWriters = 30;
|
| + static const size_t kNumReaders = kNumWriters;
|
| +
|
| + scoped_refptr<MessagePipeDispatcher> d_write(new MessagePipeDispatcher());
|
| + scoped_refptr<MessagePipeDispatcher> d_read(new MessagePipeDispatcher());
|
| + {
|
| + scoped_refptr<MessagePipe> mp(new MessagePipe());
|
| + d_write->Init(mp, 0);
|
| + d_read->Init(mp, 1);
|
| + }
|
| +
|
| + size_t messages_written[kNumWriters];
|
| + size_t bytes_written[kNumWriters];
|
| + size_t messages_read[kNumReaders];
|
| + size_t bytes_read[kNumReaders];
|
| + {
|
| + // Make writers.
|
| + ScopedVector<WriterThread> writers;
|
| + for (size_t i = 0; i < kNumWriters; i++) {
|
| + writers.push_back(
|
| + new WriterThread(d_write, &messages_written[i], &bytes_written[i]));
|
| + }
|
| +
|
| + // Make readers.
|
| + ScopedVector<ReaderThread> readers;
|
| + for (size_t i = 0; i < kNumReaders; i++) {
|
| + readers.push_back(
|
| + new ReaderThread(d_read, &messages_read[i], &bytes_read[i]));
|
| + }
|
| +
|
| + // Start writers.
|
| + for (size_t i = 0; i < kNumWriters; i++)
|
| + writers[i]->Start();
|
| +
|
| + // Start readers.
|
| + for (size_t i = 0; i < kNumReaders; i++)
|
| + readers[i]->Start();
|
| +
|
| + // TODO(vtl): Maybe I should have an event that triggers all the threads to
|
| + // start doing stuff for real (so that the first ones created/started aren't
|
| + // advantaged).
|
| + } // Joins all the threads.
|
| +
|
| + size_t total_messages_written = 0;
|
| + size_t total_bytes_written = 0;
|
| + for (size_t i = 0; i < kNumWriters; i++) {
|
| + total_messages_written += messages_written[i];
|
| + total_bytes_written += bytes_written[i];
|
| + }
|
| + size_t total_messages_read = 0;
|
| + size_t total_bytes_read = 0;
|
| + for (size_t i = 0; i < kNumReaders; i++) {
|
| + total_messages_read += messages_read[i];
|
| + total_bytes_read += bytes_read[i];
|
| + // We'd have to be really unlucky to have read no messages on a thread.
|
| + EXPECT_GT(messages_read[i], 0u) << "reader: " << i;
|
| + EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i;
|
| + }
|
| + EXPECT_EQ(total_messages_written, total_messages_read);
|
| + EXPECT_EQ(total_bytes_written, total_bytes_read);
|
| +
|
| + EXPECT_EQ(MOJO_RESULT_OK, d_write->Close());
|
| + EXPECT_EQ(MOJO_RESULT_OK, d_read->Close());
|
| +}
|
| +
|
| +} // namespace
|
| +} // namespace system
|
| +} // namespace mojo
|
|
|