Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(40)

Unified Diff: third_party/mojo/src/mojo/edk/system/data_pipe_impl_unittest.cc

Issue 1676913002: [mojo] Delete third_party/mojo (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: let's try that again Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/mojo/src/mojo/edk/system/data_pipe_impl_unittest.cc
diff --git a/third_party/mojo/src/mojo/edk/system/data_pipe_impl_unittest.cc b/third_party/mojo/src/mojo/edk/system/data_pipe_impl_unittest.cc
deleted file mode 100644
index efe8bb5df30fe9916f940ed35e775eaa8bb1163b..0000000000000000000000000000000000000000
--- a/third_party/mojo/src/mojo/edk/system/data_pipe_impl_unittest.cc
+++ /dev/null
@@ -1,2438 +0,0 @@
-// Copyright 2015 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-// This file contains tests that are shared between different implementations of
-// |DataPipeImpl|.
-
-#include "third_party/mojo/src/mojo/edk/system/data_pipe_impl.h"
-
-#include <stdint.h>
-
-#include "base/bind.h"
-#include "base/location.h"
-#include "base/logging.h"
-#include "base/memory/scoped_ptr.h"
-#include "base/message_loop/message_loop.h"
-#include "base/test/test_io_thread.h"
-#include "mojo/public/cpp/system/macros.h"
-#include "testing/gtest/include/gtest/gtest.h"
-#include "third_party/mojo/src/mojo/edk/embedder/platform_channel_pair.h"
-#include "third_party/mojo/src/mojo/edk/embedder/simple_platform_support.h"
-#include "third_party/mojo/src/mojo/edk/system/channel.h"
-#include "third_party/mojo/src/mojo/edk/system/channel_endpoint.h"
-#include "third_party/mojo/src/mojo/edk/system/data_pipe.h"
-#include "third_party/mojo/src/mojo/edk/system/data_pipe_consumer_dispatcher.h"
-#include "third_party/mojo/src/mojo/edk/system/data_pipe_producer_dispatcher.h"
-#include "third_party/mojo/src/mojo/edk/system/memory.h"
-#include "third_party/mojo/src/mojo/edk/system/message_pipe.h"
-#include "third_party/mojo/src/mojo/edk/system/raw_channel.h"
-#include "third_party/mojo/src/mojo/edk/system/test_utils.h"
-#include "third_party/mojo/src/mojo/edk/system/waiter.h"
-
-namespace mojo {
-namespace system {
-namespace {
-
-const MojoHandleSignals kAllSignals = MOJO_HANDLE_SIGNAL_READABLE |
- MOJO_HANDLE_SIGNAL_WRITABLE |
- MOJO_HANDLE_SIGNAL_PEER_CLOSED;
-const uint32_t kSizeOfOptions =
- static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions));
-
-// In various places, we have to poll (since, e.g., we can't yet wait for a
-// certain amount of data to be available). This is the maximum number of
-// iterations (separated by a short sleep).
-// TODO(vtl): Get rid of this.
-const size_t kMaxPoll = 100;
-
-// DataPipeImplTestHelper ------------------------------------------------------
-
-class DataPipeImplTestHelper {
- public:
- virtual ~DataPipeImplTestHelper() {}
-
- virtual void SetUp() = 0;
- virtual void TearDown() = 0;
-
- virtual void Create(const MojoCreateDataPipeOptions& validated_options) = 0;
-
- // Returns true if the producer and consumer exhibit the behavior that you'd
- // expect from a pure circular buffer implementation (reflected to two-phase
- // reads and writes).
- virtual bool IsStrictCircularBuffer() const = 0;
-
- // Possibly transfers the producer/consumer.
- virtual void DoTransfer() = 0;
-
- // Returns the |DataPipe| object for the producer and consumer, respectively.
- virtual DataPipe* DataPipeForProducer() = 0;
- virtual DataPipe* DataPipeForConsumer() = 0;
-
- // Closes the producer and consumer, respectively. (Other operations go
- // through the above accessors; closing is special since it may require that a
- // dispatcher be closed.)
- virtual void ProducerClose() = 0;
- virtual void ConsumerClose() = 0;
-
- protected:
- DataPipeImplTestHelper() {}
-
- private:
- MOJO_DISALLOW_COPY_AND_ASSIGN(DataPipeImplTestHelper);
-};
-
-// DataPipeImplTest ------------------------------------------------------------
-
-template <class Helper>
-class DataPipeImplTest : public testing::Test {
- public:
- DataPipeImplTest() {}
- ~DataPipeImplTest() override {}
-
- void SetUp() override { Reset(); }
- void TearDown() override { helper_->TearDown(); }
-
- protected:
- void Create(const MojoCreateDataPipeOptions& options) {
- MojoCreateDataPipeOptions validated_options = {};
- ASSERT_EQ(MOJO_RESULT_OK,
- DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
- &validated_options));
- helper_->Create(validated_options);
- }
-
- bool IsStrictCircularBuffer() const {
- return helper_->IsStrictCircularBuffer();
- }
-
- void DoTransfer() { return helper_->DoTransfer(); }
-
- void Reset() {
- if (helper_)
- helper_->TearDown();
-
- helper_.reset(new Helper());
- helper_->SetUp();
- }
-
- void ProducerClose() { helper_->ProducerClose(); }
- MojoResult ProducerWriteData(UserPointer<const void> elements,
- UserPointer<uint32_t> num_bytes,
- bool all_or_none) {
- return dpp()->ProducerWriteData(elements, num_bytes, all_or_none);
- }
- MojoResult ProducerBeginWriteData(UserPointer<void*> buffer,
- UserPointer<uint32_t> buffer_num_bytes) {
- return dpp()->ProducerBeginWriteData(buffer, buffer_num_bytes);
- }
- MojoResult ProducerEndWriteData(uint32_t num_bytes_written) {
- return dpp()->ProducerEndWriteData(num_bytes_written);
- }
- MojoResult ProducerAddAwakable(Awakable* awakable,
- MojoHandleSignals signals,
- uintptr_t context,
- HandleSignalsState* signals_state) {
- return dpp()->ProducerAddAwakable(awakable, signals, context,
- signals_state);
- }
- void ProducerRemoveAwakable(Awakable* awakable,
- HandleSignalsState* signals_state) {
- return dpp()->ProducerRemoveAwakable(awakable, signals_state);
- }
-
- void ConsumerClose() { helper_->ConsumerClose(); }
- MojoResult ConsumerReadData(UserPointer<void> elements,
- UserPointer<uint32_t> num_bytes,
- bool all_or_none,
- bool peek) {
- return dpc()->ConsumerReadData(elements, num_bytes, all_or_none, peek);
- }
- MojoResult ConsumerDiscardData(UserPointer<uint32_t> num_bytes,
- bool all_or_none) {
- return dpc()->ConsumerDiscardData(num_bytes, all_or_none);
- }
- MojoResult ConsumerQueryData(UserPointer<uint32_t> num_bytes) {
- return dpc()->ConsumerQueryData(num_bytes);
- }
- MojoResult ConsumerBeginReadData(UserPointer<const void*> buffer,
- UserPointer<uint32_t> buffer_num_bytes) {
- return dpc()->ConsumerBeginReadData(buffer, buffer_num_bytes);
- }
- MojoResult ConsumerEndReadData(uint32_t num_bytes_read) {
- return dpc()->ConsumerEndReadData(num_bytes_read);
- }
- MojoResult ConsumerAddAwakable(Awakable* awakable,
- MojoHandleSignals signals,
- uintptr_t context,
- HandleSignalsState* signals_state) {
- return dpc()->ConsumerAddAwakable(awakable, signals, context,
- signals_state);
- }
- void ConsumerRemoveAwakable(Awakable* awakable,
- HandleSignalsState* signals_state) {
- return dpc()->ConsumerRemoveAwakable(awakable, signals_state);
- }
-
- private:
- DataPipe* dpp() { return helper_->DataPipeForProducer(); }
- DataPipe* dpc() { return helper_->DataPipeForConsumer(); }
-
- scoped_ptr<Helper> helper_;
-
- MOJO_DISALLOW_COPY_AND_ASSIGN(DataPipeImplTest);
-};
-
-// LocalDataPipeImplTestHelper -------------------------------------------------
-
-class LocalDataPipeImplTestHelper : public DataPipeImplTestHelper {
- public:
- LocalDataPipeImplTestHelper() {}
- ~LocalDataPipeImplTestHelper() override {}
-
- void SetUp() override {}
- void TearDown() override {}
-
- void Create(const MojoCreateDataPipeOptions& validated_options) override {
- CHECK(!dp_);
- dp_ = DataPipe::CreateLocal(validated_options);
- }
-
- bool IsStrictCircularBuffer() const override { return true; }
-
- void DoTransfer() override {}
-
- // Returns the |DataPipe| object for the producer and consumer, respectively.
- DataPipe* DataPipeForProducer() override { return dp_.get(); }
- DataPipe* DataPipeForConsumer() override { return dp_.get(); }
-
- void ProducerClose() override { dp_->ProducerClose(); }
- void ConsumerClose() override { dp_->ConsumerClose(); }
-
- private:
- scoped_refptr<DataPipe> dp_;
-
- MOJO_DISALLOW_COPY_AND_ASSIGN(LocalDataPipeImplTestHelper);
-};
-
-// RemoteDataPipeImplTestHelper ------------------------------------------------
-
-// Base class for |Remote{Producer,Consumer}DataPipeImplTestHelper|.
-class RemoteDataPipeImplTestHelper : public DataPipeImplTestHelper {
- public:
- RemoteDataPipeImplTestHelper() : io_thread_(base::TestIOThread::kAutoStart) {}
- ~RemoteDataPipeImplTestHelper() override {}
-
- void SetUp() override {
- scoped_refptr<ChannelEndpoint> ep[2];
- message_pipes_[0] = MessagePipe::CreateLocalProxy(&ep[0]);
- message_pipes_[1] = MessagePipe::CreateLocalProxy(&ep[1]);
-
- io_thread_.PostTaskAndWait(
- FROM_HERE, base::Bind(&RemoteDataPipeImplTestHelper::SetUpOnIOThread,
- base::Unretained(this), ep[0], ep[1]));
- }
-
- void TearDown() override {
- EnsureMessagePipeClosed(0);
- EnsureMessagePipeClosed(1);
- io_thread_.PostTaskAndWait(
- FROM_HERE, base::Bind(&RemoteDataPipeImplTestHelper::TearDownOnIOThread,
- base::Unretained(this)));
- }
-
- void Create(const MojoCreateDataPipeOptions& validated_options) override {
- CHECK(!dp_);
- dp_ = DataPipe::CreateLocal(validated_options);
- }
-
- bool IsStrictCircularBuffer() const override { return false; }
-
- protected:
- void SendDispatcher(size_t source_i,
- scoped_refptr<Dispatcher> to_send,
- scoped_refptr<Dispatcher>* to_receive) {
- DCHECK(source_i == 0 || source_i == 1);
- size_t dest_i = source_i ^ 1;
-
- // Write the dispatcher to MP |source_i| (port 0). Wait and receive on MP
- // |dest_i| (port 0). (Add the waiter first, to avoid any handling the case
- // where it's already readable.)
- Waiter waiter;
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- message_pipe(dest_i)->AddAwakable(
- 0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 987, nullptr));
- {
- DispatcherTransport transport(
- test::DispatcherTryStartTransport(to_send.get()));
- ASSERT_TRUE(transport.is_valid());
-
- std::vector<DispatcherTransport> transports;
- transports.push_back(transport);
- ASSERT_EQ(MOJO_RESULT_OK, message_pipe(source_i)->WriteMessage(
- 0, NullUserPointer(), 0, &transports,
- MOJO_WRITE_MESSAGE_FLAG_NONE));
- transport.End();
- }
- uintptr_t context = 0;
- ASSERT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context));
- EXPECT_EQ(987u, context);
- HandleSignalsState hss = HandleSignalsState();
- message_pipe(dest_i)->RemoveAwakable(0, &waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
- hss.satisfied_signals);
- EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
- char read_buffer[100] = {};
- uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
- DispatcherVector read_dispatchers;
- uint32_t read_num_dispatchers = 10; // Maximum to get.
- ASSERT_EQ(MOJO_RESULT_OK,
- message_pipe(dest_i)->ReadMessage(
- 0, UserPointer<void>(read_buffer),
- MakeUserPointer(&read_buffer_size), &read_dispatchers,
- &read_num_dispatchers, MOJO_READ_MESSAGE_FLAG_NONE));
- EXPECT_EQ(0u, static_cast<size_t>(read_buffer_size));
- ASSERT_EQ(1u, read_dispatchers.size());
- ASSERT_EQ(1u, read_num_dispatchers);
- ASSERT_TRUE(read_dispatchers[0]);
- EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
-
- *to_receive = read_dispatchers[0];
- }
-
- scoped_refptr<MessagePipe> message_pipe(size_t i) {
- return message_pipes_[i];
- }
- scoped_refptr<DataPipe> dp() { return dp_; }
-
- private:
- void EnsureMessagePipeClosed(size_t i) {
- if (!message_pipes_[i])
- return;
- message_pipes_[i]->Close(0);
- message_pipes_[i] = nullptr;
- }
-
- void SetUpOnIOThread(scoped_refptr<ChannelEndpoint> ep0,
- scoped_refptr<ChannelEndpoint> ep1) {
- CHECK_EQ(base::MessageLoop::current(), io_thread_.message_loop());
-
- embedder::PlatformChannelPair channel_pair;
- channels_[0] = new Channel(&platform_support_);
- channels_[0]->Init(RawChannel::Create(channel_pair.PassServerHandle()));
- channels_[0]->SetBootstrapEndpoint(ep0);
- channels_[1] = new Channel(&platform_support_);
- channels_[1]->Init(RawChannel::Create(channel_pair.PassClientHandle()));
- channels_[1]->SetBootstrapEndpoint(ep1);
- }
-
- void TearDownOnIOThread() {
- CHECK_EQ(base::MessageLoop::current(), io_thread_.message_loop());
-
- if (channels_[0]) {
- channels_[0]->Shutdown();
- channels_[0] = nullptr;
- }
- if (channels_[1]) {
- channels_[1]->Shutdown();
- channels_[1] = nullptr;
- }
- }
-
- embedder::SimplePlatformSupport platform_support_;
- base::TestIOThread io_thread_;
- scoped_refptr<Channel> channels_[2];
- scoped_refptr<MessagePipe> message_pipes_[2];
-
- scoped_refptr<DataPipe> dp_;
-
- MOJO_DISALLOW_COPY_AND_ASSIGN(RemoteDataPipeImplTestHelper);
-};
-
-// RemoteProducerDataPipeImplTestHelper ----------------------------------------
-
-// Note about naming confusion: This class is named after the "local" class,
-// i.e., |dp_| will have a |RemoteProducerDataPipeImpl|. The remote side, of
-// course, will have a |RemoteConsumerDataPipeImpl|.
-class RemoteProducerDataPipeImplTestHelper
- : public RemoteDataPipeImplTestHelper {
- public:
- RemoteProducerDataPipeImplTestHelper() {}
- ~RemoteProducerDataPipeImplTestHelper() override {}
-
- void DoTransfer() override {
- // This is the producer dispatcher we'll send.
- scoped_refptr<DataPipeProducerDispatcher> to_send =
- DataPipeProducerDispatcher::Create();
- to_send->Init(dp());
- scoped_refptr<Dispatcher> to_receive;
- SendDispatcher(0, to_send, &to_receive);
- // |to_send| should have been closed. This is |DCHECK()|ed when it is
- // destroyed.
- EXPECT_TRUE(to_send->HasOneRef());
- to_send = nullptr;
-
- ASSERT_EQ(Dispatcher::Type::DATA_PIPE_PRODUCER, to_receive->GetType());
- producer_dispatcher_ =
- static_cast<DataPipeProducerDispatcher*>(to_receive.get());
- }
-
- DataPipe* DataPipeForProducer() override {
- if (producer_dispatcher_)
- return producer_dispatcher_->GetDataPipeForTest();
- return dp().get();
- }
- DataPipe* DataPipeForConsumer() override { return dp().get(); }
-
- void ProducerClose() override {
- if (producer_dispatcher_)
- ASSERT_EQ(MOJO_RESULT_OK, producer_dispatcher_->Close());
- else
- dp()->ProducerClose();
- }
- void ConsumerClose() override { dp()->ConsumerClose(); }
-
- protected:
- scoped_refptr<DataPipeProducerDispatcher> producer_dispatcher_;
-
- private:
- MOJO_DISALLOW_COPY_AND_ASSIGN(RemoteProducerDataPipeImplTestHelper);
-};
-
-// RemoteConsumerDataPipeImplTestHelper ----------------------------------------
-
-// Note about naming confusion: This class is named after the "local" class,
-// i.e., |dp_| will have a |RemoteConsumerDataPipeImpl|. The remote side, of
-// course, will have a |RemoteProducerDataPipeImpl|.
-class RemoteConsumerDataPipeImplTestHelper
- : public RemoteDataPipeImplTestHelper {
- public:
- RemoteConsumerDataPipeImplTestHelper() {}
- ~RemoteConsumerDataPipeImplTestHelper() override {}
-
- void DoTransfer() override {
- // This is the consumer dispatcher we'll send.
- scoped_refptr<DataPipeConsumerDispatcher> to_send =
- DataPipeConsumerDispatcher::Create();
- to_send->Init(dp());
- scoped_refptr<Dispatcher> to_receive;
- SendDispatcher(0, to_send, &to_receive);
- // |to_send| should have been closed. This is |DCHECK()|ed when it is
- // destroyed.
- EXPECT_TRUE(to_send->HasOneRef());
- to_send = nullptr;
-
- ASSERT_EQ(Dispatcher::Type::DATA_PIPE_CONSUMER, to_receive->GetType());
- consumer_dispatcher_ =
- static_cast<DataPipeConsumerDispatcher*>(to_receive.get());
- }
-
- DataPipe* DataPipeForProducer() override { return dp().get(); }
- DataPipe* DataPipeForConsumer() override {
- if (consumer_dispatcher_)
- return consumer_dispatcher_->GetDataPipeForTest();
- return dp().get();
- }
-
- void ProducerClose() override { dp()->ProducerClose(); }
- void ConsumerClose() override {
- if (consumer_dispatcher_)
- ASSERT_EQ(MOJO_RESULT_OK, consumer_dispatcher_->Close());
- else
- dp()->ConsumerClose();
- }
-
- protected:
- scoped_refptr<DataPipeConsumerDispatcher> consumer_dispatcher_;
-
- private:
- MOJO_DISALLOW_COPY_AND_ASSIGN(RemoteConsumerDataPipeImplTestHelper);
-};
-
-// RemoteProducerDataPipeImplTestHelper2 ---------------------------------------
-
-// This is like |RemoteProducerDataPipeImplTestHelper|, but |DoTransfer()| does
-// a second transfer. This thus tests passing a producer handle twice, and in
-// particular tests (some of) |RemoteConsumerDataPipeImpl|'s
-// |ProducerEndSerialize()| (instead of |LocalDataPipeImpl|'s).
-//
-// Note about naming confusion: This class is named after the "local" class,
-// i.e., |dp_| will have a |RemoteProducerDataPipeImpl|. The remote side, of
-// course, will have a |RemoteConsumerDataPipeImpl|.
-class RemoteProducerDataPipeImplTestHelper2
- : public RemoteProducerDataPipeImplTestHelper {
- public:
- RemoteProducerDataPipeImplTestHelper2() {}
- ~RemoteProducerDataPipeImplTestHelper2() override {}
-
- void DoTransfer() override {
- // This is the producer dispatcher we'll send.
- scoped_refptr<DataPipeProducerDispatcher> to_send =
- DataPipeProducerDispatcher::Create();
- to_send->Init(dp());
- scoped_refptr<Dispatcher> to_receive;
- SendDispatcher(0, to_send, &to_receive);
- // |to_send| should have been closed. This is |DCHECK()|ed when it is
- // destroyed.
- EXPECT_TRUE(to_send->HasOneRef());
- to_send = nullptr;
- ASSERT_EQ(Dispatcher::Type::DATA_PIPE_PRODUCER, to_receive->GetType());
- to_send = static_cast<DataPipeProducerDispatcher*>(to_receive.get());
- to_receive = nullptr;
-
- // Now send it back the other way.
- SendDispatcher(1, to_send, &to_receive);
- // |producer_dispatcher_| should have been closed. This is |DCHECK()|ed when
- // it is destroyed.
- EXPECT_TRUE(to_send->HasOneRef());
- to_send = nullptr;
-
- ASSERT_EQ(Dispatcher::Type::DATA_PIPE_PRODUCER, to_receive->GetType());
- producer_dispatcher_ =
- static_cast<DataPipeProducerDispatcher*>(to_receive.get());
- }
-
- private:
- MOJO_DISALLOW_COPY_AND_ASSIGN(RemoteProducerDataPipeImplTestHelper2);
-};
-
-// RemoteConsumerDataPipeImplTestHelper2 ---------------------------------------
-
-// This is like |RemoteConsumerDataPipeImplTestHelper|, but |DoTransfer()| does
-// a second transfer. This thus tests passing a consumer handle twice, and in
-// particular tests (some of) |RemoteProducerDataPipeImpl|'s
-// |ConsumerEndSerialize()| (instead of |LocalDataPipeImpl|'s).
-//
-// Note about naming confusion: This class is named after the "local" class,
-// i.e., |dp_| will have a |RemoteConsumerDataPipeImpl|. The remote side, of
-// course, will have a |RemoteProducerDataPipeImpl|.
-class RemoteConsumerDataPipeImplTestHelper2
- : public RemoteConsumerDataPipeImplTestHelper {
- public:
- RemoteConsumerDataPipeImplTestHelper2() {}
- ~RemoteConsumerDataPipeImplTestHelper2() override {}
-
- void DoTransfer() override {
- // This is the consumer dispatcher we'll send.
- scoped_refptr<DataPipeConsumerDispatcher> to_send =
- DataPipeConsumerDispatcher::Create();
- to_send->Init(dp());
- scoped_refptr<Dispatcher> to_receive;
- SendDispatcher(0, to_send, &to_receive);
- // |to_send| should have been closed. This is |DCHECK()|ed when it is
- // destroyed.
- EXPECT_TRUE(to_send->HasOneRef());
- to_send = nullptr;
- ASSERT_EQ(Dispatcher::Type::DATA_PIPE_CONSUMER, to_receive->GetType());
- to_send = static_cast<DataPipeConsumerDispatcher*>(to_receive.get());
- to_receive = nullptr;
-
- // Now send it back the other way.
- SendDispatcher(1, to_send, &to_receive);
- // |consumer_dispatcher_| should have been closed. This is |DCHECK()|ed when
- // it is destroyed.
- EXPECT_TRUE(to_send->HasOneRef());
- to_send = nullptr;
-
- ASSERT_EQ(Dispatcher::Type::DATA_PIPE_CONSUMER, to_receive->GetType());
- consumer_dispatcher_ =
- static_cast<DataPipeConsumerDispatcher*>(to_receive.get());
- }
-
- private:
- MOJO_DISALLOW_COPY_AND_ASSIGN(RemoteConsumerDataPipeImplTestHelper2);
-};
-
-// Test case instantiation -----------------------------------------------------
-
-using HelperTypes = testing::Types<LocalDataPipeImplTestHelper,
- RemoteProducerDataPipeImplTestHelper,
- RemoteConsumerDataPipeImplTestHelper,
- RemoteProducerDataPipeImplTestHelper2,
- RemoteConsumerDataPipeImplTestHelper2>;
-
-TYPED_TEST_CASE(DataPipeImplTest, HelperTypes);
-
-// Tests -----------------------------------------------------------------------
-
-// Tests creation (and possibly also transferring) of data pipes with various
-// (valid) options.
-TYPED_TEST(DataPipeImplTest, CreateAndMaybeTransfer) {
- MojoCreateDataPipeOptions test_options[] = {
- // Default options -- we'll initialize this below.
- {},
- // Trivial element size, non-default capacity.
- {kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- 1, // |element_num_bytes|.
- 1000}, // |capacity_num_bytes|.
- // Nontrivial element size, non-default capacity.
- {kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- 4, // |element_num_bytes|.
- 4000}, // |capacity_num_bytes|.
- // Nontrivial element size, default capacity.
- {kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- 100, // |element_num_bytes|.
- 0} // |capacity_num_bytes|.
- };
-
- // Initialize the first element of |test_options| to the default options.
- EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateCreateOptions(NullUserPointer(),
- &test_options[0]));
-
- for (size_t i = 0; i < MOJO_ARRAYSIZE(test_options); i++) {
- this->Create(test_options[i]);
- this->DoTransfer();
- this->ProducerClose();
- this->ConsumerClose();
- this->Reset();
- }
-}
-
-TYPED_TEST(DataPipeImplTest, SimpleReadWrite) {
- const MojoCreateDataPipeOptions options = {
- kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
- 1000 * sizeof(int32_t) // |capacity_num_bytes|.
- };
- this->Create(options);
- this->DoTransfer();
-
- Waiter waiter;
- HandleSignalsState hss;
- uintptr_t context;
-
- int32_t elements[10] = {};
- uint32_t num_bytes = 0u;
-
- // Try reading; nothing there yet.
- num_bytes =
- static_cast<uint32_t>(MOJO_ARRAYSIZE(elements) * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
- this->ConsumerReadData(UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), false, false));
-
- // Query; nothing there yet.
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- EXPECT_EQ(0u, num_bytes);
-
- // Discard; nothing there yet.
- num_bytes = static_cast<uint32_t>(5u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
- this->ConsumerDiscardData(MakeUserPointer(&num_bytes), false));
-
- // Read with invalid |num_bytes|.
- num_bytes = sizeof(elements[0]) + 1;
- EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
- this->ConsumerReadData(UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), false, false));
-
- // For remote data pipes, we'll have to wait; add the waiter before writing.
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 123,
- nullptr));
-
- // Write two elements.
- elements[0] = 123;
- elements[1] = 456;
- num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerWriteData(UserPointer<const void>(elements),
- MakeUserPointer(&num_bytes), false));
- // It should have written everything (even without "all or none").
- EXPECT_EQ(2u * sizeof(elements[0]), num_bytes);
-
- // Wait.
- context = 0;
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context));
- EXPECT_EQ(123u, context);
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Query.
- // TODO(vtl): It's theoretically possible (though not with the current
- // implementation/configured limits) that not all the data has arrived yet.
- // (The theoretically-correct assertion here is that |num_bytes| is |1 * ...|
- // or |2 * ...|.)
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- EXPECT_EQ(2 * sizeof(elements[0]), num_bytes);
-
- // Read one element.
- elements[0] = -1;
- elements[1] = -1;
- num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerReadData(UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), false, false));
- EXPECT_EQ(1u * sizeof(elements[0]), num_bytes);
- EXPECT_EQ(123, elements[0]);
- EXPECT_EQ(-1, elements[1]);
-
- // Query.
- // TODO(vtl): See previous TODO. (If we got 2 elements there, however, we
- // should get 1 here.)
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- EXPECT_EQ(1 * sizeof(elements[0]), num_bytes);
-
- // Peek one element.
- elements[0] = -1;
- elements[1] = -1;
- num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerReadData(UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), false, true));
- EXPECT_EQ(1u * sizeof(elements[0]), num_bytes);
- EXPECT_EQ(456, elements[0]);
- EXPECT_EQ(-1, elements[1]);
-
- // Query. Still has 1 element remaining.
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- EXPECT_EQ(1 * sizeof(elements[0]), num_bytes);
-
- // Try to read two elements, with "all or none".
- elements[0] = -1;
- elements[1] = -1;
- num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
- this->ConsumerReadData(UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), true, false));
- EXPECT_EQ(-1, elements[0]);
- EXPECT_EQ(-1, elements[1]);
-
- // Try to read two elements, without "all or none".
- elements[0] = -1;
- elements[1] = -1;
- num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerReadData(UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), false, false));
- EXPECT_EQ(1u * sizeof(elements[0]), num_bytes);
- EXPECT_EQ(456, elements[0]);
- EXPECT_EQ(-1, elements[1]);
-
- // Query.
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- EXPECT_EQ(0u, num_bytes);
-
- this->ProducerClose();
- this->ConsumerClose();
-}
-
-// Note: The "basic" waiting tests test that the "wait states" are correct in
-// various situations; they don't test that waiters are properly awoken on state
-// changes. (For that, we need to use multiple threads.)
-TYPED_TEST(DataPipeImplTest, BasicProducerWaiting) {
- // Note: We take advantage of the fact that current for current
- // implementations capacities are strict maximums. This is not guaranteed by
- // the API.
-
- const MojoCreateDataPipeOptions options = {
- kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
- 2 * sizeof(int32_t) // |capacity_num_bytes|.
- };
- this->Create(options);
- this->DoTransfer();
-
- Waiter pwaiter; // For producer.
- Waiter cwaiter; // For consumer.
- HandleSignalsState hss;
- uintptr_t context;
-
- // Never readable.
- pwaiter.Init();
- hss = HandleSignalsState();
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_READABLE, 12,
- &hss));
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Already writable.
- pwaiter.Init();
- hss = HandleSignalsState();
- EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 34,
- &hss));
-
- // We'll need to wait for readability for the remote cases.
- cwaiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE,
- 1234, nullptr));
-
- // Write two elements.
- int32_t elements[2] = {123, 456};
- uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerWriteData(UserPointer<const void>(elements),
- MakeUserPointer(&num_bytes), true));
- EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
-
- // Adding a waiter should now succeed.
- pwaiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 56,
- nullptr));
- // And it shouldn't be writable yet.
- EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr));
- hss = HandleSignalsState();
- this->ProducerRemoveAwakable(&pwaiter, &hss);
- EXPECT_EQ(0u, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Wait for data to become available to the consumer.
- context = 0;
- EXPECT_EQ(MOJO_RESULT_OK, cwaiter.Wait(test::TinyDeadline(), &context));
- EXPECT_EQ(1234u, context);
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&cwaiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Peek one element.
- elements[0] = -1;
- elements[1] = -1;
- num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerReadData(UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), true, true));
- EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
- EXPECT_EQ(123, elements[0]);
- EXPECT_EQ(-1, elements[1]);
-
- // Add a waiter.
- pwaiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 56,
- nullptr));
- // And it still shouldn't be writable yet.
- EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr));
- hss = HandleSignalsState();
- this->ProducerRemoveAwakable(&pwaiter, &hss);
- EXPECT_EQ(0u, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Do it again.
- pwaiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 78,
- nullptr));
-
- // Read one element.
- elements[0] = -1;
- elements[1] = -1;
- num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerReadData(UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), true, false));
- EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
- EXPECT_EQ(123, elements[0]);
- EXPECT_EQ(-1, elements[1]);
-
- // Waiting should now succeed.
- context = 0;
- EXPECT_EQ(MOJO_RESULT_OK, pwaiter.Wait(test::TinyDeadline(), &context));
- EXPECT_EQ(78u, context);
- hss = HandleSignalsState();
- this->ProducerRemoveAwakable(&pwaiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Try writing, using a two-phase write.
- void* buffer = nullptr;
- num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerBeginWriteData(MakeUserPointer(&buffer),
- MakeUserPointer(&num_bytes)));
- EXPECT_TRUE(buffer);
- EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
-
- static_cast<int32_t*>(buffer)[0] = 789;
- EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(static_cast<uint32_t>(
- 1u * sizeof(elements[0]))));
-
- // Add a waiter.
- pwaiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 90,
- nullptr));
-
- // Read one element, using a two-phase read.
- const void* read_buffer = nullptr;
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerBeginReadData(MakeUserPointer(&read_buffer),
- MakeUserPointer(&num_bytes)));
- EXPECT_TRUE(read_buffer);
- // Since we only read one element (after having written three in all), the
- // two-phase read should only allow us to read one. This checks an
- // implementation detail!
- EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
- EXPECT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]);
- EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(static_cast<uint32_t>(
- 1u * sizeof(elements[0]))));
-
- // Waiting should succeed.
- context = 0;
- EXPECT_EQ(MOJO_RESULT_OK, pwaiter.Wait(test::TinyDeadline(), &context));
- EXPECT_EQ(90u, context);
- hss = HandleSignalsState();
- this->ProducerRemoveAwakable(&pwaiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Write one element.
- elements[0] = 123;
- num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerWriteData(UserPointer<const void>(elements),
- MakeUserPointer(&num_bytes), false));
- EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
-
- // Add a waiter.
- pwaiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 12,
- nullptr));
-
- // Close the consumer.
- this->ConsumerClose();
-
- // It should now be never-writable.
- context = 0;
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- pwaiter.Wait(test::TinyDeadline(), &context));
- EXPECT_EQ(12u, context);
- hss = HandleSignalsState();
- this->ProducerRemoveAwakable(&pwaiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
-
- this->ProducerClose();
-}
-
-TYPED_TEST(DataPipeImplTest, PeerClosedProducerWaiting) {
- const MojoCreateDataPipeOptions options = {
- kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
- 2 * sizeof(int32_t) // |capacity_num_bytes|.
- };
- this->Create(options);
- this->DoTransfer();
-
- Waiter waiter;
- HandleSignalsState hss;
- uintptr_t context;
-
- // Add a waiter.
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ProducerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- 12, nullptr));
-
- // Close the consumer.
- this->ConsumerClose();
-
- // It should be signaled.
- context = 0;
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context));
- EXPECT_EQ(12u, context);
- hss = HandleSignalsState();
- this->ProducerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
-
- this->ProducerClose();
-}
-
-TYPED_TEST(DataPipeImplTest, PeerClosedConsumerWaiting) {
- const MojoCreateDataPipeOptions options = {
- kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
- 2 * sizeof(int32_t) // |capacity_num_bytes|.
- };
- this->Create(options);
- this->DoTransfer();
-
- Waiter waiter;
- HandleSignalsState hss;
- uintptr_t context;
-
- // Add a waiter.
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- 12, nullptr));
-
- // Close the producer.
- this->ProducerClose();
-
- // It should be signaled.
- context = 0;
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context));
- EXPECT_EQ(12u, context);
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
-
- this->ConsumerClose();
-}
-
-TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
- const MojoCreateDataPipeOptions options = {
- kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
- 1000 * sizeof(int32_t) // |capacity_num_bytes|.
- };
- this->Create(options);
- this->DoTransfer();
-
- Waiter waiter;
- Waiter waiter2;
- HandleSignalsState hss;
- uintptr_t context;
-
- // Never writable.
- waiter.Init();
- hss = HandleSignalsState();
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 12,
- &hss));
- EXPECT_EQ(0u, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Add waiter: not yet readable.
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 34,
- nullptr));
-
- // Write two elements.
- int32_t elements[2] = {123, 456};
- uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerWriteData(UserPointer<const void>(elements),
- MakeUserPointer(&num_bytes), true));
-
- // Wait for readability (needed for remote cases).
- context = 0;
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context));
- EXPECT_EQ(34u, context);
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Discard one element.
- num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
- EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
-
- // Should still be readable.
- waiter.Init();
- hss = HandleSignalsState();
- EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 78,
- &hss));
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Peek one element.
- elements[0] = -1;
- elements[1] = -1;
- num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerReadData(UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), true, true));
- EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
- EXPECT_EQ(456, elements[0]);
- EXPECT_EQ(-1, elements[1]);
-
- // Should still be readable.
- waiter.Init();
- hss = HandleSignalsState();
- EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 78,
- &hss));
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Read one element.
- elements[0] = -1;
- elements[1] = -1;
- num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerReadData(UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), true, false));
- EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
- EXPECT_EQ(456, elements[0]);
- EXPECT_EQ(-1, elements[1]);
-
- // Adding a waiter should now succeed.
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 90,
- nullptr));
-
- // Write one element.
- elements[0] = 789;
- elements[1] = -1;
- num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerWriteData(UserPointer<const void>(elements),
- MakeUserPointer(&num_bytes), true));
-
- // Waiting should now succeed.
- context = 0;
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context));
- EXPECT_EQ(90u, context);
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // We'll want to wait for the peer closed signal to propagate.
- waiter.Init();
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- 12, nullptr));
-
- // Close the producer.
- this->ProducerClose();
-
- // Should still be readable, even if the peer closed signal hasn't propagated
- // yet.
- waiter2.Init();
- hss = HandleSignalsState();
- EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->ConsumerAddAwakable(&waiter2, MOJO_HANDLE_SIGNAL_READABLE, 34,
- &hss));
- // We don't know if the peer closed signal has propagated yet (for the remote
- // cases).
- EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Wait for the peer closed signal.
- context = 0;
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context));
- EXPECT_EQ(12u, context);
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Read one element.
- elements[0] = -1;
- elements[1] = -1;
- num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerReadData(UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), true, false));
- EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
- EXPECT_EQ(789, elements[0]);
- EXPECT_EQ(-1, elements[1]);
-
- // Should be never-readable.
- waiter.Init();
- hss = HandleSignalsState();
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 56,
- &hss));
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
-
- this->ConsumerClose();
-}
-
-// Test with two-phase APIs and also closing the producer with an active
-// consumer waiter.
-TYPED_TEST(DataPipeImplTest, ConsumerWaitingTwoPhase) {
- const MojoCreateDataPipeOptions options = {
- kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
- 1000 * sizeof(int32_t) // |capacity_num_bytes|.
- };
- this->Create(options);
- this->DoTransfer();
-
- Waiter waiter;
- HandleSignalsState hss;
- uintptr_t context;
-
- // Add waiter: not yet readable.
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 12,
- nullptr));
-
- // Write two elements.
- int32_t* elements = nullptr;
- void* buffer = nullptr;
- uint32_t num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerBeginWriteData(MakeUserPointer(&buffer),
- MakeUserPointer(&num_bytes)));
- EXPECT_TRUE(buffer);
- EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0])));
- elements = static_cast<int32_t*>(buffer);
- elements[0] = 123;
- elements[1] = 456;
- EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(static_cast<uint32_t>(
- 2u * sizeof(elements[0]))));
-
- // Wait for readability (needed for remote cases).
- context = 0;
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context));
- EXPECT_EQ(12u, context);
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Read one element.
- const void* read_buffer = nullptr;
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerBeginReadData(MakeUserPointer(&read_buffer),
- MakeUserPointer(&num_bytes)));
- EXPECT_TRUE(read_buffer);
- EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
- const int32_t* read_elements = static_cast<const int32_t*>(read_buffer);
- EXPECT_EQ(123, read_elements[0]);
- EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(static_cast<uint32_t>(
- 1u * sizeof(elements[0]))));
-
- // Should still be readable.
- waiter.Init();
- hss = HandleSignalsState();
- EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 34,
- &hss));
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Read one element.
- // Request three, but not in all-or-none mode.
- read_buffer = nullptr;
- num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerBeginReadData(MakeUserPointer(&read_buffer),
- MakeUserPointer(&num_bytes)));
- EXPECT_TRUE(read_buffer);
- EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
- read_elements = static_cast<const int32_t*>(read_buffer);
- EXPECT_EQ(456, read_elements[0]);
- EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(static_cast<uint32_t>(
- 1u * sizeof(elements[0]))));
-
- // Adding a waiter should now succeed.
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 56,
- nullptr));
-
- // Close the producer.
- this->ProducerClose();
-
- // Should be never-readable.
- context = 0;
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- waiter.Wait(test::TinyDeadline(), &context));
- EXPECT_EQ(56u, context);
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
-
- this->ConsumerClose();
-}
-
-// Tests that data pipes aren't writable/readable during two-phase writes/reads.
-TYPED_TEST(DataPipeImplTest, BasicTwoPhaseWaiting) {
- const MojoCreateDataPipeOptions options = {
- kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
- 1000 * sizeof(int32_t) // |capacity_num_bytes|.
- };
- this->Create(options);
- this->DoTransfer();
-
- Waiter pwaiter; // For producer.
- Waiter cwaiter; // For consumer.
- HandleSignalsState hss;
-
- // It should be writable.
- pwaiter.Init();
- hss = HandleSignalsState();
- EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 0,
- &hss));
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- void* write_ptr = nullptr;
- uint32_t num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
- MakeUserPointer(&num_bytes)));
- EXPECT_TRUE(write_ptr);
- EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
-
- // At this point, it shouldn't be writable.
- pwaiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 1,
- nullptr));
- EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr));
- hss = HandleSignalsState();
- this->ProducerRemoveAwakable(&pwaiter, &hss);
- EXPECT_EQ(0u, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // It shouldn't be readable yet either (we'll wait later).
- cwaiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 2,
- nullptr));
-
- static_cast<int32_t*>(write_ptr)[0] = 123;
- EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(
- static_cast<uint32_t>(1u * sizeof(int32_t))));
-
- // It should immediately be writable again.
- pwaiter.Init();
- hss = HandleSignalsState();
- EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 3,
- &hss));
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // It should become readable.
- EXPECT_EQ(MOJO_RESULT_OK, cwaiter.Wait(test::TinyDeadline(), nullptr));
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&cwaiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Start another two-phase write and check that it's readable even in the
- // middle of it.
- write_ptr = nullptr;
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
- MakeUserPointer(&num_bytes)));
- EXPECT_TRUE(write_ptr);
- EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
-
- // It should be readable.
- cwaiter.Init();
- hss = HandleSignalsState();
- EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 5,
- &hss));
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // End the two-phase write without writing anything.
- EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(0u));
-
- // Start a two-phase read.
- const void* read_ptr = nullptr;
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerBeginReadData(MakeUserPointer(&read_ptr),
- MakeUserPointer(&num_bytes)));
- EXPECT_TRUE(read_ptr);
- EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes);
-
- // At this point, it should still be writable.
- pwaiter.Init();
- hss = HandleSignalsState();
- EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 6,
- &hss));
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // But not readable.
- cwaiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 7,
- nullptr));
- EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, cwaiter.Wait(0, nullptr));
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&cwaiter, &hss);
- EXPECT_EQ(0u, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // End the two-phase read without reading anything.
- EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(0u));
-
- // It should be readable again.
- cwaiter.Init();
- hss = HandleSignalsState();
- EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 8,
- &hss));
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- this->ProducerClose();
- this->ConsumerClose();
-}
-
-void Seq(int32_t start, size_t count, int32_t* out) {
- for (size_t i = 0; i < count; i++)
- out[i] = start + static_cast<int32_t>(i);
-}
-
-TYPED_TEST(DataPipeImplTest, AllOrNone) {
- const MojoCreateDataPipeOptions options = {
- kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
- 10 * sizeof(int32_t) // |capacity_num_bytes|.
- };
- this->Create(options);
- this->DoTransfer();
-
- Waiter waiter;
- HandleSignalsState hss;
-
- // Try writing way too much.
- uint32_t num_bytes = 20u * sizeof(int32_t);
- int32_t buffer[100];
- Seq(0, MOJO_ARRAYSIZE(buffer), buffer);
- EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
- this->ProducerWriteData(UserPointer<const void>(buffer),
- MakeUserPointer(&num_bytes), true));
-
- // Should still be empty.
- num_bytes = ~0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- EXPECT_EQ(0u, num_bytes);
-
- // Add waiter.
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1,
- nullptr));
-
- // Write some data.
- num_bytes = 5u * sizeof(int32_t);
- Seq(100, MOJO_ARRAYSIZE(buffer), buffer);
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerWriteData(UserPointer<const void>(buffer),
- MakeUserPointer(&num_bytes), true));
- EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
-
- // Wait for data.
- // TODO(vtl): There's no real guarantee that all the data will become
- // available at once (except that in current implementations, with reasonable
- // limits, it will). Eventually, we'll be able to wait for a specified amount
- // of data to become available.
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Half full.
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
-
- // Too much.
- num_bytes = 6u * sizeof(int32_t);
- Seq(200, MOJO_ARRAYSIZE(buffer), buffer);
- EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
- this->ProducerWriteData(UserPointer<const void>(buffer),
- MakeUserPointer(&num_bytes), true));
-
- // Try reading too much.
- num_bytes = 11u * sizeof(int32_t);
- memset(buffer, 0xab, sizeof(buffer));
- EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
- this->ConsumerReadData(UserPointer<void>(buffer),
- MakeUserPointer(&num_bytes), true, false));
- int32_t expected_buffer[100];
- memset(expected_buffer, 0xab, sizeof(expected_buffer));
- EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
-
- // Try discarding too much.
- num_bytes = 11u * sizeof(int32_t);
- EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
- this->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
-
- // Just a little.
- num_bytes = 2u * sizeof(int32_t);
- Seq(300, MOJO_ARRAYSIZE(buffer), buffer);
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerWriteData(UserPointer<const void>(buffer),
- MakeUserPointer(&num_bytes), true));
- EXPECT_EQ(2u * sizeof(int32_t), num_bytes);
-
- // Just right.
- num_bytes = 3u * sizeof(int32_t);
- Seq(400, MOJO_ARRAYSIZE(buffer), buffer);
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerWriteData(UserPointer<const void>(buffer),
- MakeUserPointer(&num_bytes), true));
- EXPECT_EQ(3u * sizeof(int32_t), num_bytes);
-
- // TODO(vtl): Hack (see also the TODO above): We can't currently wait for a
- // specified amount of data to be available, so poll.
- for (size_t i = 0; i < kMaxPoll; i++) {
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- if (num_bytes >= 10u * sizeof(int32_t))
- break;
-
- test::Sleep(test::EpsilonDeadline());
- }
- EXPECT_EQ(10u * sizeof(int32_t), num_bytes);
-
- // Read half.
- num_bytes = 5u * sizeof(int32_t);
- memset(buffer, 0xab, sizeof(buffer));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerReadData(UserPointer<void>(buffer),
- MakeUserPointer(&num_bytes), true, false));
- EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
- memset(expected_buffer, 0xab, sizeof(expected_buffer));
- Seq(100, 5, expected_buffer);
- EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
-
- // Try reading too much again.
- num_bytes = 6u * sizeof(int32_t);
- memset(buffer, 0xab, sizeof(buffer));
- EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
- this->ConsumerReadData(UserPointer<void>(buffer),
- MakeUserPointer(&num_bytes), true, false));
- memset(expected_buffer, 0xab, sizeof(expected_buffer));
- EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
-
- // Try discarding too much again.
- num_bytes = 6u * sizeof(int32_t);
- EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
- this->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
-
- // Discard a little.
- num_bytes = 2u * sizeof(int32_t);
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
- EXPECT_EQ(2u * sizeof(int32_t), num_bytes);
-
- // Three left.
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- EXPECT_EQ(3u * sizeof(int32_t), num_bytes);
-
- // We'll need to wait for the peer closed to propagate.
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- 2, nullptr));
-
- // Close the producer, then test producer-closed cases.
- this->ProducerClose();
-
- // Wait.
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Try reading too much; "failed precondition" since the producer is closed.
- num_bytes = 4u * sizeof(int32_t);
- memset(buffer, 0xab, sizeof(buffer));
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- this->ConsumerReadData(UserPointer<void>(buffer),
- MakeUserPointer(&num_bytes), true, false));
- memset(expected_buffer, 0xab, sizeof(expected_buffer));
- EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
-
- // Try discarding too much; "failed precondition" again.
- num_bytes = 4u * sizeof(int32_t);
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- this->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
-
- // Read a little.
- num_bytes = 2u * sizeof(int32_t);
- memset(buffer, 0xab, sizeof(buffer));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerReadData(UserPointer<void>(buffer),
- MakeUserPointer(&num_bytes), true, false));
- EXPECT_EQ(2u * sizeof(int32_t), num_bytes);
- memset(expected_buffer, 0xab, sizeof(expected_buffer));
- Seq(400, 2, expected_buffer);
- EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
-
- // Discard the remaining element.
- num_bytes = 1u * sizeof(int32_t);
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
- EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
-
- // Empty again.
- num_bytes = ~0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- EXPECT_EQ(0u, num_bytes);
-
- this->ConsumerClose();
-}
-
-// Tests that |ProducerWriteData()| and |ConsumerReadData()| writes and reads,
-// respectively, as much as possible, even if it may have to "wrap around" the
-// internal circular buffer. (Note that the two-phase write and read need not do
-// this.)
-TYPED_TEST(DataPipeImplTest, WrapAround) {
- unsigned char test_data[1000];
- for (size_t i = 0; i < MOJO_ARRAYSIZE(test_data); i++)
- test_data[i] = static_cast<unsigned char>(i);
-
- const MojoCreateDataPipeOptions options = {
- kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- 1u, // |element_num_bytes|.
- 100u // |capacity_num_bytes|.
- };
- MojoCreateDataPipeOptions validated_options = {};
- // This test won't be valid if |ValidateCreateOptions()| decides to give the
- // pipe more space.
- EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateCreateOptions(
- MakeUserPointer(&options), &validated_options));
- ASSERT_EQ(100u, validated_options.capacity_num_bytes);
- this->Create(options);
- this->DoTransfer();
-
- Waiter waiter;
- HandleSignalsState hss;
-
- // Add waiter.
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1,
- nullptr));
-
- // Write 20 bytes.
- uint32_t num_bytes = 20u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerWriteData(UserPointer<const void>(&test_data[0]),
- MakeUserPointer(&num_bytes), false));
- EXPECT_EQ(20u, num_bytes);
-
- // Wait for data.
- // TODO(vtl): (See corresponding TODO in AllOrNone.)
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Read 10 bytes.
- unsigned char read_buffer[1000] = {0};
- num_bytes = 10u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerReadData(UserPointer<void>(read_buffer),
- MakeUserPointer(&num_bytes), false, false));
- EXPECT_EQ(10u, num_bytes);
- EXPECT_EQ(0, memcmp(read_buffer, &test_data[0], 10u));
-
- if (this->IsStrictCircularBuffer()) {
- // Check that a two-phase write can now only write (at most) 80 bytes. (This
- // checks an implementation detail; this behavior is not guaranteed.)
- void* write_buffer_ptr = nullptr;
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr),
- MakeUserPointer(&num_bytes)));
- EXPECT_TRUE(write_buffer_ptr);
- EXPECT_EQ(80u, num_bytes);
- EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(0u));
- }
-
- // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
- size_t total_num_bytes = 0u;
- for (size_t i = 0; i < kMaxPoll; i++) {
- // Write as much data as we can (using |ProducerWriteData()|). We should
- // write 90 bytes (eventually).
- num_bytes = 200u;
- MojoResult result = this->ProducerWriteData(
- UserPointer<const void>(&test_data[20 + total_num_bytes]),
- MakeUserPointer(&num_bytes), false);
- if (result == MOJO_RESULT_OK) {
- total_num_bytes += num_bytes;
- if (total_num_bytes >= 90u)
- break;
- } else {
- EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, result);
- }
-
- test::Sleep(test::EpsilonDeadline());
- }
- EXPECT_EQ(90u, total_num_bytes);
-
- // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
- for (size_t i = 0; i < kMaxPoll; i++) {
- // We have 100.
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- if (num_bytes >= 100u)
- break;
-
- test::Sleep(test::EpsilonDeadline());
- }
- EXPECT_EQ(100u, num_bytes);
-
- if (this->IsStrictCircularBuffer()) {
- // Check that a two-phase read can now only read (at most) 90 bytes. (This
- // checks an implementation detail; this behavior is not guaranteed.)
- const void* read_buffer_ptr = nullptr;
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr),
- MakeUserPointer(&num_bytes)));
- EXPECT_TRUE(read_buffer_ptr);
- EXPECT_EQ(90u, num_bytes);
- EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(0u));
- }
-
- // Read as much as possible (using |ConsumerReadData()|). We should read 100
- // bytes.
- num_bytes = static_cast<uint32_t>(MOJO_ARRAYSIZE(read_buffer) *
- sizeof(read_buffer[0]));
- memset(read_buffer, 0, num_bytes);
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerReadData(UserPointer<void>(read_buffer),
- MakeUserPointer(&num_bytes), false, false));
- EXPECT_EQ(100u, num_bytes);
- EXPECT_EQ(0, memcmp(read_buffer, &test_data[10], 100u));
-
- this->ProducerClose();
- this->ConsumerClose();
-}
-
-// Tests the behavior of writing (simple and two-phase), closing the producer,
-// then reading (simple and two-phase).
-TYPED_TEST(DataPipeImplTest, WriteCloseProducerRead) {
- const char kTestData[] = "hello world";
- const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
-
- const MojoCreateDataPipeOptions options = {
- kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- 1u, // |element_num_bytes|.
- 1000u // |capacity_num_bytes|.
- };
- this->Create(options);
- this->DoTransfer();
-
- // Write some data, so we'll have something to read.
- uint32_t num_bytes = kTestDataSize;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerWriteData(UserPointer<const void>(kTestData),
- MakeUserPointer(&num_bytes), false));
- EXPECT_EQ(kTestDataSize, num_bytes);
-
- // Write it again, so we'll have something left over.
- num_bytes = kTestDataSize;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerWriteData(UserPointer<const void>(kTestData),
- MakeUserPointer(&num_bytes), false));
- EXPECT_EQ(kTestDataSize, num_bytes);
-
- // Start two-phase write.
- void* write_buffer_ptr = nullptr;
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr),
- MakeUserPointer(&num_bytes)));
- EXPECT_TRUE(write_buffer_ptr);
- EXPECT_GT(num_bytes, 0u);
-
- // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
- for (size_t i = 0; i < kMaxPoll; i++) {
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- if (num_bytes >= 2u * kTestDataSize)
- break;
-
- test::Sleep(test::EpsilonDeadline());
- }
- EXPECT_EQ(2u * kTestDataSize, num_bytes);
-
- // Start two-phase read.
- const void* read_buffer_ptr = nullptr;
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr),
- MakeUserPointer(&num_bytes)));
- EXPECT_TRUE(read_buffer_ptr);
- EXPECT_EQ(2u * kTestDataSize, num_bytes);
-
- // Close the producer.
- this->ProducerClose();
-
- // The consumer can finish its two-phase read.
- EXPECT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
- EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(kTestDataSize));
-
- // And start another.
- read_buffer_ptr = nullptr;
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr),
- MakeUserPointer(&num_bytes)));
- EXPECT_TRUE(read_buffer_ptr);
- EXPECT_EQ(kTestDataSize, num_bytes);
-
- // Close the consumer, which cancels the two-phase read.
- this->ConsumerClose();
-}
-
-// Tests the behavior of interrupting a two-phase read and write by closing the
-// consumer.
-TYPED_TEST(DataPipeImplTest, TwoPhaseWriteReadCloseConsumer) {
- const char kTestData[] = "hello world";
- const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
-
- const MojoCreateDataPipeOptions options = {
- kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- 1u, // |element_num_bytes|.
- 1000u // |capacity_num_bytes|.
- };
- this->Create(options);
- this->DoTransfer();
-
- Waiter waiter;
- HandleSignalsState hss;
-
- // Add waiter.
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1,
- nullptr));
-
- // Write some data, so we'll have something to read.
- uint32_t num_bytes = kTestDataSize;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerWriteData(UserPointer<const void>(kTestData),
- MakeUserPointer(&num_bytes), false));
- EXPECT_EQ(kTestDataSize, num_bytes);
-
- // Start two-phase write.
- void* write_buffer_ptr = nullptr;
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr),
- MakeUserPointer(&num_bytes)));
- EXPECT_TRUE(write_buffer_ptr);
- ASSERT_GT(num_bytes, kTestDataSize);
-
- // Wait for data.
- // TODO(vtl): (See corresponding TODO in AllOrNone.)
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Start two-phase read.
- const void* read_buffer_ptr = nullptr;
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr),
- MakeUserPointer(&num_bytes)));
- EXPECT_TRUE(read_buffer_ptr);
- EXPECT_EQ(kTestDataSize, num_bytes);
-
- // Add waiter.
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ProducerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- 1, nullptr));
-
- // Close the consumer.
- this->ConsumerClose();
-
- // Wait for producer to know that the consumer is closed.
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
- hss = HandleSignalsState();
- this->ProducerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
-
- // Actually write some data. (Note: Premature freeing of the buffer would
- // probably only be detected under ASAN or similar.)
- memcpy(write_buffer_ptr, kTestData, kTestDataSize);
- // Note: Even though the consumer has been closed, ending the two-phase
- // write will report success.
- EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(kTestDataSize));
-
- // But trying to write should result in failure.
- num_bytes = kTestDataSize;
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- this->ProducerWriteData(UserPointer<const void>(kTestData),
- MakeUserPointer(&num_bytes), false));
-
- // As will trying to start another two-phase write.
- write_buffer_ptr = nullptr;
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- this->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr),
- MakeUserPointer(&num_bytes)));
-
- this->ProducerClose();
-}
-
-// Tests the behavior of "interrupting" a two-phase write by closing both the
-// producer and the consumer.
-TYPED_TEST(DataPipeImplTest, TwoPhaseWriteCloseBoth) {
- const uint32_t kTestDataSize = 15u;
-
- const MojoCreateDataPipeOptions options = {
- kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- 1u, // |element_num_bytes|.
- 1000u // |capacity_num_bytes|.
- };
- this->Create(options);
- this->DoTransfer();
-
- // Start two-phase write.
- void* write_buffer_ptr = nullptr;
- uint32_t num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr),
- MakeUserPointer(&num_bytes)));
- EXPECT_TRUE(write_buffer_ptr);
- ASSERT_GT(num_bytes, kTestDataSize);
-
- this->ConsumerClose();
- this->ProducerClose();
-}
-
-// Tests the behavior of writing, closing the producer, and then reading (with
-// and without data remaining).
-TYPED_TEST(DataPipeImplTest, WriteCloseProducerReadNoData) {
- const char kTestData[] = "hello world";
- const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
-
- const MojoCreateDataPipeOptions options = {
- kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- 1u, // |element_num_bytes|.
- 1000u // |capacity_num_bytes|.
- };
- this->Create(options);
- this->DoTransfer();
-
- Waiter waiter;
- HandleSignalsState hss;
-
- // Write some data, so we'll have something to read.
- uint32_t num_bytes = kTestDataSize;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerWriteData(UserPointer<const void>(kTestData),
- MakeUserPointer(&num_bytes), false));
- EXPECT_EQ(kTestDataSize, num_bytes);
-
- // Add waiter.
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- 1, nullptr));
-
- // Close the producer.
- this->ProducerClose();
-
- // Wait. (Note that once the consumer knows that the producer is closed, it
- // must also know about all the data that was sent.)
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Peek that data.
- char buffer[1000];
- num_bytes = static_cast<uint32_t>(sizeof(buffer));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerReadData(UserPointer<void>(buffer),
- MakeUserPointer(&num_bytes), false, true));
- EXPECT_EQ(kTestDataSize, num_bytes);
- EXPECT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
-
- // Read that data.
- memset(buffer, 0, 1000);
- num_bytes = static_cast<uint32_t>(sizeof(buffer));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerReadData(UserPointer<void>(buffer),
- MakeUserPointer(&num_bytes), false, false));
- EXPECT_EQ(kTestDataSize, num_bytes);
- EXPECT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
-
- // A second read should fail.
- num_bytes = static_cast<uint32_t>(sizeof(buffer));
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- this->ConsumerReadData(UserPointer<void>(buffer),
- MakeUserPointer(&num_bytes), false, false));
-
- // A two-phase read should also fail.
- const void* read_buffer_ptr = nullptr;
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- this->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr),
- MakeUserPointer(&num_bytes)));
-
- // Ditto for discard.
- num_bytes = 10u;
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- this->ConsumerDiscardData(MakeUserPointer(&num_bytes), false));
-
- this->ConsumerClose();
-}
-
-// Tests the behavior of writing, reading (all the data), closing the producer,
-// and then waiting for more data (with no data remaining).
-TYPED_TEST(DataPipeImplTest, WriteReadCloseProducerWaitNoData) {
- const int64_t kTestData = 123456789012345LL;
- const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
-
- const MojoCreateDataPipeOptions options = {
- kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- kTestDataSize, // |element_num_bytes|.
- 100u * kTestDataSize // |capacity_num_bytes|.
- };
- this->Create(options);
- this->DoTransfer();
-
- Waiter waiter;
- HandleSignalsState hss;
-
- // Add waiter.
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 0,
- nullptr));
-
- // Write some data, so we'll have something to read.
- uint32_t num_bytes = kTestDataSize;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerWriteData(UserPointer<const void>(&kTestData),
- MakeUserPointer(&num_bytes), false));
- EXPECT_EQ(kTestDataSize, num_bytes);
-
- // Wait.
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Read that data.
- int64_t data[10] = {};
- num_bytes = static_cast<uint32_t>(sizeof(data));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerReadData(UserPointer<void>(data),
- MakeUserPointer(&num_bytes), false, false));
- EXPECT_EQ(kTestDataSize, num_bytes);
- EXPECT_EQ(kTestData, data[0]);
-
- // Add waiter again.
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 0,
- nullptr));
-
- // Close the producer.
- this->ProducerClose();
-
- // Wait.
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- waiter.Wait(test::TinyDeadline(), nullptr));
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
-
- this->ConsumerClose();
-}
-
-// During a two-phase read, the consumer is not readable so it may be waited
-// upon (to become readable again). If the producer is closed and the two-phase
-// read consumes the remaining data, that wait should become unsatisfiable.
-TYPED_TEST(DataPipeImplTest, BeginReadCloseProducerWaitEndReadNoData) {
- const int64_t kTestData = 123456789012345LL;
- const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
-
- const MojoCreateDataPipeOptions options = {
- kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- kTestDataSize, // |element_num_bytes|.
- 100u * kTestDataSize // |capacity_num_bytes|.
- };
- this->Create(options);
- this->DoTransfer();
-
- Waiter waiter;
- HandleSignalsState hss;
-
- // Add waiter (for the consumer to become readable).
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 0,
- nullptr));
-
- // Write some data, so we'll have something to read.
- uint32_t num_bytes = kTestDataSize;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerWriteData(UserPointer<const void>(&kTestData),
- MakeUserPointer(&num_bytes), false));
- EXPECT_EQ(kTestDataSize, num_bytes);
-
- // Wait.
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Start a two-phase read.
- num_bytes = 0u;
- const void* read_ptr = nullptr;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerBeginReadData(MakeUserPointer(&read_ptr),
- MakeUserPointer(&num_bytes)));
- EXPECT_EQ(kTestDataSize, num_bytes);
- EXPECT_EQ(kTestData, static_cast<const int64_t*>(read_ptr)[0]);
-
- // Add waiter (for the producer to be closed).
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- 0, nullptr));
-
- // Close the producer.
- this->ProducerClose();
-
- // Wait for producer close to be detected.
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // Add waiter (for the consumer to become readable).
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 0,
- nullptr));
-
- // Complete the two-phase read.
- EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(kTestDataSize));
-
- // Wait.
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- waiter.Wait(test::TinyDeadline(), nullptr));
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
-
- this->ConsumerClose();
-}
-
-// During a two-phase write, the producer is not writable so it may be waited
-// upon (to become writable again). If the consumer is closed, that wait should
-// become unsatisfiable.
-TYPED_TEST(DataPipeImplTest, BeginWriteCloseConsumerWaitEndWrite) {
- const MojoCreateDataPipeOptions options = {
- kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- 1u, // |element_num_bytes|.
- 100u // |capacity_num_bytes|.
- };
- this->Create(options);
- this->DoTransfer();
-
- Waiter waiter1;
- Waiter waiter2;
- HandleSignalsState hss;
-
- // Start a two-phase write.
- void* write_ptr = nullptr;
- uint32_t num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
- MakeUserPointer(&num_bytes)));
-
- // Add waiter (for the consumer to be closed).
- waiter1.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ProducerAddAwakable(&waiter1, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- 0, nullptr));
-
- // Add a separate waiter (for the producer to become writable).
- waiter2.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ProducerAddAwakable(&waiter2, MOJO_HANDLE_SIGNAL_WRITABLE, 0,
- nullptr));
-
- // Close the consumer.
- this->ConsumerClose();
-
- // Wait for the consumer close to be detected.
- // Note: If we didn't wait for the consumer close to be detected before
- // completing the two-phase write, wait might succeed (in the remote cases).
- // This is because the first |Awake()| "wins".
- EXPECT_EQ(MOJO_RESULT_OK, waiter1.Wait(test::TinyDeadline(), nullptr));
- hss = HandleSignalsState();
- this->ProducerRemoveAwakable(&waiter1, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
-
- // Complete the two-phase write (with nothing written).
- EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(0u));
-
- // Wait.
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- waiter2.Wait(test::TinyDeadline(), nullptr));
- hss = HandleSignalsState();
- this->ProducerRemoveAwakable(&waiter2, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
-
- this->ProducerClose();
-}
-
-// Test that two-phase reads/writes behave correctly when given invalid
-// arguments.
-TYPED_TEST(DataPipeImplTest, TwoPhaseMoreInvalidArguments) {
- const MojoCreateDataPipeOptions options = {
- kSizeOfOptions, // |struct_size|.
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
- static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
- 10 * sizeof(int32_t) // |capacity_num_bytes|.
- };
- this->Create(options);
- this->DoTransfer();
-
- Waiter waiter;
- HandleSignalsState hss;
-
- // No data.
- uint32_t num_bytes = 1000u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- EXPECT_EQ(0u, num_bytes);
-
- // Try "ending" a two-phase write when one isn't active.
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- this->ProducerEndWriteData(1u * sizeof(int32_t)));
-
- // Wait a bit, to make sure that if a signal were (incorrectly) sent, it'd
- // have time to propagate.
- test::Sleep(test::EpsilonDeadline());
-
- // Still no data.
- num_bytes = 1000u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- EXPECT_EQ(0u, num_bytes);
-
- // Try ending a two-phase write with an invalid amount (too much).
- void* write_ptr = nullptr;
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
- MakeUserPointer(&num_bytes)));
- EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
- this->ProducerEndWriteData(num_bytes +
- static_cast<uint32_t>(sizeof(int32_t))));
-
- // But the two-phase write still ended.
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, this->ProducerEndWriteData(0u));
-
- // Wait a bit (as above).
- test::Sleep(test::EpsilonDeadline());
-
- // Still no data.
- num_bytes = 1000u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- EXPECT_EQ(0u, num_bytes);
-
- // Try ending a two-phase write with an invalid amount (not a multiple of the
- // element size).
- write_ptr = nullptr;
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
- MakeUserPointer(&num_bytes)));
- EXPECT_GE(num_bytes, 1u);
- EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, this->ProducerEndWriteData(1u));
-
- // But the two-phase write still ended.
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, this->ProducerEndWriteData(0u));
-
- // Wait a bit (as above).
- test::Sleep(test::EpsilonDeadline());
-
- // Still no data.
- num_bytes = 1000u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- EXPECT_EQ(0u, num_bytes);
-
- // Add waiter.
- waiter.Init();
- ASSERT_EQ(MOJO_RESULT_OK,
- this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1,
- nullptr));
-
- // Now write some data, so we'll be able to try reading.
- int32_t element = 123;
- num_bytes = 1u * sizeof(int32_t);
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ProducerWriteData(UserPointer<const void>(&element),
- MakeUserPointer(&num_bytes), false));
-
- // Wait for data.
- // TODO(vtl): (See corresponding TODO in AllOrNone.)
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
- hss = HandleSignalsState();
- this->ConsumerRemoveAwakable(&waiter, &hss);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
- hss.satisfiable_signals);
-
- // One element available.
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
-
- // Try "ending" a two-phase read when one isn't active.
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- this->ConsumerEndReadData(1u * sizeof(int32_t)));
-
- // Still one element available.
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
-
- // Try ending a two-phase read with an invalid amount (too much).
- num_bytes = 0u;
- const void* read_ptr = nullptr;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerBeginReadData(MakeUserPointer(&read_ptr),
- MakeUserPointer(&num_bytes)));
- EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
- this->ConsumerEndReadData(num_bytes +
- static_cast<uint32_t>(sizeof(int32_t))));
-
- // Still one element available.
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
-
- // Try ending a two-phase read with an invalid amount (not a multiple of the
- // element size).
- num_bytes = 0u;
- read_ptr = nullptr;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerBeginReadData(MakeUserPointer(&read_ptr),
- MakeUserPointer(&num_bytes)));
- EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
- EXPECT_EQ(123, static_cast<const int32_t*>(read_ptr)[0]);
- EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, this->ConsumerEndReadData(1u));
-
- // Still one element available.
- num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
- EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
-
- this->ProducerClose();
- this->ConsumerClose();
-}
-
-} // namespace
-} // namespace system
-} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698