| Index: third_party/mojo/src/mojo/edk/system/data_pipe_impl_unittest.cc
|
| diff --git a/third_party/mojo/src/mojo/edk/system/data_pipe_impl_unittest.cc b/third_party/mojo/src/mojo/edk/system/data_pipe_impl_unittest.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..b14d3af4c4c7d2c78c3873b3bb6f082bc7e468e7
|
| --- /dev/null
|
| +++ b/third_party/mojo/src/mojo/edk/system/data_pipe_impl_unittest.cc
|
| @@ -0,0 +1,1720 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +// This file contains tests that are shared between different implementations of
|
| +// |DataPipeImpl|.
|
| +
|
| +#include "mojo/edk/system/data_pipe_impl.h"
|
| +
|
| +#include <stdint.h>
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/location.h"
|
| +#include "base/logging.h"
|
| +#include "base/macros.h"
|
| +#include "base/message_loop/message_loop.h"
|
| +#include "base/test/test_io_thread.h"
|
| +#include "base/threading/platform_thread.h" // For |Sleep()|.
|
| +#include "mojo/edk/embedder/platform_channel_pair.h"
|
| +#include "mojo/edk/embedder/simple_platform_support.h"
|
| +#include "mojo/edk/system/channel.h"
|
| +#include "mojo/edk/system/channel_endpoint.h"
|
| +#include "mojo/edk/system/data_pipe.h"
|
| +#include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
|
| +#include "mojo/edk/system/data_pipe_producer_dispatcher.h"
|
| +#include "mojo/edk/system/memory.h"
|
| +#include "mojo/edk/system/message_pipe.h"
|
| +#include "mojo/edk/system/raw_channel.h"
|
| +#include "mojo/edk/system/test_utils.h"
|
| +#include "mojo/edk/system/waiter.h"
|
| +#include "testing/gtest/include/gtest/gtest.h"
|
| +
|
| +namespace mojo {
|
| +namespace system {
|
| +namespace {
|
| +
|
| +const MojoHandleSignals kAllSignals = MOJO_HANDLE_SIGNAL_READABLE |
|
| + MOJO_HANDLE_SIGNAL_WRITABLE |
|
| + MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| +const uint32_t kSizeOfOptions =
|
| + static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions));
|
| +
|
| +// DataPipeImplTestHelper ------------------------------------------------------
|
| +
|
| +class DataPipeImplTestHelper {
|
| + public:
|
| + virtual ~DataPipeImplTestHelper() {}
|
| +
|
| + virtual void SetUp() = 0;
|
| + virtual void TearDown() = 0;
|
| +
|
| + virtual void Create(const MojoCreateDataPipeOptions& validated_options) = 0;
|
| +
|
| + // Possibly transfers the producer/consumer.
|
| + virtual void DoTransfer() = 0;
|
| +
|
| + // Returns the |DataPipe| object for the producer and consumer, respectively.
|
| + virtual DataPipe* dpp() = 0;
|
| + virtual DataPipe* dpc() = 0;
|
| +
|
| + virtual void ProducerClose() = 0;
|
| + virtual void ConsumerClose() = 0;
|
| +
|
| + protected:
|
| + DataPipeImplTestHelper() {}
|
| +
|
| + private:
|
| + DISALLOW_COPY_AND_ASSIGN(DataPipeImplTestHelper);
|
| +};
|
| +
|
| +// DataPipeImplTest ------------------------------------------------------------
|
| +
|
| +template <class Helper>
|
| +class DataPipeImplTest : public testing::Test {
|
| + public:
|
| + DataPipeImplTest() {}
|
| + ~DataPipeImplTest() override {}
|
| +
|
| + void SetUp() override { helper_.SetUp(); }
|
| + void TearDown() override { helper_.TearDown(); }
|
| +
|
| + protected:
|
| + void Create(const MojoCreateDataPipeOptions& options) {
|
| + MojoCreateDataPipeOptions validated_options = {};
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
|
| + &validated_options));
|
| + helper_.Create(validated_options);
|
| + }
|
| +
|
| + void DoTransfer() { return helper_.DoTransfer(); }
|
| +
|
| + DataPipe* dpp() { return helper_.dpp(); }
|
| + DataPipe* dpc() { return helper_.dpc(); }
|
| +
|
| + void ProducerClose() { helper_.ProducerClose(); }
|
| + void ConsumerClose() { helper_.ConsumerClose(); }
|
| +
|
| + private:
|
| + Helper helper_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(DataPipeImplTest);
|
| +};
|
| +
|
| +// LocalDataPipeImplTestHelper -------------------------------------------------
|
| +
|
| +class LocalDataPipeImplTestHelper : public DataPipeImplTestHelper {
|
| + public:
|
| + LocalDataPipeImplTestHelper() {}
|
| + ~LocalDataPipeImplTestHelper() override {}
|
| +
|
| + void SetUp() override {}
|
| + void TearDown() override {}
|
| +
|
| + void Create(const MojoCreateDataPipeOptions& validated_options) override {
|
| + CHECK(!dp_);
|
| + dp_ = DataPipe::CreateLocal(validated_options);
|
| + }
|
| +
|
| + void DoTransfer() override {}
|
| +
|
| + // Returns the |DataPipe| object for the producer and consumer, respectively.
|
| + DataPipe* dpp() override { return dp_.get(); }
|
| + DataPipe* dpc() override { return dp_.get(); }
|
| +
|
| + void ProducerClose() override { dp_->ProducerClose(); }
|
| + void ConsumerClose() override { dp_->ConsumerClose(); }
|
| +
|
| + private:
|
| + scoped_refptr<DataPipe> dp_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(LocalDataPipeImplTestHelper);
|
| +};
|
| +
|
| +// RemoteDataPipeImplTestHelper ------------------------------------------------
|
| +
|
| +// Base class for |Remote{Producer,Consumer}DataPipeImplTestHelper|.
|
| +class RemoteDataPipeImplTestHelper : public DataPipeImplTestHelper {
|
| + public:
|
| + RemoteDataPipeImplTestHelper() : io_thread_(base::TestIOThread::kAutoStart) {}
|
| + ~RemoteDataPipeImplTestHelper() override {}
|
| +
|
| + void SetUp() override {
|
| + scoped_refptr<ChannelEndpoint> ep[2];
|
| + message_pipes_[0] = MessagePipe::CreateLocalProxy(&ep[0]);
|
| + message_pipes_[1] = MessagePipe::CreateLocalProxy(&ep[1]);
|
| +
|
| + io_thread_.PostTaskAndWait(
|
| + FROM_HERE, base::Bind(&RemoteDataPipeImplTestHelper::SetUpOnIOThread,
|
| + base::Unretained(this), ep[0], ep[1]));
|
| + }
|
| +
|
| + void TearDown() override {
|
| + EnsureMessagePipeClosed(0);
|
| + EnsureMessagePipeClosed(1);
|
| + io_thread_.PostTaskAndWait(
|
| + FROM_HERE, base::Bind(&RemoteDataPipeImplTestHelper::TearDownOnIOThread,
|
| + base::Unretained(this)));
|
| + }
|
| +
|
| + void Create(const MojoCreateDataPipeOptions& validated_options) override {
|
| + CHECK(!dp_);
|
| + dp_ = DataPipe::CreateLocal(validated_options);
|
| + }
|
| +
|
| + protected:
|
| + void SendDispatcher(size_t source_i,
|
| + scoped_refptr<Dispatcher> to_send,
|
| + scoped_refptr<Dispatcher>* to_receive) {
|
| + DCHECK(source_i == 0 || source_i == 1);
|
| + size_t dest_i = source_i ^ 1;
|
| +
|
| + // Write the dispatcher to MP |source_i| (port 0). Wait and receive on MP
|
| + // |dest_i| (port 0). (Add the waiter first, to avoid any handling the case
|
| + // where it's already readable.)
|
| + Waiter waiter;
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + message_pipe(dest_i)->AddAwakable(
|
| + 0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 987, nullptr));
|
| + {
|
| + DispatcherTransport transport(
|
| + test::DispatcherTryStartTransport(to_send.get()));
|
| + ASSERT_TRUE(transport.is_valid());
|
| +
|
| + std::vector<DispatcherTransport> transports;
|
| + transports.push_back(transport);
|
| + ASSERT_EQ(MOJO_RESULT_OK, message_pipe(source_i)->WriteMessage(
|
| + 0, NullUserPointer(), 0, &transports,
|
| + MOJO_WRITE_MESSAGE_FLAG_NONE));
|
| + transport.End();
|
| + }
|
| + uint32_t context = 0;
|
| + ASSERT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context));
|
| + EXPECT_EQ(987u, context);
|
| + HandleSignalsState hss = HandleSignalsState();
|
| + message_pipe(dest_i)->RemoveAwakable(0, &waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
|
| + char read_buffer[100] = {};
|
| + uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
|
| + DispatcherVector read_dispatchers;
|
| + uint32_t read_num_dispatchers = 10; // Maximum to get.
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + message_pipe(dest_i)->ReadMessage(
|
| + 0, UserPointer<void>(read_buffer),
|
| + MakeUserPointer(&read_buffer_size), &read_dispatchers,
|
| + &read_num_dispatchers, MOJO_READ_MESSAGE_FLAG_NONE));
|
| + EXPECT_EQ(0u, static_cast<size_t>(read_buffer_size));
|
| + ASSERT_EQ(1u, read_dispatchers.size());
|
| + ASSERT_EQ(1u, read_num_dispatchers);
|
| + ASSERT_TRUE(read_dispatchers[0]);
|
| + EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
|
| +
|
| + *to_receive = read_dispatchers[0];
|
| + }
|
| +
|
| + scoped_refptr<MessagePipe> message_pipe(size_t i) {
|
| + return message_pipes_[i];
|
| + }
|
| + scoped_refptr<DataPipe> dp() { return dp_; }
|
| +
|
| + private:
|
| + void EnsureMessagePipeClosed(size_t i) {
|
| + if (!message_pipes_[i])
|
| + return;
|
| + message_pipes_[i]->Close(0);
|
| + message_pipes_[i] = nullptr;
|
| + }
|
| +
|
| + void SetUpOnIOThread(scoped_refptr<ChannelEndpoint> ep0,
|
| + scoped_refptr<ChannelEndpoint> ep1) {
|
| + CHECK_EQ(base::MessageLoop::current(), io_thread_.message_loop());
|
| +
|
| + embedder::PlatformChannelPair channel_pair;
|
| + channels_[0] = new Channel(&platform_support_);
|
| + channels_[0]->Init(RawChannel::Create(channel_pair.PassServerHandle()));
|
| + channels_[0]->SetBootstrapEndpoint(ep0);
|
| + channels_[1] = new Channel(&platform_support_);
|
| + channels_[1]->Init(RawChannel::Create(channel_pair.PassClientHandle()));
|
| + channels_[1]->SetBootstrapEndpoint(ep1);
|
| + }
|
| +
|
| + void TearDownOnIOThread() {
|
| + CHECK_EQ(base::MessageLoop::current(), io_thread_.message_loop());
|
| +
|
| + if (channels_[0]) {
|
| + channels_[0]->Shutdown();
|
| + channels_[0] = nullptr;
|
| + }
|
| + if (channels_[1]) {
|
| + channels_[1]->Shutdown();
|
| + channels_[1] = nullptr;
|
| + }
|
| + }
|
| +
|
| + embedder::SimplePlatformSupport platform_support_;
|
| + base::TestIOThread io_thread_;
|
| + scoped_refptr<Channel> channels_[2];
|
| + scoped_refptr<MessagePipe> message_pipes_[2];
|
| +
|
| + scoped_refptr<DataPipe> dp_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(RemoteDataPipeImplTestHelper);
|
| +};
|
| +
|
| +// RemoteProducerDataPipeImplTestHelper ----------------------------------------
|
| +
|
| +// Note about naming confusion: This class is named after the "local" class,
|
| +// i.e., |dp_| will have a |RemoteProducerDataPipeImpl|. The remote side, of
|
| +// course, will have a |RemoteConsumerDataPipeImpl|.
|
| +class RemoteProducerDataPipeImplTestHelper
|
| + : public RemoteDataPipeImplTestHelper {
|
| + public:
|
| + RemoteProducerDataPipeImplTestHelper() {}
|
| + ~RemoteProducerDataPipeImplTestHelper() override {}
|
| +
|
| + void DoTransfer() override {
|
| + // This is the producer dispatcher we'll send.
|
| + scoped_refptr<DataPipeProducerDispatcher> to_send =
|
| + new DataPipeProducerDispatcher();
|
| + to_send->Init(dp());
|
| + scoped_refptr<Dispatcher> to_receive;
|
| + SendDispatcher(0, to_send, &to_receive);
|
| + // |to_send| should have been closed. This is |DCHECK()|ed when it is
|
| + // destroyed.
|
| + EXPECT_TRUE(to_send->HasOneRef());
|
| + to_send = nullptr;
|
| +
|
| + ASSERT_EQ(Dispatcher::kTypeDataPipeProducer, to_receive->GetType());
|
| + producer_dispatcher_ =
|
| + static_cast<DataPipeProducerDispatcher*>(to_receive.get());
|
| + }
|
| +
|
| + DataPipe* dpp() override {
|
| + if (producer_dispatcher_)
|
| + return producer_dispatcher_->GetDataPipeForTest();
|
| + return dp().get();
|
| + }
|
| + DataPipe* dpc() override { return dp().get(); }
|
| +
|
| + void ProducerClose() override {
|
| + if (producer_dispatcher_)
|
| + ASSERT_EQ(MOJO_RESULT_OK, producer_dispatcher_->Close());
|
| + else
|
| + dp()->ProducerClose();
|
| + }
|
| + void ConsumerClose() override { dp()->ConsumerClose(); }
|
| +
|
| + protected:
|
| + scoped_refptr<DataPipeProducerDispatcher> producer_dispatcher_;
|
| +
|
| + private:
|
| + DISALLOW_COPY_AND_ASSIGN(RemoteProducerDataPipeImplTestHelper);
|
| +};
|
| +
|
| +// RemoteConsumerDataPipeImplTestHelper ----------------------------------------
|
| +
|
| +// Note about naming confusion: This class is named after the "local" class,
|
| +// i.e., |dp_| will have a |RemoteConsumerDataPipeImpl|. The remote side, of
|
| +// course, will have a |RemoteProducerDataPipeImpl|.
|
| +class RemoteConsumerDataPipeImplTestHelper
|
| + : public RemoteDataPipeImplTestHelper {
|
| + public:
|
| + RemoteConsumerDataPipeImplTestHelper() {}
|
| + ~RemoteConsumerDataPipeImplTestHelper() override {}
|
| +
|
| + void DoTransfer() override {
|
| + // This is the consumer dispatcher we'll send.
|
| + scoped_refptr<DataPipeConsumerDispatcher> to_send =
|
| + new DataPipeConsumerDispatcher();
|
| + to_send->Init(dp());
|
| + scoped_refptr<Dispatcher> to_receive;
|
| + SendDispatcher(0, to_send, &to_receive);
|
| + // |to_send| should have been closed. This is |DCHECK()|ed when it is
|
| + // destroyed.
|
| + EXPECT_TRUE(to_send->HasOneRef());
|
| + to_send = nullptr;
|
| +
|
| + ASSERT_EQ(Dispatcher::kTypeDataPipeConsumer, to_receive->GetType());
|
| + consumer_dispatcher_ =
|
| + static_cast<DataPipeConsumerDispatcher*>(to_receive.get());
|
| + }
|
| +
|
| + DataPipe* dpp() override { return dp().get(); }
|
| + DataPipe* dpc() override {
|
| + if (consumer_dispatcher_)
|
| + return consumer_dispatcher_->GetDataPipeForTest();
|
| + return dp().get();
|
| + }
|
| +
|
| + void ProducerClose() override { dp()->ProducerClose(); }
|
| + void ConsumerClose() override {
|
| + if (consumer_dispatcher_)
|
| + ASSERT_EQ(MOJO_RESULT_OK, consumer_dispatcher_->Close());
|
| + else
|
| + dp()->ConsumerClose();
|
| + }
|
| +
|
| + protected:
|
| + scoped_refptr<DataPipeConsumerDispatcher> consumer_dispatcher_;
|
| +
|
| + private:
|
| + DISALLOW_COPY_AND_ASSIGN(RemoteConsumerDataPipeImplTestHelper);
|
| +};
|
| +
|
| +// RemoteProducerDataPipeImplTestHelper2 ---------------------------------------
|
| +
|
| +// This is like |RemoteProducerDataPipeImplTestHelper|, but |DoTransfer()| does
|
| +// a second transfer. This thus tests passing a producer handle twice, and in
|
| +// particular tests (some of) |RemoteConsumerDataPipeImpl|'s
|
| +// |ProducerEndSerialize()| (instead of |LocalDataPipeImpl|'s).
|
| +//
|
| +// Note about naming confusion: This class is named after the "local" class,
|
| +// i.e., |dp_| will have a |RemoteProducerDataPipeImpl|. The remote side, of
|
| +// course, will have a |RemoteConsumerDataPipeImpl|.
|
| +class RemoteProducerDataPipeImplTestHelper2
|
| + : public RemoteProducerDataPipeImplTestHelper {
|
| + public:
|
| + RemoteProducerDataPipeImplTestHelper2() {}
|
| + ~RemoteProducerDataPipeImplTestHelper2() override {}
|
| +
|
| + void DoTransfer() override {
|
| + // This is the producer dispatcher we'll send.
|
| + scoped_refptr<DataPipeProducerDispatcher> to_send =
|
| + new DataPipeProducerDispatcher();
|
| + to_send->Init(dp());
|
| + scoped_refptr<Dispatcher> to_receive;
|
| + SendDispatcher(0, to_send, &to_receive);
|
| + // |to_send| should have been closed. This is |DCHECK()|ed when it is
|
| + // destroyed.
|
| + EXPECT_TRUE(to_send->HasOneRef());
|
| + to_send = nullptr;
|
| + ASSERT_EQ(Dispatcher::kTypeDataPipeProducer, to_receive->GetType());
|
| + to_send = static_cast<DataPipeProducerDispatcher*>(to_receive.get());
|
| + to_receive = nullptr;
|
| +
|
| + // Now send it back the other way.
|
| + SendDispatcher(1, to_send, &to_receive);
|
| + // |producer_dispatcher_| should have been closed. This is |DCHECK()|ed when
|
| + // it is destroyed.
|
| + EXPECT_TRUE(to_send->HasOneRef());
|
| + to_send = nullptr;
|
| +
|
| + ASSERT_EQ(Dispatcher::kTypeDataPipeProducer, to_receive->GetType());
|
| + producer_dispatcher_ =
|
| + static_cast<DataPipeProducerDispatcher*>(to_receive.get());
|
| + }
|
| +
|
| + private:
|
| + DISALLOW_COPY_AND_ASSIGN(RemoteProducerDataPipeImplTestHelper2);
|
| +};
|
| +
|
| +// RemoteConsumerDataPipeImplTestHelper2 ---------------------------------------
|
| +
|
| +// This is like |RemoteConsumerDataPipeImplTestHelper|, but |DoTransfer()| does
|
| +// a second transfer. This thus tests passing a consumer handle twice, and in
|
| +// particular tests (some of) |RemoteProducerDataPipeImpl|'s
|
| +// |ConsumerEndSerialize()| (instead of |LocalDataPipeImpl|'s).
|
| +//
|
| +// Note about naming confusion: This class is named after the "local" class,
|
| +// i.e., |dp_| will have a |RemoteConsumerDataPipeImpl|. The remote side, of
|
| +// course, will have a |RemoteProducerDataPipeImpl|.
|
| +class RemoteConsumerDataPipeImplTestHelper2
|
| + : public RemoteConsumerDataPipeImplTestHelper {
|
| + public:
|
| + RemoteConsumerDataPipeImplTestHelper2() {}
|
| + ~RemoteConsumerDataPipeImplTestHelper2() override {}
|
| +
|
| + void DoTransfer() override {
|
| + // This is the consumer dispatcher we'll send.
|
| + scoped_refptr<DataPipeConsumerDispatcher> to_send =
|
| + new DataPipeConsumerDispatcher();
|
| + to_send->Init(dp());
|
| + scoped_refptr<Dispatcher> to_receive;
|
| + SendDispatcher(0, to_send, &to_receive);
|
| + // |to_send| should have been closed. This is |DCHECK()|ed when it is
|
| + // destroyed.
|
| + EXPECT_TRUE(to_send->HasOneRef());
|
| + to_send = nullptr;
|
| + ASSERT_EQ(Dispatcher::kTypeDataPipeConsumer, to_receive->GetType());
|
| + to_send = static_cast<DataPipeConsumerDispatcher*>(to_receive.get());
|
| + to_receive = nullptr;
|
| +
|
| + // Now send it back the other way.
|
| + SendDispatcher(1, to_send, &to_receive);
|
| + // |consumer_dispatcher_| should have been closed. This is |DCHECK()|ed when
|
| + // it is destroyed.
|
| + EXPECT_TRUE(to_send->HasOneRef());
|
| + to_send = nullptr;
|
| +
|
| + ASSERT_EQ(Dispatcher::kTypeDataPipeConsumer, to_receive->GetType());
|
| + consumer_dispatcher_ =
|
| + static_cast<DataPipeConsumerDispatcher*>(to_receive.get());
|
| + }
|
| +
|
| + private:
|
| + DISALLOW_COPY_AND_ASSIGN(RemoteConsumerDataPipeImplTestHelper2);
|
| +};
|
| +
|
| +// Test case instantiation -----------------------------------------------------
|
| +
|
| +typedef testing::Types<LocalDataPipeImplTestHelper,
|
| + RemoteProducerDataPipeImplTestHelper,
|
| + RemoteConsumerDataPipeImplTestHelper,
|
| + RemoteProducerDataPipeImplTestHelper2,
|
| + RemoteConsumerDataPipeImplTestHelper2> HelperTypes;
|
| +
|
| +TYPED_TEST_CASE(DataPipeImplTest, HelperTypes);
|
| +
|
| +// Tests -----------------------------------------------------------------------
|
| +
|
| +TYPED_TEST(DataPipeImplTest, SimpleReadWrite) {
|
| + const MojoCreateDataPipeOptions options = {
|
| + kSizeOfOptions, // |struct_size|.
|
| + MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
|
| + static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
| + 1000 * sizeof(int32_t) // |capacity_num_bytes|.
|
| + };
|
| + this->Create(options);
|
| + this->DoTransfer();
|
| +
|
| + Waiter waiter;
|
| + HandleSignalsState hss;
|
| + uint32_t context;
|
| +
|
| + int32_t elements[10] = {};
|
| + uint32_t num_bytes = 0;
|
| +
|
| + // Try reading; nothing there yet.
|
| + num_bytes = static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0]));
|
| + EXPECT_EQ(
|
| + MOJO_RESULT_SHOULD_WAIT,
|
| + this->dpc()->ConsumerReadData(UserPointer<void>(elements),
|
| + MakeUserPointer(&num_bytes), false, false));
|
| +
|
| + // Query; nothing there yet.
|
| + num_bytes = 0;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
|
| + EXPECT_EQ(0u, num_bytes);
|
| +
|
| + // Discard; nothing there yet.
|
| + num_bytes = static_cast<uint32_t>(5u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, this->dpc()->ConsumerDiscardData(
|
| + MakeUserPointer(&num_bytes), false));
|
| +
|
| + // Read with invalid |num_bytes|.
|
| + num_bytes = sizeof(elements[0]) + 1;
|
| + EXPECT_EQ(
|
| + MOJO_RESULT_INVALID_ARGUMENT,
|
| + this->dpc()->ConsumerReadData(UserPointer<void>(elements),
|
| + MakeUserPointer(&num_bytes), false, false));
|
| +
|
| + // For remote data pipes, we'll have to wait; add the waiter before writing.
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
|
| +
|
| + // Write two elements.
|
| + elements[0] = 123;
|
| + elements[1] = 456;
|
| + num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerWriteData(UserPointer<const void>(elements),
|
| + MakeUserPointer(&num_bytes), false));
|
| + // It should have written everything (even without "all or none").
|
| + EXPECT_EQ(2u * sizeof(elements[0]), num_bytes);
|
| +
|
| + // Wait.
|
| + context = 0;
|
| + EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context));
|
| + EXPECT_EQ(123u, context);
|
| + hss = HandleSignalsState();
|
| + this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Query.
|
| + // TODO(vtl): It's theoretically possible (though not with the current
|
| + // implementation/configured limits) that not all the data has arrived yet.
|
| + // (The theoretically-correct assertion here is that |num_bytes| is |1 * ...|
|
| + // or |2 * ...|.)
|
| + num_bytes = 0;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
|
| + EXPECT_EQ(2 * sizeof(elements[0]), num_bytes);
|
| +
|
| + // Read one element.
|
| + elements[0] = -1;
|
| + elements[1] = -1;
|
| + num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
|
| + UserPointer<void>(elements),
|
| + MakeUserPointer(&num_bytes), false, false));
|
| + EXPECT_EQ(1u * sizeof(elements[0]), num_bytes);
|
| + EXPECT_EQ(123, elements[0]);
|
| + EXPECT_EQ(-1, elements[1]);
|
| +
|
| + // Query.
|
| + // TODO(vtl): See previous TODO. (If we got 2 elements there, however, we
|
| + // should get 1 here.)
|
| + num_bytes = 0;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
|
| + EXPECT_EQ(1 * sizeof(elements[0]), num_bytes);
|
| +
|
| + // Peek one element.
|
| + elements[0] = -1;
|
| + elements[1] = -1;
|
| + num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
|
| + UserPointer<void>(elements),
|
| + MakeUserPointer(&num_bytes), false, true));
|
| + EXPECT_EQ(1u * sizeof(elements[0]), num_bytes);
|
| + EXPECT_EQ(456, elements[0]);
|
| + EXPECT_EQ(-1, elements[1]);
|
| +
|
| + // Query. Still has 1 element remaining.
|
| + num_bytes = 0;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
|
| + EXPECT_EQ(1 * sizeof(elements[0]), num_bytes);
|
| +
|
| + // Try to read two elements, with "all or none".
|
| + elements[0] = -1;
|
| + elements[1] = -1;
|
| + num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
|
| + EXPECT_EQ(
|
| + MOJO_RESULT_OUT_OF_RANGE,
|
| + this->dpc()->ConsumerReadData(UserPointer<void>(elements),
|
| + MakeUserPointer(&num_bytes), true, false));
|
| + EXPECT_EQ(-1, elements[0]);
|
| + EXPECT_EQ(-1, elements[1]);
|
| +
|
| + // Try to read two elements, without "all or none".
|
| + elements[0] = -1;
|
| + elements[1] = -1;
|
| + num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
|
| + UserPointer<void>(elements),
|
| + MakeUserPointer(&num_bytes), false, false));
|
| + EXPECT_EQ(1u * sizeof(elements[0]), num_bytes);
|
| + EXPECT_EQ(456, elements[0]);
|
| + EXPECT_EQ(-1, elements[1]);
|
| +
|
| + // Query.
|
| + num_bytes = 0;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
|
| + EXPECT_EQ(0u, num_bytes);
|
| +
|
| + this->ProducerClose();
|
| + this->ConsumerClose();
|
| +}
|
| +
|
| +// Note: The "basic" waiting tests test that the "wait states" are correct in
|
| +// various situations; they don't test that waiters are properly awoken on state
|
| +// changes. (For that, we need to use multiple threads.)
|
| +TYPED_TEST(DataPipeImplTest, BasicProducerWaiting) {
|
| + // Note: We take advantage of the fact that current for current
|
| + // implementations capacities are strict maximums. This is not guaranteed by
|
| + // the API.
|
| +
|
| + const MojoCreateDataPipeOptions options = {
|
| + kSizeOfOptions, // |struct_size|.
|
| + MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
|
| + static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
| + 2 * sizeof(int32_t) // |capacity_num_bytes|.
|
| + };
|
| + this->Create(options);
|
| + this->DoTransfer();
|
| +
|
| + Waiter pwaiter; // For producer.
|
| + Waiter cwaiter; // For consumer.
|
| + HandleSignalsState hss;
|
| + uint32_t context;
|
| +
|
| + // Never readable.
|
| + pwaiter.Init();
|
| + hss = HandleSignalsState();
|
| + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
|
| + this->dpp()->ProducerAddAwakable(
|
| + &pwaiter, MOJO_HANDLE_SIGNAL_READABLE, 12, &hss));
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Already writable.
|
| + pwaiter.Init();
|
| + hss = HandleSignalsState();
|
| + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| + this->dpp()->ProducerAddAwakable(
|
| + &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 34, &hss));
|
| +
|
| + // We'll need to wait for readability for the remote cases.
|
| + cwaiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 1234, nullptr));
|
| +
|
| + // Write two elements.
|
| + int32_t elements[2] = {123, 456};
|
| + uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerWriteData(UserPointer<const void>(elements),
|
| + MakeUserPointer(&num_bytes), true));
|
| + EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
|
| +
|
| + // Adding a waiter should now succeed.
|
| + pwaiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerAddAwakable(
|
| + &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 56, nullptr));
|
| + // And it shouldn't be writable yet.
|
| + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr));
|
| + hss = HandleSignalsState();
|
| + this->dpp()->ProducerRemoveAwakable(&pwaiter, &hss);
|
| + EXPECT_EQ(0u, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Wait for data to become available to the consumer.
|
| + context = 0;
|
| + EXPECT_EQ(MOJO_RESULT_OK, cwaiter.Wait(test::TinyDeadline(), &context));
|
| + EXPECT_EQ(1234u, context);
|
| + hss = HandleSignalsState();
|
| + this->dpc()->ConsumerRemoveAwakable(&cwaiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Peek one element.
|
| + elements[0] = -1;
|
| + elements[1] = -1;
|
| + num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
|
| + UserPointer<void>(elements),
|
| + MakeUserPointer(&num_bytes), true, true));
|
| + EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
|
| + EXPECT_EQ(123, elements[0]);
|
| + EXPECT_EQ(-1, elements[1]);
|
| +
|
| + // Add a waiter.
|
| + pwaiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerAddAwakable(
|
| + &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 56, nullptr));
|
| + // And it still shouldn't be writable yet.
|
| + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr));
|
| + hss = HandleSignalsState();
|
| + this->dpp()->ProducerRemoveAwakable(&pwaiter, &hss);
|
| + EXPECT_EQ(0u, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Do it again.
|
| + pwaiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerAddAwakable(
|
| + &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 78, nullptr));
|
| +
|
| + // Read one element.
|
| + elements[0] = -1;
|
| + elements[1] = -1;
|
| + num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
|
| + UserPointer<void>(elements),
|
| + MakeUserPointer(&num_bytes), true, false));
|
| + EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
|
| + EXPECT_EQ(123, elements[0]);
|
| + EXPECT_EQ(-1, elements[1]);
|
| +
|
| + // Waiting should now succeed.
|
| + context = 0;
|
| + EXPECT_EQ(MOJO_RESULT_OK, pwaiter.Wait(test::TinyDeadline(), &context));
|
| + EXPECT_EQ(78u, context);
|
| + hss = HandleSignalsState();
|
| + this->dpp()->ProducerRemoveAwakable(&pwaiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Try writing, using a two-phase write.
|
| + void* buffer = nullptr;
|
| + num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerBeginWriteData(
|
| + MakeUserPointer(&buffer), MakeUserPointer(&num_bytes), false));
|
| + EXPECT_TRUE(buffer);
|
| + EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
|
| +
|
| + static_cast<int32_t*>(buffer)[0] = 789;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerEndWriteData(
|
| + static_cast<uint32_t>(1u * sizeof(elements[0]))));
|
| +
|
| + // Add a waiter.
|
| + pwaiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerAddAwakable(
|
| + &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 90, nullptr));
|
| +
|
| + // Read one element, using a two-phase read.
|
| + const void* read_buffer = nullptr;
|
| + num_bytes = 0u;
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerBeginReadData(
|
| + MakeUserPointer(&read_buffer),
|
| + MakeUserPointer(&num_bytes), false));
|
| + EXPECT_TRUE(read_buffer);
|
| + // Since we only read one element (after having written three in all), the
|
| + // two-phase read should only allow us to read one. This checks an
|
| + // implementation detail!
|
| + EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
|
| + EXPECT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]);
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerEndReadData(
|
| + static_cast<uint32_t>(1u * sizeof(elements[0]))));
|
| +
|
| + // Waiting should succeed.
|
| + context = 0;
|
| + EXPECT_EQ(MOJO_RESULT_OK, pwaiter.Wait(test::TinyDeadline(), &context));
|
| + EXPECT_EQ(90u, context);
|
| + hss = HandleSignalsState();
|
| + this->dpp()->ProducerRemoveAwakable(&pwaiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Write one element.
|
| + elements[0] = 123;
|
| + num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerWriteData(UserPointer<const void>(elements),
|
| + MakeUserPointer(&num_bytes), false));
|
| + EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
|
| +
|
| + // Add a waiter.
|
| + pwaiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerAddAwakable(
|
| + &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 12, nullptr));
|
| +
|
| + // Close the consumer.
|
| + this->ConsumerClose();
|
| +
|
| + // It should now be never-writable.
|
| + context = 0;
|
| + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
|
| + pwaiter.Wait(test::TinyDeadline(), &context));
|
| + EXPECT_EQ(12u, context);
|
| + hss = HandleSignalsState();
|
| + this->dpp()->ProducerRemoveAwakable(&pwaiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
|
| +
|
| + this->ProducerClose();
|
| +}
|
| +
|
| +TYPED_TEST(DataPipeImplTest, PeerClosedProducerWaiting) {
|
| + const MojoCreateDataPipeOptions options = {
|
| + kSizeOfOptions, // |struct_size|.
|
| + MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
|
| + static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
| + 2 * sizeof(int32_t) // |capacity_num_bytes|.
|
| + };
|
| + this->Create(options);
|
| + this->DoTransfer();
|
| +
|
| + Waiter waiter;
|
| + HandleSignalsState hss;
|
| + uint32_t context;
|
| +
|
| + // Add a waiter.
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 12, nullptr));
|
| +
|
| + // Close the consumer.
|
| + this->ConsumerClose();
|
| +
|
| + // It should be signaled.
|
| + context = 0;
|
| + EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context));
|
| + EXPECT_EQ(12u, context);
|
| + hss = HandleSignalsState();
|
| + this->dpp()->ProducerRemoveAwakable(&waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
|
| +
|
| + this->ProducerClose();
|
| +}
|
| +
|
| +TYPED_TEST(DataPipeImplTest, PeerClosedConsumerWaiting) {
|
| + const MojoCreateDataPipeOptions options = {
|
| + kSizeOfOptions, // |struct_size|.
|
| + MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
|
| + static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
| + 2 * sizeof(int32_t) // |capacity_num_bytes|.
|
| + };
|
| + this->Create(options);
|
| + this->DoTransfer();
|
| +
|
| + Waiter waiter;
|
| + HandleSignalsState hss;
|
| + uint32_t context;
|
| +
|
| + // Add a waiter.
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 12, nullptr));
|
| +
|
| + // Close the producer.
|
| + this->ProducerClose();
|
| +
|
| + // It should be signaled.
|
| + context = 0;
|
| + EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context));
|
| + EXPECT_EQ(12u, context);
|
| + hss = HandleSignalsState();
|
| + this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
|
| +
|
| + this->ConsumerClose();
|
| +}
|
| +
|
| +TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
|
| + const MojoCreateDataPipeOptions options = {
|
| + kSizeOfOptions, // |struct_size|.
|
| + MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
|
| + static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
| + 1000 * sizeof(int32_t) // |capacity_num_bytes|.
|
| + };
|
| + this->Create(options);
|
| + this->DoTransfer();
|
| +
|
| + Waiter waiter;
|
| + Waiter waiter2;
|
| + HandleSignalsState hss;
|
| + uint32_t context;
|
| +
|
| + // Never writable.
|
| + waiter.Init();
|
| + hss = HandleSignalsState();
|
| + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 12, &hss));
|
| + EXPECT_EQ(0u, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Add waiter: not yet readable.
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READABLE, 34, nullptr));
|
| +
|
| + // Write two elements.
|
| + int32_t elements[2] = {123, 456};
|
| + uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerWriteData(UserPointer<const void>(elements),
|
| + MakeUserPointer(&num_bytes), true));
|
| +
|
| + // Wait for readability (needed for remote cases).
|
| + context = 0;
|
| + EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context));
|
| + EXPECT_EQ(34u, context);
|
| + hss = HandleSignalsState();
|
| + this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Discard one element.
|
| + num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerDiscardData(
|
| + MakeUserPointer(&num_bytes), true));
|
| + EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
|
| +
|
| + // Should still be readable.
|
| + waiter.Init();
|
| + hss = HandleSignalsState();
|
| + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READABLE, 78, &hss));
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Peek one element.
|
| + elements[0] = -1;
|
| + elements[1] = -1;
|
| + num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
|
| + UserPointer<void>(elements),
|
| + MakeUserPointer(&num_bytes), true, true));
|
| + EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
|
| + EXPECT_EQ(456, elements[0]);
|
| + EXPECT_EQ(-1, elements[1]);
|
| +
|
| + // Should still be readable.
|
| + waiter.Init();
|
| + hss = HandleSignalsState();
|
| + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READABLE, 78, &hss));
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Read one element.
|
| + elements[0] = -1;
|
| + elements[1] = -1;
|
| + num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
|
| + UserPointer<void>(elements),
|
| + MakeUserPointer(&num_bytes), true, false));
|
| + EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
|
| + EXPECT_EQ(456, elements[0]);
|
| + EXPECT_EQ(-1, elements[1]);
|
| +
|
| + // Adding a waiter should now succeed.
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READABLE, 90, nullptr));
|
| +
|
| + // Write one element.
|
| + elements[0] = 789;
|
| + elements[1] = -1;
|
| + num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerWriteData(UserPointer<const void>(elements),
|
| + MakeUserPointer(&num_bytes), true));
|
| +
|
| + // Waiting should now succeed.
|
| + context = 0;
|
| + EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context));
|
| + EXPECT_EQ(90u, context);
|
| + hss = HandleSignalsState();
|
| + this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // We'll want to wait for the peer closed signal to propagate.
|
| + waiter.Init();
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 12, nullptr));
|
| +
|
| + // Close the producer.
|
| + this->ProducerClose();
|
| +
|
| + // Should still be readable, even if the peer closed signal hasn't propagated
|
| + // yet.
|
| + waiter2.Init();
|
| + hss = HandleSignalsState();
|
| + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &waiter2, MOJO_HANDLE_SIGNAL_READABLE, 34, &hss));
|
| + // We don't know if the peer closed signal has propagated yet (for the remote
|
| + // cases).
|
| + EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Wait for the peer closed signal.
|
| + context = 0;
|
| + EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context));
|
| + EXPECT_EQ(12u, context);
|
| + hss = HandleSignalsState();
|
| + this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Read one element.
|
| + elements[0] = -1;
|
| + elements[1] = -1;
|
| + num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
|
| + UserPointer<void>(elements),
|
| + MakeUserPointer(&num_bytes), true, false));
|
| + EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
|
| + EXPECT_EQ(789, elements[0]);
|
| + EXPECT_EQ(-1, elements[1]);
|
| +
|
| + // Should be never-readable.
|
| + waiter.Init();
|
| + hss = HandleSignalsState();
|
| + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READABLE, 56, &hss));
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
|
| +
|
| + this->ConsumerClose();
|
| +}
|
| +
|
| +// Test with two-phase APIs and also closing the producer with an active
|
| +// consumer waiter.
|
| +TYPED_TEST(DataPipeImplTest, ConsumerWaitingTwoPhase) {
|
| + const MojoCreateDataPipeOptions options = {
|
| + kSizeOfOptions, // |struct_size|.
|
| + MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
|
| + static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
| + 1000 * sizeof(int32_t) // |capacity_num_bytes|.
|
| + };
|
| + this->Create(options);
|
| + this->DoTransfer();
|
| +
|
| + Waiter waiter;
|
| + HandleSignalsState hss;
|
| + uint32_t context;
|
| +
|
| + // Add waiter: not yet readable.
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READABLE, 12, nullptr));
|
| +
|
| + // Write two elements.
|
| + int32_t* elements = nullptr;
|
| + void* buffer = nullptr;
|
| + // Request room for three (but we'll only write two).
|
| + uint32_t num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerBeginWriteData(
|
| + MakeUserPointer(&buffer), MakeUserPointer(&num_bytes), true));
|
| + EXPECT_TRUE(buffer);
|
| + EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0])));
|
| + elements = static_cast<int32_t*>(buffer);
|
| + elements[0] = 123;
|
| + elements[1] = 456;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerEndWriteData(
|
| + static_cast<uint32_t>(2u * sizeof(elements[0]))));
|
| +
|
| + // Wait for readability (needed for remote cases).
|
| + context = 0;
|
| + EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context));
|
| + EXPECT_EQ(12u, context);
|
| + hss = HandleSignalsState();
|
| + this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Read one element.
|
| + // Request two in all-or-none mode, but only read one.
|
| + const void* read_buffer = nullptr;
|
| + num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerBeginReadData(
|
| + MakeUserPointer(&read_buffer),
|
| + MakeUserPointer(&num_bytes), true));
|
| + EXPECT_TRUE(read_buffer);
|
| + EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
|
| + const int32_t* read_elements = static_cast<const int32_t*>(read_buffer);
|
| + EXPECT_EQ(123, read_elements[0]);
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerEndReadData(
|
| + static_cast<uint32_t>(1u * sizeof(elements[0]))));
|
| +
|
| + // Should still be readable.
|
| + waiter.Init();
|
| + hss = HandleSignalsState();
|
| + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READABLE, 34, &hss));
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Read one element.
|
| + // Request three, but not in all-or-none mode.
|
| + read_buffer = nullptr;
|
| + num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerBeginReadData(
|
| + MakeUserPointer(&read_buffer),
|
| + MakeUserPointer(&num_bytes), false));
|
| + EXPECT_TRUE(read_buffer);
|
| + EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
|
| + read_elements = static_cast<const int32_t*>(read_buffer);
|
| + EXPECT_EQ(456, read_elements[0]);
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerEndReadData(
|
| + static_cast<uint32_t>(1u * sizeof(elements[0]))));
|
| +
|
| + // Adding a waiter should now succeed.
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READABLE, 56, nullptr));
|
| +
|
| + // Close the producer.
|
| + this->ProducerClose();
|
| +
|
| + // Should be never-readable.
|
| + context = 0;
|
| + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
|
| + waiter.Wait(test::TinyDeadline(), &context));
|
| + EXPECT_EQ(56u, context);
|
| + hss = HandleSignalsState();
|
| + this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
|
| +
|
| + this->ConsumerClose();
|
| +}
|
| +
|
| +// Tests that data pipes aren't writable/readable during two-phase writes/reads.
|
| +TYPED_TEST(DataPipeImplTest, BasicTwoPhaseWaiting) {
|
| + const MojoCreateDataPipeOptions options = {
|
| + kSizeOfOptions, // |struct_size|.
|
| + MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
|
| + static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
| + 1000 * sizeof(int32_t) // |capacity_num_bytes|.
|
| + };
|
| + this->Create(options);
|
| + this->DoTransfer();
|
| +
|
| + Waiter pwaiter; // For producer.
|
| + Waiter cwaiter; // For consumer.
|
| + HandleSignalsState hss;
|
| +
|
| + // It should be writable.
|
| + pwaiter.Init();
|
| + hss = HandleSignalsState();
|
| + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| + this->dpp()->ProducerAddAwakable(
|
| + &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
|
| + void* write_ptr = nullptr;
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpp()->ProducerBeginWriteData(
|
| + MakeUserPointer(&write_ptr),
|
| + MakeUserPointer(&num_bytes), false));
|
| + EXPECT_TRUE(write_ptr);
|
| + EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
|
| +
|
| + // At this point, it shouldn't be writable.
|
| + pwaiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerAddAwakable(
|
| + &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 1, nullptr));
|
| + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr));
|
| + hss = HandleSignalsState();
|
| + this->dpp()->ProducerRemoveAwakable(&pwaiter, &hss);
|
| + EXPECT_EQ(0u, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // It shouldn't be readable yet either (we'll wait later).
|
| + cwaiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 2, nullptr));
|
| +
|
| + static_cast<int32_t*>(write_ptr)[0] = 123;
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpp()->ProducerEndWriteData(
|
| + static_cast<uint32_t>(1u * sizeof(int32_t))));
|
| +
|
| + // It should immediately be writable again.
|
| + pwaiter.Init();
|
| + hss = HandleSignalsState();
|
| + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| + this->dpp()->ProducerAddAwakable(
|
| + &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 3, &hss));
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // It should become readable.
|
| + EXPECT_EQ(MOJO_RESULT_OK, cwaiter.Wait(test::TinyDeadline(), nullptr));
|
| + hss = HandleSignalsState();
|
| + this->dpc()->ConsumerRemoveAwakable(&cwaiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Start another two-phase write and check that it's readable even in the
|
| + // middle of it.
|
| + num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
|
| + write_ptr = nullptr;
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpp()->ProducerBeginWriteData(
|
| + MakeUserPointer(&write_ptr),
|
| + MakeUserPointer(&num_bytes), false));
|
| + EXPECT_TRUE(write_ptr);
|
| + EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
|
| +
|
| + // It should be readable.
|
| + cwaiter.Init();
|
| + hss = HandleSignalsState();
|
| + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 5, &hss));
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // End the two-phase write without writing anything.
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpp()->ProducerEndWriteData(0u));
|
| +
|
| + // Start a two-phase read.
|
| + num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
|
| + const void* read_ptr = nullptr;
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerBeginReadData(
|
| + MakeUserPointer(&read_ptr),
|
| + MakeUserPointer(&num_bytes), false));
|
| + EXPECT_TRUE(read_ptr);
|
| + EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes);
|
| +
|
| + // At this point, it should still be writable.
|
| + pwaiter.Init();
|
| + hss = HandleSignalsState();
|
| + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| + this->dpp()->ProducerAddAwakable(
|
| + &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 6, &hss));
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // But not readable.
|
| + cwaiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 7, nullptr));
|
| + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, cwaiter.Wait(0, nullptr));
|
| + hss = HandleSignalsState();
|
| + this->dpc()->ConsumerRemoveAwakable(&cwaiter, &hss);
|
| + EXPECT_EQ(0u, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // End the two-phase read without reading anything.
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerEndReadData(0u));
|
| +
|
| + // It should be readable again.
|
| + cwaiter.Init();
|
| + hss = HandleSignalsState();
|
| + EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 8, &hss));
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + this->ProducerClose();
|
| + this->ConsumerClose();
|
| +}
|
| +
|
| +void Seq(int32_t start, size_t count, int32_t* out) {
|
| + for (size_t i = 0; i < count; i++)
|
| + out[i] = start + static_cast<int32_t>(i);
|
| +}
|
| +
|
| +TYPED_TEST(DataPipeImplTest, AllOrNone) {
|
| + const MojoCreateDataPipeOptions options = {
|
| + kSizeOfOptions, // |struct_size|.
|
| + MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
|
| + static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
| + 10 * sizeof(int32_t) // |capacity_num_bytes|.
|
| + };
|
| + this->Create(options);
|
| + this->DoTransfer();
|
| +
|
| + Waiter waiter;
|
| + HandleSignalsState hss;
|
| +
|
| + // Try writing way too much.
|
| + uint32_t num_bytes = 20u * sizeof(int32_t);
|
| + int32_t buffer[100];
|
| + Seq(0, arraysize(buffer), buffer);
|
| + EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
|
| + this->dpp()->ProducerWriteData(UserPointer<const void>(buffer),
|
| + MakeUserPointer(&num_bytes), true));
|
| +
|
| + // Should still be empty.
|
| + num_bytes = ~0u;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
|
| + EXPECT_EQ(0u, num_bytes);
|
| +
|
| + // Add waiter.
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READABLE, 1, nullptr));
|
| +
|
| + // Write some data.
|
| + num_bytes = 5u * sizeof(int32_t);
|
| + Seq(100, arraysize(buffer), buffer);
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerWriteData(UserPointer<const void>(buffer),
|
| + MakeUserPointer(&num_bytes), true));
|
| + EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
|
| +
|
| + // Wait for data.
|
| + // TODO(vtl): There's no real guarantee that all the data will become
|
| + // available at once (except that in current implementations, with reasonable
|
| + // limits, it will). Eventually, we'll be able to wait for a specified amount
|
| + // of data to become available.
|
| + EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
|
| + hss = HandleSignalsState();
|
| + this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Half full.
|
| + num_bytes = 0u;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
|
| + EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
|
| +
|
| + // Too much.
|
| + num_bytes = 6u * sizeof(int32_t);
|
| + Seq(200, arraysize(buffer), buffer);
|
| + EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
|
| + this->dpp()->ProducerWriteData(UserPointer<const void>(buffer),
|
| + MakeUserPointer(&num_bytes), true));
|
| +
|
| + // Try reading too much.
|
| + num_bytes = 11u * sizeof(int32_t);
|
| + memset(buffer, 0xab, sizeof(buffer));
|
| + EXPECT_EQ(
|
| + MOJO_RESULT_OUT_OF_RANGE,
|
| + this->dpc()->ConsumerReadData(UserPointer<void>(buffer),
|
| + MakeUserPointer(&num_bytes), true, false));
|
| + int32_t expected_buffer[100];
|
| + memset(expected_buffer, 0xab, sizeof(expected_buffer));
|
| + EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
|
| +
|
| + // Try discarding too much.
|
| + num_bytes = 11u * sizeof(int32_t);
|
| + EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, this->dpc()->ConsumerDiscardData(
|
| + MakeUserPointer(&num_bytes), true));
|
| +
|
| + // Just a little.
|
| + num_bytes = 2u * sizeof(int32_t);
|
| + Seq(300, arraysize(buffer), buffer);
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerWriteData(UserPointer<const void>(buffer),
|
| + MakeUserPointer(&num_bytes), true));
|
| + EXPECT_EQ(2u * sizeof(int32_t), num_bytes);
|
| +
|
| + // Just right.
|
| + num_bytes = 3u * sizeof(int32_t);
|
| + Seq(400, arraysize(buffer), buffer);
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerWriteData(UserPointer<const void>(buffer),
|
| + MakeUserPointer(&num_bytes), true));
|
| + EXPECT_EQ(3u * sizeof(int32_t), num_bytes);
|
| +
|
| + // TODO(vtl): Hack (see also the TODO above): We can't currently wait for a
|
| + // specified amount of data to be available, so poll.
|
| + const size_t kMaxPoll = 100;
|
| + for (size_t i = 0; i < kMaxPoll; i++) {
|
| + num_bytes = 0u;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
|
| + if (num_bytes >= 10u * sizeof(int32_t))
|
| + break;
|
| +
|
| + base::PlatformThread::Sleep(test::EpsilonTimeout());
|
| + }
|
| + EXPECT_EQ(10u * sizeof(int32_t), num_bytes);
|
| +
|
| + // Read half.
|
| + num_bytes = 5u * sizeof(int32_t);
|
| + memset(buffer, 0xab, sizeof(buffer));
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
|
| + UserPointer<void>(buffer),
|
| + MakeUserPointer(&num_bytes), true, false));
|
| + EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
|
| + memset(expected_buffer, 0xab, sizeof(expected_buffer));
|
| + Seq(100, 5, expected_buffer);
|
| + EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
|
| +
|
| + // Try reading too much again.
|
| + num_bytes = 6u * sizeof(int32_t);
|
| + memset(buffer, 0xab, sizeof(buffer));
|
| + EXPECT_EQ(
|
| + MOJO_RESULT_OUT_OF_RANGE,
|
| + this->dpc()->ConsumerReadData(UserPointer<void>(buffer),
|
| + MakeUserPointer(&num_bytes), true, false));
|
| + memset(expected_buffer, 0xab, sizeof(expected_buffer));
|
| + EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
|
| +
|
| + // Try discarding too much again.
|
| + num_bytes = 6u * sizeof(int32_t);
|
| + EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, this->dpc()->ConsumerDiscardData(
|
| + MakeUserPointer(&num_bytes), true));
|
| +
|
| + // Discard a little.
|
| + num_bytes = 2u * sizeof(int32_t);
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerDiscardData(
|
| + MakeUserPointer(&num_bytes), true));
|
| + EXPECT_EQ(2u * sizeof(int32_t), num_bytes);
|
| +
|
| + // Three left.
|
| + num_bytes = 0u;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
|
| + EXPECT_EQ(3u * sizeof(int32_t), num_bytes);
|
| +
|
| + // We'll need to wait for the peer closed to propagate.
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2, nullptr));
|
| +
|
| + // Close the producer, then test producer-closed cases.
|
| + this->ProducerClose();
|
| +
|
| + // Wait.
|
| + EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
|
| + hss = HandleSignalsState();
|
| + this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Try reading too much; "failed precondition" since the producer is closed.
|
| + num_bytes = 4u * sizeof(int32_t);
|
| + memset(buffer, 0xab, sizeof(buffer));
|
| + EXPECT_EQ(
|
| + MOJO_RESULT_FAILED_PRECONDITION,
|
| + this->dpc()->ConsumerReadData(UserPointer<void>(buffer),
|
| + MakeUserPointer(&num_bytes), true, false));
|
| + memset(expected_buffer, 0xab, sizeof(expected_buffer));
|
| + EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
|
| +
|
| + // Try discarding too much; "failed precondition" again.
|
| + num_bytes = 4u * sizeof(int32_t);
|
| + EXPECT_EQ(
|
| + MOJO_RESULT_FAILED_PRECONDITION,
|
| + this->dpc()->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
|
| +
|
| + // Read a little.
|
| + num_bytes = 2u * sizeof(int32_t);
|
| + memset(buffer, 0xab, sizeof(buffer));
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
|
| + UserPointer<void>(buffer),
|
| + MakeUserPointer(&num_bytes), true, false));
|
| + EXPECT_EQ(2u * sizeof(int32_t), num_bytes);
|
| + memset(expected_buffer, 0xab, sizeof(expected_buffer));
|
| + Seq(400, 2, expected_buffer);
|
| + EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
|
| +
|
| + // Discard the remaining element.
|
| + num_bytes = 1u * sizeof(int32_t);
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerDiscardData(
|
| + MakeUserPointer(&num_bytes), true));
|
| + EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
|
| +
|
| + // Empty again.
|
| + num_bytes = ~0u;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
|
| + EXPECT_EQ(0u, num_bytes);
|
| +
|
| + this->ConsumerClose();
|
| +}
|
| +
|
| +TYPED_TEST(DataPipeImplTest, TwoPhaseAllOrNone) {
|
| + const MojoCreateDataPipeOptions options = {
|
| + kSizeOfOptions, // |struct_size|.
|
| + MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
|
| + static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
| + 10 * sizeof(int32_t) // |capacity_num_bytes|.
|
| + };
|
| + this->Create(options);
|
| + this->DoTransfer();
|
| +
|
| + Waiter waiter;
|
| + HandleSignalsState hss;
|
| +
|
| + // Try writing way too much (two-phase).
|
| + uint32_t num_bytes = 20u * sizeof(int32_t);
|
| + void* write_ptr = nullptr;
|
| + EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, this->dpp()->ProducerBeginWriteData(
|
| + MakeUserPointer(&write_ptr),
|
| + MakeUserPointer(&num_bytes), true));
|
| +
|
| + // Try writing an amount which isn't a multiple of the element size
|
| + // (two-phase).
|
| + static_assert(sizeof(int32_t) > 1u, "Wow! int32_t's have size 1");
|
| + num_bytes = 1u;
|
| + write_ptr = nullptr;
|
| + EXPECT_EQ(
|
| + MOJO_RESULT_INVALID_ARGUMENT,
|
| + this->dpp()->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
|
| + MakeUserPointer(&num_bytes), true));
|
| +
|
| + // Try reading way too much (two-phase).
|
| + num_bytes = 20u * sizeof(int32_t);
|
| + const void* read_ptr = nullptr;
|
| + EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
|
| + this->dpc()->ConsumerBeginReadData(
|
| + MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true));
|
| +
|
| + // Add waiter.
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READABLE, 1, nullptr));
|
| +
|
| + // Write half (two-phase).
|
| + num_bytes = 5u * sizeof(int32_t);
|
| + write_ptr = nullptr;
|
| + EXPECT_EQ(MOJO_RESULT_OK, this->dpp()->ProducerBeginWriteData(
|
| + MakeUserPointer(&write_ptr),
|
| + MakeUserPointer(&num_bytes), true));
|
| + // May provide more space than requested.
|
| + EXPECT_GE(num_bytes, 5u * sizeof(int32_t));
|
| + EXPECT_TRUE(write_ptr);
|
| + Seq(0, 5, static_cast<int32_t*>(write_ptr));
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpp()->ProducerEndWriteData(5u * sizeof(int32_t)));
|
| +
|
| + // Wait for data.
|
| + // TODO(vtl): (See corresponding TODO in AllOrNone.)
|
| + EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
|
| + hss = HandleSignalsState();
|
| + this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Try reading an amount which isn't a multiple of the element size
|
| + // (two-phase).
|
| + num_bytes = 1u;
|
| + read_ptr = nullptr;
|
| + EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
|
| + this->dpc()->ConsumerBeginReadData(
|
| + MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true));
|
| +
|
| + // Read one (two-phase).
|
| + num_bytes = 1u * sizeof(int32_t);
|
| + read_ptr = nullptr;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerBeginReadData(
|
| + MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true));
|
| + EXPECT_GE(num_bytes, 1u * sizeof(int32_t));
|
| + EXPECT_EQ(0, static_cast<const int32_t*>(read_ptr)[0]);
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerEndReadData(1u * sizeof(int32_t)));
|
| +
|
| + // We should have four left, leaving room for six.
|
| + num_bytes = 0u;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
|
| + EXPECT_EQ(4u * sizeof(int32_t), num_bytes);
|
| +
|
| + // Assuming a tight circular buffer of the specified capacity, we can't do a
|
| + // two-phase write of six now.
|
| + num_bytes = 6u * sizeof(int32_t);
|
| + write_ptr = nullptr;
|
| + EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, this->dpp()->ProducerBeginWriteData(
|
| + MakeUserPointer(&write_ptr),
|
| + MakeUserPointer(&num_bytes), true));
|
| +
|
| + // TODO(vtl): Hack (see also the TODO above): We can't currently wait for a
|
| + // specified amount of space to be available, so poll.
|
| + const size_t kMaxPoll = 100;
|
| + for (size_t i = 0; i < kMaxPoll; i++) {
|
| + // Write six elements (simple), filling the buffer.
|
| + num_bytes = 6u * sizeof(int32_t);
|
| + int32_t buffer[100];
|
| + Seq(100, 6, buffer);
|
| + MojoResult result = this->dpp()->ProducerWriteData(
|
| + UserPointer<const void>(buffer), MakeUserPointer(&num_bytes), true);
|
| + if (result == MOJO_RESULT_OK)
|
| + break;
|
| + EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, result);
|
| +
|
| + base::PlatformThread::Sleep(test::EpsilonTimeout());
|
| + }
|
| + EXPECT_EQ(6u * sizeof(int32_t), num_bytes);
|
| +
|
| + // TODO(vtl): Hack: poll again.
|
| + for (size_t i = 0; i < kMaxPoll; i++) {
|
| + // We have ten.
|
| + num_bytes = 0u;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
|
| + if (num_bytes >= 10u * sizeof(int32_t))
|
| + break;
|
| +
|
| + base::PlatformThread::Sleep(test::EpsilonTimeout());
|
| + }
|
| + EXPECT_EQ(10u * sizeof(int32_t), num_bytes);
|
| +
|
| + // Note: Whether a two-phase read of ten would fail here or not is
|
| + // implementation-dependent.
|
| +
|
| + // Add waiter.
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2, nullptr));
|
| +
|
| + // Close the producer.
|
| + this->ProducerClose();
|
| +
|
| + // A two-phase read of nine should work.
|
| + num_bytes = 9u * sizeof(int32_t);
|
| + read_ptr = nullptr;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerBeginReadData(
|
| + MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true));
|
| + EXPECT_GE(num_bytes, 9u * sizeof(int32_t));
|
| + EXPECT_EQ(1, static_cast<const int32_t*>(read_ptr)[0]);
|
| + EXPECT_EQ(2, static_cast<const int32_t*>(read_ptr)[1]);
|
| + EXPECT_EQ(3, static_cast<const int32_t*>(read_ptr)[2]);
|
| + EXPECT_EQ(4, static_cast<const int32_t*>(read_ptr)[3]);
|
| + EXPECT_EQ(100, static_cast<const int32_t*>(read_ptr)[4]);
|
| + EXPECT_EQ(101, static_cast<const int32_t*>(read_ptr)[5]);
|
| + EXPECT_EQ(102, static_cast<const int32_t*>(read_ptr)[6]);
|
| + EXPECT_EQ(103, static_cast<const int32_t*>(read_ptr)[7]);
|
| + EXPECT_EQ(104, static_cast<const int32_t*>(read_ptr)[8]);
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->dpc()->ConsumerEndReadData(9u * sizeof(int32_t)));
|
| +
|
| + // Wait for peer closed.
|
| + EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
|
| + hss = HandleSignalsState();
|
| + this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // A two-phase read of two should fail, with "failed precondition".
|
| + num_bytes = 2u * sizeof(int32_t);
|
| + read_ptr = nullptr;
|
| + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
|
| + this->dpc()->ConsumerBeginReadData(
|
| + MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true));
|
| +
|
| + this->ConsumerClose();
|
| +}
|
| +
|
| +} // namespace
|
| +} // namespace system
|
| +} // namespace mojo
|
|
|