| Index: third_party/mojo/src/mojo/edk/system/remote_data_pipe_impl_unittest.cc
|
| diff --git a/third_party/mojo/src/mojo/edk/system/remote_data_pipe_impl_unittest.cc b/third_party/mojo/src/mojo/edk/system/remote_data_pipe_impl_unittest.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..7efd40a89d8e1beb1a0cefa31803f409a19be356
|
| --- /dev/null
|
| +++ b/third_party/mojo/src/mojo/edk/system/remote_data_pipe_impl_unittest.cc
|
| @@ -0,0 +1,282 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +// This file tests both |RemoteProducerDataPipeImpl| and
|
| +// |RemoteConsumerDataPipeImpl|.
|
| +
|
| +#include <stdint.h>
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/location.h"
|
| +#include "base/logging.h"
|
| +#include "base/macros.h"
|
| +#include "base/message_loop/message_loop.h"
|
| +#include "base/test/test_io_thread.h"
|
| +#include "base/test/test_timeouts.h"
|
| +#include "mojo/edk/embedder/platform_channel_pair.h"
|
| +#include "mojo/edk/embedder/simple_platform_support.h"
|
| +#include "mojo/edk/system/channel.h"
|
| +#include "mojo/edk/system/channel_endpoint.h"
|
| +#include "mojo/edk/system/data_pipe.h"
|
| +#include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
|
| +#include "mojo/edk/system/data_pipe_producer_dispatcher.h"
|
| +#include "mojo/edk/system/memory.h"
|
| +#include "mojo/edk/system/message_pipe.h"
|
| +#include "mojo/edk/system/raw_channel.h"
|
| +#include "mojo/edk/system/test_utils.h"
|
| +#include "mojo/edk/system/waiter.h"
|
| +#include "testing/gtest/include/gtest/gtest.h"
|
| +
|
| +namespace mojo {
|
| +namespace system {
|
| +namespace {
|
| +
|
| +const MojoHandleSignals kAllSignals = MOJO_HANDLE_SIGNAL_READABLE |
|
| + MOJO_HANDLE_SIGNAL_WRITABLE |
|
| + MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| +
|
| +class RemoteDataPipeImplTest : public testing::Test {
|
| + public:
|
| + RemoteDataPipeImplTest() : io_thread_(base::TestIOThread::kAutoStart) {}
|
| + ~RemoteDataPipeImplTest() override {}
|
| +
|
| + void SetUp() override {
|
| + scoped_refptr<ChannelEndpoint> ep[2];
|
| + message_pipes_[0] = MessagePipe::CreateLocalProxy(&ep[0]);
|
| + message_pipes_[1] = MessagePipe::CreateLocalProxy(&ep[1]);
|
| +
|
| + io_thread_.PostTaskAndWait(
|
| + FROM_HERE, base::Bind(&RemoteDataPipeImplTest::SetUpOnIOThread,
|
| + base::Unretained(this), ep[0], ep[1]));
|
| + }
|
| +
|
| + void TearDown() override {
|
| + EnsureMessagePipeClosed(0);
|
| + EnsureMessagePipeClosed(1);
|
| + io_thread_.PostTaskAndWait(
|
| + FROM_HERE, base::Bind(&RemoteDataPipeImplTest::TearDownOnIOThread,
|
| + base::Unretained(this)));
|
| + }
|
| +
|
| + protected:
|
| + static DataPipe* CreateLocal(size_t element_size, size_t num_elements) {
|
| + const MojoCreateDataPipeOptions options = {
|
| + static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)),
|
| + MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,
|
| + static_cast<uint32_t>(element_size),
|
| + static_cast<uint32_t>(num_elements * element_size)};
|
| + MojoCreateDataPipeOptions validated_options = {};
|
| + CHECK_EQ(DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
|
| + &validated_options),
|
| + MOJO_RESULT_OK);
|
| + return DataPipe::CreateLocal(validated_options);
|
| + }
|
| +
|
| + scoped_refptr<MessagePipe> message_pipe(size_t i) {
|
| + return message_pipes_[i];
|
| + }
|
| +
|
| + void EnsureMessagePipeClosed(size_t i) {
|
| + if (!message_pipes_[i])
|
| + return;
|
| + message_pipes_[i]->Close(0);
|
| + message_pipes_[i] = nullptr;
|
| + }
|
| +
|
| + private:
|
| + void SetUpOnIOThread(scoped_refptr<ChannelEndpoint> ep0,
|
| + scoped_refptr<ChannelEndpoint> ep1) {
|
| + CHECK_EQ(base::MessageLoop::current(), io_thread_.message_loop());
|
| +
|
| + embedder::PlatformChannelPair channel_pair;
|
| + channels_[0] = new Channel(&platform_support_);
|
| + channels_[0]->Init(RawChannel::Create(channel_pair.PassServerHandle()));
|
| + channels_[0]->SetBootstrapEndpoint(ep0);
|
| + channels_[1] = new Channel(&platform_support_);
|
| + channels_[1]->Init(RawChannel::Create(channel_pair.PassClientHandle()));
|
| + channels_[1]->SetBootstrapEndpoint(ep1);
|
| + }
|
| +
|
| + void TearDownOnIOThread() {
|
| + CHECK_EQ(base::MessageLoop::current(), io_thread_.message_loop());
|
| +
|
| + if (channels_[0]) {
|
| + channels_[0]->Shutdown();
|
| + channels_[0] = nullptr;
|
| + }
|
| + if (channels_[1]) {
|
| + channels_[1]->Shutdown();
|
| + channels_[1] = nullptr;
|
| + }
|
| + }
|
| +
|
| + embedder::SimplePlatformSupport platform_support_;
|
| + base::TestIOThread io_thread_;
|
| + scoped_refptr<Channel> channels_[2];
|
| + scoped_refptr<MessagePipe> message_pipes_[2];
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(RemoteDataPipeImplTest);
|
| +};
|
| +
|
| +// These tests are heavier-weight than ideal. They test remote data pipes by
|
| +// passing data pipe (producer/consumer) dispatchers over remote message pipes.
|
| +// Make sure that the test fixture works properly (i.e., that the message pipe
|
| +// works properly, and that things are shut down correctly).
|
| +// TODO(vtl): Make lighter-weight tests. Ideally, we'd have tests for remote
|
| +// data pipes which don't involve message pipes (or even data pipe dispatchers).
|
| +TEST_F(RemoteDataPipeImplTest, Sanity) {
|
| + static const char kHello[] = "hello";
|
| + char read_buffer[100] = {};
|
| + uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
|
| + Waiter waiter;
|
| + HandleSignalsState hss;
|
| + uint32_t context = 0;
|
| +
|
| + // Write on MP 0 (port 0). Wait and receive on MP 1 (port 0). (Add the waiter
|
| + // first, to avoid any handling the case where it's already readable.)
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + message_pipe(1)->AddAwakable(
|
| + 0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + message_pipe(0)->WriteMessage(0, UserPointer<const void>(kHello),
|
| + sizeof(kHello), nullptr,
|
| + MOJO_WRITE_MESSAGE_FLAG_NONE));
|
| + EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context));
|
| + EXPECT_EQ(123u, context);
|
| + hss = HandleSignalsState();
|
| + message_pipe(1)->RemoveAwakable(0, &waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
|
| + EXPECT_EQ(MOJO_RESULT_OK, message_pipe(1)->ReadMessage(
|
| + 0, UserPointer<void>(read_buffer),
|
| + MakeUserPointer(&read_buffer_size), nullptr,
|
| + nullptr, MOJO_READ_MESSAGE_FLAG_NONE));
|
| + EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
|
| + EXPECT_STREQ(kHello, read_buffer);
|
| +}
|
| +
|
| +// TODO(vtl): This test doesn't have an obvious analogue in
|
| +// |LocalDataPipeImplTest|.
|
| +TEST_F(RemoteDataPipeImplTest, SendConsumerWithClosedProducer) {
|
| + char read_buffer[100] = {};
|
| + uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
|
| + DispatcherVector read_dispatchers;
|
| + uint32_t read_num_dispatchers = 10; // Maximum to get.
|
| + Waiter waiter;
|
| + HandleSignalsState hss;
|
| + uint32_t context = 0;
|
| +
|
| + scoped_refptr<DataPipe> dp(CreateLocal(sizeof(int32_t), 1000));
|
| + // This is the consumer dispatcher we'll send.
|
| + scoped_refptr<DataPipeConsumerDispatcher> consumer =
|
| + new DataPipeConsumerDispatcher();
|
| + consumer->Init(dp);
|
| +
|
| + // Write to the producer and close it, before sending the consumer.
|
| + int32_t elements[10] = {123};
|
| + uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + dp->ProducerWriteData(UserPointer<const void>(elements),
|
| + MakeUserPointer(&num_bytes), false));
|
| + EXPECT_EQ(1u * sizeof(elements[0]), num_bytes);
|
| + dp->ProducerClose();
|
| +
|
| + // Write the consumer to MP 0 (port 0). Wait and receive on MP 1 (port 0).
|
| + // (Add the waiter first, to avoid any handling the case where it's already
|
| + // readable.)
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + message_pipe(1)->AddAwakable(
|
| + 0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
|
| + {
|
| + DispatcherTransport transport(
|
| + test::DispatcherTryStartTransport(consumer.get()));
|
| + EXPECT_TRUE(transport.is_valid());
|
| +
|
| + std::vector<DispatcherTransport> transports;
|
| + transports.push_back(transport);
|
| + EXPECT_EQ(MOJO_RESULT_OK, message_pipe(0)->WriteMessage(
|
| + 0, NullUserPointer(), 0, &transports,
|
| + MOJO_WRITE_MESSAGE_FLAG_NONE));
|
| + transport.End();
|
| +
|
| + // |consumer| should have been closed. This is |DCHECK()|ed when it is
|
| + // destroyed.
|
| + EXPECT_TRUE(consumer->HasOneRef());
|
| + consumer = nullptr;
|
| + }
|
| + EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context));
|
| + EXPECT_EQ(123u, context);
|
| + hss = HandleSignalsState();
|
| + message_pipe(1)->RemoveAwakable(0, &waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + message_pipe(1)->ReadMessage(
|
| + 0, UserPointer<void>(read_buffer),
|
| + MakeUserPointer(&read_buffer_size), &read_dispatchers,
|
| + &read_num_dispatchers, MOJO_READ_MESSAGE_FLAG_NONE));
|
| + EXPECT_EQ(0u, static_cast<size_t>(read_buffer_size));
|
| + EXPECT_EQ(1u, read_dispatchers.size());
|
| + EXPECT_EQ(1u, read_num_dispatchers);
|
| + ASSERT_TRUE(read_dispatchers[0]);
|
| + EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
|
| +
|
| + EXPECT_EQ(Dispatcher::kTypeDataPipeConsumer, read_dispatchers[0]->GetType());
|
| + consumer =
|
| + static_cast<DataPipeConsumerDispatcher*>(read_dispatchers[0].get());
|
| + read_dispatchers.clear();
|
| +
|
| + waiter.Init();
|
| + hss = HandleSignalsState();
|
| + MojoResult result =
|
| + consumer->AddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, &hss);
|
| + if (result == MOJO_RESULT_OK) {
|
| + context = 0;
|
| + EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context));
|
| + EXPECT_EQ(456u, context);
|
| + consumer->RemoveAwakable(&waiter, &hss);
|
| + } else {
|
| + ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS, result);
|
| + }
|
| + // We don't know if the fact that the producer has been closed is known yet.
|
| + EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
|
| + EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));
|
| +
|
| + // Read one element.
|
| + elements[0] = -1;
|
| + elements[1] = -1;
|
| + num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK, consumer->ReadData(UserPointer<void>(elements),
|
| + MakeUserPointer(&num_bytes),
|
| + MOJO_READ_DATA_FLAG_NONE));
|
| + EXPECT_EQ(1u * sizeof(elements[0]), num_bytes);
|
| + EXPECT_EQ(123, elements[0]);
|
| + EXPECT_EQ(-1, elements[1]);
|
| +
|
| + waiter.Init();
|
| + hss = HandleSignalsState();
|
| + result =
|
| + consumer->AddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, &hss);
|
| + if (result == MOJO_RESULT_OK) {
|
| + context = 0;
|
| + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
|
| + waiter.Wait(test::ActionDeadline(), &context));
|
| + EXPECT_EQ(789u, context);
|
| + consumer->RemoveAwakable(&waiter, &hss);
|
| + } else {
|
| + ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
|
| + }
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
|
| +
|
| + consumer->Close();
|
| +}
|
| +
|
| +} // namespace
|
| +} // namespace system
|
| +} // namespace mojo
|
|
|