Index: third_party/mojo/src/mojo/edk/system/data_pipe_impl_unittest.cc |
diff --git a/third_party/mojo/src/mojo/edk/system/data_pipe_impl_unittest.cc b/third_party/mojo/src/mojo/edk/system/data_pipe_impl_unittest.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..b14d3af4c4c7d2c78c3873b3bb6f082bc7e468e7 |
--- /dev/null |
+++ b/third_party/mojo/src/mojo/edk/system/data_pipe_impl_unittest.cc |
@@ -0,0 +1,1720 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+// This file contains tests that are shared between different implementations of |
+// |DataPipeImpl|. |
+ |
+#include "mojo/edk/system/data_pipe_impl.h" |
+ |
+#include <stdint.h> |
+ |
+#include "base/bind.h" |
+#include "base/location.h" |
+#include "base/logging.h" |
+#include "base/macros.h" |
+#include "base/message_loop/message_loop.h" |
+#include "base/test/test_io_thread.h" |
+#include "base/threading/platform_thread.h" // For |Sleep()|. |
+#include "mojo/edk/embedder/platform_channel_pair.h" |
+#include "mojo/edk/embedder/simple_platform_support.h" |
+#include "mojo/edk/system/channel.h" |
+#include "mojo/edk/system/channel_endpoint.h" |
+#include "mojo/edk/system/data_pipe.h" |
+#include "mojo/edk/system/data_pipe_consumer_dispatcher.h" |
+#include "mojo/edk/system/data_pipe_producer_dispatcher.h" |
+#include "mojo/edk/system/memory.h" |
+#include "mojo/edk/system/message_pipe.h" |
+#include "mojo/edk/system/raw_channel.h" |
+#include "mojo/edk/system/test_utils.h" |
+#include "mojo/edk/system/waiter.h" |
+#include "testing/gtest/include/gtest/gtest.h" |
+ |
+namespace mojo { |
+namespace system { |
+namespace { |
+ |
+const MojoHandleSignals kAllSignals = MOJO_HANDLE_SIGNAL_READABLE | |
+ MOJO_HANDLE_SIGNAL_WRITABLE | |
+ MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
+const uint32_t kSizeOfOptions = |
+ static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)); |
+ |
+// DataPipeImplTestHelper ------------------------------------------------------ |
+ |
+class DataPipeImplTestHelper { |
+ public: |
+ virtual ~DataPipeImplTestHelper() {} |
+ |
+ virtual void SetUp() = 0; |
+ virtual void TearDown() = 0; |
+ |
+ virtual void Create(const MojoCreateDataPipeOptions& validated_options) = 0; |
+ |
+ // Possibly transfers the producer/consumer. |
+ virtual void DoTransfer() = 0; |
+ |
+ // Returns the |DataPipe| object for the producer and consumer, respectively. |
+ virtual DataPipe* dpp() = 0; |
+ virtual DataPipe* dpc() = 0; |
+ |
+ virtual void ProducerClose() = 0; |
+ virtual void ConsumerClose() = 0; |
+ |
+ protected: |
+ DataPipeImplTestHelper() {} |
+ |
+ private: |
+ DISALLOW_COPY_AND_ASSIGN(DataPipeImplTestHelper); |
+}; |
+ |
+// DataPipeImplTest ------------------------------------------------------------ |
+ |
+template <class Helper> |
+class DataPipeImplTest : public testing::Test { |
+ public: |
+ DataPipeImplTest() {} |
+ ~DataPipeImplTest() override {} |
+ |
+ void SetUp() override { helper_.SetUp(); } |
+ void TearDown() override { helper_.TearDown(); } |
+ |
+ protected: |
+ void Create(const MojoCreateDataPipeOptions& options) { |
+ MojoCreateDataPipeOptions validated_options = {}; |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ DataPipe::ValidateCreateOptions(MakeUserPointer(&options), |
+ &validated_options)); |
+ helper_.Create(validated_options); |
+ } |
+ |
+ void DoTransfer() { return helper_.DoTransfer(); } |
+ |
+ DataPipe* dpp() { return helper_.dpp(); } |
+ DataPipe* dpc() { return helper_.dpc(); } |
+ |
+ void ProducerClose() { helper_.ProducerClose(); } |
+ void ConsumerClose() { helper_.ConsumerClose(); } |
+ |
+ private: |
+ Helper helper_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(DataPipeImplTest); |
+}; |
+ |
+// LocalDataPipeImplTestHelper ------------------------------------------------- |
+ |
+class LocalDataPipeImplTestHelper : public DataPipeImplTestHelper { |
+ public: |
+ LocalDataPipeImplTestHelper() {} |
+ ~LocalDataPipeImplTestHelper() override {} |
+ |
+ void SetUp() override {} |
+ void TearDown() override {} |
+ |
+ void Create(const MojoCreateDataPipeOptions& validated_options) override { |
+ CHECK(!dp_); |
+ dp_ = DataPipe::CreateLocal(validated_options); |
+ } |
+ |
+ void DoTransfer() override {} |
+ |
+ // Returns the |DataPipe| object for the producer and consumer, respectively. |
+ DataPipe* dpp() override { return dp_.get(); } |
+ DataPipe* dpc() override { return dp_.get(); } |
+ |
+ void ProducerClose() override { dp_->ProducerClose(); } |
+ void ConsumerClose() override { dp_->ConsumerClose(); } |
+ |
+ private: |
+ scoped_refptr<DataPipe> dp_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(LocalDataPipeImplTestHelper); |
+}; |
+ |
+// RemoteDataPipeImplTestHelper ------------------------------------------------ |
+ |
+// Base class for |Remote{Producer,Consumer}DataPipeImplTestHelper|. |
+class RemoteDataPipeImplTestHelper : public DataPipeImplTestHelper { |
+ public: |
+ RemoteDataPipeImplTestHelper() : io_thread_(base::TestIOThread::kAutoStart) {} |
+ ~RemoteDataPipeImplTestHelper() override {} |
+ |
+ void SetUp() override { |
+ scoped_refptr<ChannelEndpoint> ep[2]; |
+ message_pipes_[0] = MessagePipe::CreateLocalProxy(&ep[0]); |
+ message_pipes_[1] = MessagePipe::CreateLocalProxy(&ep[1]); |
+ |
+ io_thread_.PostTaskAndWait( |
+ FROM_HERE, base::Bind(&RemoteDataPipeImplTestHelper::SetUpOnIOThread, |
+ base::Unretained(this), ep[0], ep[1])); |
+ } |
+ |
+ void TearDown() override { |
+ EnsureMessagePipeClosed(0); |
+ EnsureMessagePipeClosed(1); |
+ io_thread_.PostTaskAndWait( |
+ FROM_HERE, base::Bind(&RemoteDataPipeImplTestHelper::TearDownOnIOThread, |
+ base::Unretained(this))); |
+ } |
+ |
+ void Create(const MojoCreateDataPipeOptions& validated_options) override { |
+ CHECK(!dp_); |
+ dp_ = DataPipe::CreateLocal(validated_options); |
+ } |
+ |
+ protected: |
+ void SendDispatcher(size_t source_i, |
+ scoped_refptr<Dispatcher> to_send, |
+ scoped_refptr<Dispatcher>* to_receive) { |
+ DCHECK(source_i == 0 || source_i == 1); |
+ size_t dest_i = source_i ^ 1; |
+ |
+ // Write the dispatcher to MP |source_i| (port 0). Wait and receive on MP |
+ // |dest_i| (port 0). (Add the waiter first, to avoid any handling the case |
+ // where it's already readable.) |
+ Waiter waiter; |
+ waiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ message_pipe(dest_i)->AddAwakable( |
+ 0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 987, nullptr)); |
+ { |
+ DispatcherTransport transport( |
+ test::DispatcherTryStartTransport(to_send.get())); |
+ ASSERT_TRUE(transport.is_valid()); |
+ |
+ std::vector<DispatcherTransport> transports; |
+ transports.push_back(transport); |
+ ASSERT_EQ(MOJO_RESULT_OK, message_pipe(source_i)->WriteMessage( |
+ 0, NullUserPointer(), 0, &transports, |
+ MOJO_WRITE_MESSAGE_FLAG_NONE)); |
+ transport.End(); |
+ } |
+ uint32_t context = 0; |
+ ASSERT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context)); |
+ EXPECT_EQ(987u, context); |
+ HandleSignalsState hss = HandleSignalsState(); |
+ message_pipe(dest_i)->RemoveAwakable(0, &waiter, &hss); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, |
+ hss.satisfied_signals); |
+ EXPECT_EQ(kAllSignals, hss.satisfiable_signals); |
+ char read_buffer[100] = {}; |
+ uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); |
+ DispatcherVector read_dispatchers; |
+ uint32_t read_num_dispatchers = 10; // Maximum to get. |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ message_pipe(dest_i)->ReadMessage( |
+ 0, UserPointer<void>(read_buffer), |
+ MakeUserPointer(&read_buffer_size), &read_dispatchers, |
+ &read_num_dispatchers, MOJO_READ_MESSAGE_FLAG_NONE)); |
+ EXPECT_EQ(0u, static_cast<size_t>(read_buffer_size)); |
+ ASSERT_EQ(1u, read_dispatchers.size()); |
+ ASSERT_EQ(1u, read_num_dispatchers); |
+ ASSERT_TRUE(read_dispatchers[0]); |
+ EXPECT_TRUE(read_dispatchers[0]->HasOneRef()); |
+ |
+ *to_receive = read_dispatchers[0]; |
+ } |
+ |
+ scoped_refptr<MessagePipe> message_pipe(size_t i) { |
+ return message_pipes_[i]; |
+ } |
+ scoped_refptr<DataPipe> dp() { return dp_; } |
+ |
+ private: |
+ void EnsureMessagePipeClosed(size_t i) { |
+ if (!message_pipes_[i]) |
+ return; |
+ message_pipes_[i]->Close(0); |
+ message_pipes_[i] = nullptr; |
+ } |
+ |
+ void SetUpOnIOThread(scoped_refptr<ChannelEndpoint> ep0, |
+ scoped_refptr<ChannelEndpoint> ep1) { |
+ CHECK_EQ(base::MessageLoop::current(), io_thread_.message_loop()); |
+ |
+ embedder::PlatformChannelPair channel_pair; |
+ channels_[0] = new Channel(&platform_support_); |
+ channels_[0]->Init(RawChannel::Create(channel_pair.PassServerHandle())); |
+ channels_[0]->SetBootstrapEndpoint(ep0); |
+ channels_[1] = new Channel(&platform_support_); |
+ channels_[1]->Init(RawChannel::Create(channel_pair.PassClientHandle())); |
+ channels_[1]->SetBootstrapEndpoint(ep1); |
+ } |
+ |
+ void TearDownOnIOThread() { |
+ CHECK_EQ(base::MessageLoop::current(), io_thread_.message_loop()); |
+ |
+ if (channels_[0]) { |
+ channels_[0]->Shutdown(); |
+ channels_[0] = nullptr; |
+ } |
+ if (channels_[1]) { |
+ channels_[1]->Shutdown(); |
+ channels_[1] = nullptr; |
+ } |
+ } |
+ |
+ embedder::SimplePlatformSupport platform_support_; |
+ base::TestIOThread io_thread_; |
+ scoped_refptr<Channel> channels_[2]; |
+ scoped_refptr<MessagePipe> message_pipes_[2]; |
+ |
+ scoped_refptr<DataPipe> dp_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(RemoteDataPipeImplTestHelper); |
+}; |
+ |
+// RemoteProducerDataPipeImplTestHelper ---------------------------------------- |
+ |
+// Note about naming confusion: This class is named after the "local" class, |
+// i.e., |dp_| will have a |RemoteProducerDataPipeImpl|. The remote side, of |
+// course, will have a |RemoteConsumerDataPipeImpl|. |
+class RemoteProducerDataPipeImplTestHelper |
+ : public RemoteDataPipeImplTestHelper { |
+ public: |
+ RemoteProducerDataPipeImplTestHelper() {} |
+ ~RemoteProducerDataPipeImplTestHelper() override {} |
+ |
+ void DoTransfer() override { |
+ // This is the producer dispatcher we'll send. |
+ scoped_refptr<DataPipeProducerDispatcher> to_send = |
+ new DataPipeProducerDispatcher(); |
+ to_send->Init(dp()); |
+ scoped_refptr<Dispatcher> to_receive; |
+ SendDispatcher(0, to_send, &to_receive); |
+ // |to_send| should have been closed. This is |DCHECK()|ed when it is |
+ // destroyed. |
+ EXPECT_TRUE(to_send->HasOneRef()); |
+ to_send = nullptr; |
+ |
+ ASSERT_EQ(Dispatcher::kTypeDataPipeProducer, to_receive->GetType()); |
+ producer_dispatcher_ = |
+ static_cast<DataPipeProducerDispatcher*>(to_receive.get()); |
+ } |
+ |
+ DataPipe* dpp() override { |
+ if (producer_dispatcher_) |
+ return producer_dispatcher_->GetDataPipeForTest(); |
+ return dp().get(); |
+ } |
+ DataPipe* dpc() override { return dp().get(); } |
+ |
+ void ProducerClose() override { |
+ if (producer_dispatcher_) |
+ ASSERT_EQ(MOJO_RESULT_OK, producer_dispatcher_->Close()); |
+ else |
+ dp()->ProducerClose(); |
+ } |
+ void ConsumerClose() override { dp()->ConsumerClose(); } |
+ |
+ protected: |
+ scoped_refptr<DataPipeProducerDispatcher> producer_dispatcher_; |
+ |
+ private: |
+ DISALLOW_COPY_AND_ASSIGN(RemoteProducerDataPipeImplTestHelper); |
+}; |
+ |
+// RemoteConsumerDataPipeImplTestHelper ---------------------------------------- |
+ |
+// Note about naming confusion: This class is named after the "local" class, |
+// i.e., |dp_| will have a |RemoteConsumerDataPipeImpl|. The remote side, of |
+// course, will have a |RemoteProducerDataPipeImpl|. |
+class RemoteConsumerDataPipeImplTestHelper |
+ : public RemoteDataPipeImplTestHelper { |
+ public: |
+ RemoteConsumerDataPipeImplTestHelper() {} |
+ ~RemoteConsumerDataPipeImplTestHelper() override {} |
+ |
+ void DoTransfer() override { |
+ // This is the consumer dispatcher we'll send. |
+ scoped_refptr<DataPipeConsumerDispatcher> to_send = |
+ new DataPipeConsumerDispatcher(); |
+ to_send->Init(dp()); |
+ scoped_refptr<Dispatcher> to_receive; |
+ SendDispatcher(0, to_send, &to_receive); |
+ // |to_send| should have been closed. This is |DCHECK()|ed when it is |
+ // destroyed. |
+ EXPECT_TRUE(to_send->HasOneRef()); |
+ to_send = nullptr; |
+ |
+ ASSERT_EQ(Dispatcher::kTypeDataPipeConsumer, to_receive->GetType()); |
+ consumer_dispatcher_ = |
+ static_cast<DataPipeConsumerDispatcher*>(to_receive.get()); |
+ } |
+ |
+ DataPipe* dpp() override { return dp().get(); } |
+ DataPipe* dpc() override { |
+ if (consumer_dispatcher_) |
+ return consumer_dispatcher_->GetDataPipeForTest(); |
+ return dp().get(); |
+ } |
+ |
+ void ProducerClose() override { dp()->ProducerClose(); } |
+ void ConsumerClose() override { |
+ if (consumer_dispatcher_) |
+ ASSERT_EQ(MOJO_RESULT_OK, consumer_dispatcher_->Close()); |
+ else |
+ dp()->ConsumerClose(); |
+ } |
+ |
+ protected: |
+ scoped_refptr<DataPipeConsumerDispatcher> consumer_dispatcher_; |
+ |
+ private: |
+ DISALLOW_COPY_AND_ASSIGN(RemoteConsumerDataPipeImplTestHelper); |
+}; |
+ |
+// RemoteProducerDataPipeImplTestHelper2 --------------------------------------- |
+ |
+// This is like |RemoteProducerDataPipeImplTestHelper|, but |DoTransfer()| does |
+// a second transfer. This thus tests passing a producer handle twice, and in |
+// particular tests (some of) |RemoteConsumerDataPipeImpl|'s |
+// |ProducerEndSerialize()| (instead of |LocalDataPipeImpl|'s). |
+// |
+// Note about naming confusion: This class is named after the "local" class, |
+// i.e., |dp_| will have a |RemoteProducerDataPipeImpl|. The remote side, of |
+// course, will have a |RemoteConsumerDataPipeImpl|. |
+class RemoteProducerDataPipeImplTestHelper2 |
+ : public RemoteProducerDataPipeImplTestHelper { |
+ public: |
+ RemoteProducerDataPipeImplTestHelper2() {} |
+ ~RemoteProducerDataPipeImplTestHelper2() override {} |
+ |
+ void DoTransfer() override { |
+ // This is the producer dispatcher we'll send. |
+ scoped_refptr<DataPipeProducerDispatcher> to_send = |
+ new DataPipeProducerDispatcher(); |
+ to_send->Init(dp()); |
+ scoped_refptr<Dispatcher> to_receive; |
+ SendDispatcher(0, to_send, &to_receive); |
+ // |to_send| should have been closed. This is |DCHECK()|ed when it is |
+ // destroyed. |
+ EXPECT_TRUE(to_send->HasOneRef()); |
+ to_send = nullptr; |
+ ASSERT_EQ(Dispatcher::kTypeDataPipeProducer, to_receive->GetType()); |
+ to_send = static_cast<DataPipeProducerDispatcher*>(to_receive.get()); |
+ to_receive = nullptr; |
+ |
+ // Now send it back the other way. |
+ SendDispatcher(1, to_send, &to_receive); |
+ // |producer_dispatcher_| should have been closed. This is |DCHECK()|ed when |
+ // it is destroyed. |
+ EXPECT_TRUE(to_send->HasOneRef()); |
+ to_send = nullptr; |
+ |
+ ASSERT_EQ(Dispatcher::kTypeDataPipeProducer, to_receive->GetType()); |
+ producer_dispatcher_ = |
+ static_cast<DataPipeProducerDispatcher*>(to_receive.get()); |
+ } |
+ |
+ private: |
+ DISALLOW_COPY_AND_ASSIGN(RemoteProducerDataPipeImplTestHelper2); |
+}; |
+ |
+// RemoteConsumerDataPipeImplTestHelper2 --------------------------------------- |
+ |
+// This is like |RemoteConsumerDataPipeImplTestHelper|, but |DoTransfer()| does |
+// a second transfer. This thus tests passing a consumer handle twice, and in |
+// particular tests (some of) |RemoteProducerDataPipeImpl|'s |
+// |ConsumerEndSerialize()| (instead of |LocalDataPipeImpl|'s). |
+// |
+// Note about naming confusion: This class is named after the "local" class, |
+// i.e., |dp_| will have a |RemoteConsumerDataPipeImpl|. The remote side, of |
+// course, will have a |RemoteProducerDataPipeImpl|. |
+class RemoteConsumerDataPipeImplTestHelper2 |
+ : public RemoteConsumerDataPipeImplTestHelper { |
+ public: |
+ RemoteConsumerDataPipeImplTestHelper2() {} |
+ ~RemoteConsumerDataPipeImplTestHelper2() override {} |
+ |
+ void DoTransfer() override { |
+ // This is the consumer dispatcher we'll send. |
+ scoped_refptr<DataPipeConsumerDispatcher> to_send = |
+ new DataPipeConsumerDispatcher(); |
+ to_send->Init(dp()); |
+ scoped_refptr<Dispatcher> to_receive; |
+ SendDispatcher(0, to_send, &to_receive); |
+ // |to_send| should have been closed. This is |DCHECK()|ed when it is |
+ // destroyed. |
+ EXPECT_TRUE(to_send->HasOneRef()); |
+ to_send = nullptr; |
+ ASSERT_EQ(Dispatcher::kTypeDataPipeConsumer, to_receive->GetType()); |
+ to_send = static_cast<DataPipeConsumerDispatcher*>(to_receive.get()); |
+ to_receive = nullptr; |
+ |
+ // Now send it back the other way. |
+ SendDispatcher(1, to_send, &to_receive); |
+ // |consumer_dispatcher_| should have been closed. This is |DCHECK()|ed when |
+ // it is destroyed. |
+ EXPECT_TRUE(to_send->HasOneRef()); |
+ to_send = nullptr; |
+ |
+ ASSERT_EQ(Dispatcher::kTypeDataPipeConsumer, to_receive->GetType()); |
+ consumer_dispatcher_ = |
+ static_cast<DataPipeConsumerDispatcher*>(to_receive.get()); |
+ } |
+ |
+ private: |
+ DISALLOW_COPY_AND_ASSIGN(RemoteConsumerDataPipeImplTestHelper2); |
+}; |
+ |
+// Test case instantiation ----------------------------------------------------- |
+ |
+typedef testing::Types<LocalDataPipeImplTestHelper, |
+ RemoteProducerDataPipeImplTestHelper, |
+ RemoteConsumerDataPipeImplTestHelper, |
+ RemoteProducerDataPipeImplTestHelper2, |
+ RemoteConsumerDataPipeImplTestHelper2> HelperTypes; |
+ |
+TYPED_TEST_CASE(DataPipeImplTest, HelperTypes); |
+ |
+// Tests ----------------------------------------------------------------------- |
+ |
+TYPED_TEST(DataPipeImplTest, SimpleReadWrite) { |
+ const MojoCreateDataPipeOptions options = { |
+ kSizeOfOptions, // |struct_size|. |
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. |
+ static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. |
+ 1000 * sizeof(int32_t) // |capacity_num_bytes|. |
+ }; |
+ this->Create(options); |
+ this->DoTransfer(); |
+ |
+ Waiter waiter; |
+ HandleSignalsState hss; |
+ uint32_t context; |
+ |
+ int32_t elements[10] = {}; |
+ uint32_t num_bytes = 0; |
+ |
+ // Try reading; nothing there yet. |
+ num_bytes = static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0])); |
+ EXPECT_EQ( |
+ MOJO_RESULT_SHOULD_WAIT, |
+ this->dpc()->ConsumerReadData(UserPointer<void>(elements), |
+ MakeUserPointer(&num_bytes), false, false)); |
+ |
+ // Query; nothing there yet. |
+ num_bytes = 0; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes))); |
+ EXPECT_EQ(0u, num_bytes); |
+ |
+ // Discard; nothing there yet. |
+ num_bytes = static_cast<uint32_t>(5u * sizeof(elements[0])); |
+ EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, this->dpc()->ConsumerDiscardData( |
+ MakeUserPointer(&num_bytes), false)); |
+ |
+ // Read with invalid |num_bytes|. |
+ num_bytes = sizeof(elements[0]) + 1; |
+ EXPECT_EQ( |
+ MOJO_RESULT_INVALID_ARGUMENT, |
+ this->dpc()->ConsumerReadData(UserPointer<void>(elements), |
+ MakeUserPointer(&num_bytes), false, false)); |
+ |
+ // For remote data pipes, we'll have to wait; add the waiter before writing. |
+ waiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerAddAwakable( |
+ &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr)); |
+ |
+ // Write two elements. |
+ elements[0] = 123; |
+ elements[1] = 456; |
+ num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerWriteData(UserPointer<const void>(elements), |
+ MakeUserPointer(&num_bytes), false)); |
+ // It should have written everything (even without "all or none"). |
+ EXPECT_EQ(2u * sizeof(elements[0]), num_bytes); |
+ |
+ // Wait. |
+ context = 0; |
+ EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context)); |
+ EXPECT_EQ(123u, context); |
+ hss = HandleSignalsState(); |
+ this->dpc()->ConsumerRemoveAwakable(&waiter, &hss); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // Query. |
+ // TODO(vtl): It's theoretically possible (though not with the current |
+ // implementation/configured limits) that not all the data has arrived yet. |
+ // (The theoretically-correct assertion here is that |num_bytes| is |1 * ...| |
+ // or |2 * ...|.) |
+ num_bytes = 0; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes))); |
+ EXPECT_EQ(2 * sizeof(elements[0]), num_bytes); |
+ |
+ // Read one element. |
+ elements[0] = -1; |
+ elements[1] = -1; |
+ num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData( |
+ UserPointer<void>(elements), |
+ MakeUserPointer(&num_bytes), false, false)); |
+ EXPECT_EQ(1u * sizeof(elements[0]), num_bytes); |
+ EXPECT_EQ(123, elements[0]); |
+ EXPECT_EQ(-1, elements[1]); |
+ |
+ // Query. |
+ // TODO(vtl): See previous TODO. (If we got 2 elements there, however, we |
+ // should get 1 here.) |
+ num_bytes = 0; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes))); |
+ EXPECT_EQ(1 * sizeof(elements[0]), num_bytes); |
+ |
+ // Peek one element. |
+ elements[0] = -1; |
+ elements[1] = -1; |
+ num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData( |
+ UserPointer<void>(elements), |
+ MakeUserPointer(&num_bytes), false, true)); |
+ EXPECT_EQ(1u * sizeof(elements[0]), num_bytes); |
+ EXPECT_EQ(456, elements[0]); |
+ EXPECT_EQ(-1, elements[1]); |
+ |
+ // Query. Still has 1 element remaining. |
+ num_bytes = 0; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes))); |
+ EXPECT_EQ(1 * sizeof(elements[0]), num_bytes); |
+ |
+ // Try to read two elements, with "all or none". |
+ elements[0] = -1; |
+ elements[1] = -1; |
+ num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); |
+ EXPECT_EQ( |
+ MOJO_RESULT_OUT_OF_RANGE, |
+ this->dpc()->ConsumerReadData(UserPointer<void>(elements), |
+ MakeUserPointer(&num_bytes), true, false)); |
+ EXPECT_EQ(-1, elements[0]); |
+ EXPECT_EQ(-1, elements[1]); |
+ |
+ // Try to read two elements, without "all or none". |
+ elements[0] = -1; |
+ elements[1] = -1; |
+ num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData( |
+ UserPointer<void>(elements), |
+ MakeUserPointer(&num_bytes), false, false)); |
+ EXPECT_EQ(1u * sizeof(elements[0]), num_bytes); |
+ EXPECT_EQ(456, elements[0]); |
+ EXPECT_EQ(-1, elements[1]); |
+ |
+ // Query. |
+ num_bytes = 0; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes))); |
+ EXPECT_EQ(0u, num_bytes); |
+ |
+ this->ProducerClose(); |
+ this->ConsumerClose(); |
+} |
+ |
+// Note: The "basic" waiting tests test that the "wait states" are correct in |
+// various situations; they don't test that waiters are properly awoken on state |
+// changes. (For that, we need to use multiple threads.) |
+TYPED_TEST(DataPipeImplTest, BasicProducerWaiting) { |
+ // Note: We take advantage of the fact that current for current |
+ // implementations capacities are strict maximums. This is not guaranteed by |
+ // the API. |
+ |
+ const MojoCreateDataPipeOptions options = { |
+ kSizeOfOptions, // |struct_size|. |
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. |
+ static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. |
+ 2 * sizeof(int32_t) // |capacity_num_bytes|. |
+ }; |
+ this->Create(options); |
+ this->DoTransfer(); |
+ |
+ Waiter pwaiter; // For producer. |
+ Waiter cwaiter; // For consumer. |
+ HandleSignalsState hss; |
+ uint32_t context; |
+ |
+ // Never readable. |
+ pwaiter.Init(); |
+ hss = HandleSignalsState(); |
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, |
+ this->dpp()->ProducerAddAwakable( |
+ &pwaiter, MOJO_HANDLE_SIGNAL_READABLE, 12, &hss)); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // Already writable. |
+ pwaiter.Init(); |
+ hss = HandleSignalsState(); |
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
+ this->dpp()->ProducerAddAwakable( |
+ &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 34, &hss)); |
+ |
+ // We'll need to wait for readability for the remote cases. |
+ cwaiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerAddAwakable( |
+ &cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 1234, nullptr)); |
+ |
+ // Write two elements. |
+ int32_t elements[2] = {123, 456}; |
+ uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerWriteData(UserPointer<const void>(elements), |
+ MakeUserPointer(&num_bytes), true)); |
+ EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes); |
+ |
+ // Adding a waiter should now succeed. |
+ pwaiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerAddAwakable( |
+ &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 56, nullptr)); |
+ // And it shouldn't be writable yet. |
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr)); |
+ hss = HandleSignalsState(); |
+ this->dpp()->ProducerRemoveAwakable(&pwaiter, &hss); |
+ EXPECT_EQ(0u, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // Wait for data to become available to the consumer. |
+ context = 0; |
+ EXPECT_EQ(MOJO_RESULT_OK, cwaiter.Wait(test::TinyDeadline(), &context)); |
+ EXPECT_EQ(1234u, context); |
+ hss = HandleSignalsState(); |
+ this->dpc()->ConsumerRemoveAwakable(&cwaiter, &hss); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // Peek one element. |
+ elements[0] = -1; |
+ elements[1] = -1; |
+ num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData( |
+ UserPointer<void>(elements), |
+ MakeUserPointer(&num_bytes), true, true)); |
+ EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); |
+ EXPECT_EQ(123, elements[0]); |
+ EXPECT_EQ(-1, elements[1]); |
+ |
+ // Add a waiter. |
+ pwaiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerAddAwakable( |
+ &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 56, nullptr)); |
+ // And it still shouldn't be writable yet. |
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr)); |
+ hss = HandleSignalsState(); |
+ this->dpp()->ProducerRemoveAwakable(&pwaiter, &hss); |
+ EXPECT_EQ(0u, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // Do it again. |
+ pwaiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerAddAwakable( |
+ &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 78, nullptr)); |
+ |
+ // Read one element. |
+ elements[0] = -1; |
+ elements[1] = -1; |
+ num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData( |
+ UserPointer<void>(elements), |
+ MakeUserPointer(&num_bytes), true, false)); |
+ EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); |
+ EXPECT_EQ(123, elements[0]); |
+ EXPECT_EQ(-1, elements[1]); |
+ |
+ // Waiting should now succeed. |
+ context = 0; |
+ EXPECT_EQ(MOJO_RESULT_OK, pwaiter.Wait(test::TinyDeadline(), &context)); |
+ EXPECT_EQ(78u, context); |
+ hss = HandleSignalsState(); |
+ this->dpp()->ProducerRemoveAwakable(&pwaiter, &hss); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // Try writing, using a two-phase write. |
+ void* buffer = nullptr; |
+ num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0])); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerBeginWriteData( |
+ MakeUserPointer(&buffer), MakeUserPointer(&num_bytes), false)); |
+ EXPECT_TRUE(buffer); |
+ EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); |
+ |
+ static_cast<int32_t*>(buffer)[0] = 789; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerEndWriteData( |
+ static_cast<uint32_t>(1u * sizeof(elements[0])))); |
+ |
+ // Add a waiter. |
+ pwaiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerAddAwakable( |
+ &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 90, nullptr)); |
+ |
+ // Read one element, using a two-phase read. |
+ const void* read_buffer = nullptr; |
+ num_bytes = 0u; |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerBeginReadData( |
+ MakeUserPointer(&read_buffer), |
+ MakeUserPointer(&num_bytes), false)); |
+ EXPECT_TRUE(read_buffer); |
+ // Since we only read one element (after having written three in all), the |
+ // two-phase read should only allow us to read one. This checks an |
+ // implementation detail! |
+ EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); |
+ EXPECT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerEndReadData( |
+ static_cast<uint32_t>(1u * sizeof(elements[0])))); |
+ |
+ // Waiting should succeed. |
+ context = 0; |
+ EXPECT_EQ(MOJO_RESULT_OK, pwaiter.Wait(test::TinyDeadline(), &context)); |
+ EXPECT_EQ(90u, context); |
+ hss = HandleSignalsState(); |
+ this->dpp()->ProducerRemoveAwakable(&pwaiter, &hss); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // Write one element. |
+ elements[0] = 123; |
+ num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerWriteData(UserPointer<const void>(elements), |
+ MakeUserPointer(&num_bytes), false)); |
+ EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); |
+ |
+ // Add a waiter. |
+ pwaiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerAddAwakable( |
+ &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 12, nullptr)); |
+ |
+ // Close the consumer. |
+ this->ConsumerClose(); |
+ |
+ // It should now be never-writable. |
+ context = 0; |
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, |
+ pwaiter.Wait(test::TinyDeadline(), &context)); |
+ EXPECT_EQ(12u, context); |
+ hss = HandleSignalsState(); |
+ this->dpp()->ProducerRemoveAwakable(&pwaiter, &hss); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); |
+ |
+ this->ProducerClose(); |
+} |
+ |
+TYPED_TEST(DataPipeImplTest, PeerClosedProducerWaiting) { |
+ const MojoCreateDataPipeOptions options = { |
+ kSizeOfOptions, // |struct_size|. |
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. |
+ static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. |
+ 2 * sizeof(int32_t) // |capacity_num_bytes|. |
+ }; |
+ this->Create(options); |
+ this->DoTransfer(); |
+ |
+ Waiter waiter; |
+ HandleSignalsState hss; |
+ uint32_t context; |
+ |
+ // Add a waiter. |
+ waiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerAddAwakable( |
+ &waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 12, nullptr)); |
+ |
+ // Close the consumer. |
+ this->ConsumerClose(); |
+ |
+ // It should be signaled. |
+ context = 0; |
+ EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context)); |
+ EXPECT_EQ(12u, context); |
+ hss = HandleSignalsState(); |
+ this->dpp()->ProducerRemoveAwakable(&waiter, &hss); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); |
+ |
+ this->ProducerClose(); |
+} |
+ |
+TYPED_TEST(DataPipeImplTest, PeerClosedConsumerWaiting) { |
+ const MojoCreateDataPipeOptions options = { |
+ kSizeOfOptions, // |struct_size|. |
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. |
+ static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. |
+ 2 * sizeof(int32_t) // |capacity_num_bytes|. |
+ }; |
+ this->Create(options); |
+ this->DoTransfer(); |
+ |
+ Waiter waiter; |
+ HandleSignalsState hss; |
+ uint32_t context; |
+ |
+ // Add a waiter. |
+ waiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerAddAwakable( |
+ &waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 12, nullptr)); |
+ |
+ // Close the producer. |
+ this->ProducerClose(); |
+ |
+ // It should be signaled. |
+ context = 0; |
+ EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context)); |
+ EXPECT_EQ(12u, context); |
+ hss = HandleSignalsState(); |
+ this->dpc()->ConsumerRemoveAwakable(&waiter, &hss); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); |
+ |
+ this->ConsumerClose(); |
+} |
+ |
+TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) { |
+ const MojoCreateDataPipeOptions options = { |
+ kSizeOfOptions, // |struct_size|. |
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. |
+ static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. |
+ 1000 * sizeof(int32_t) // |capacity_num_bytes|. |
+ }; |
+ this->Create(options); |
+ this->DoTransfer(); |
+ |
+ Waiter waiter; |
+ Waiter waiter2; |
+ HandleSignalsState hss; |
+ uint32_t context; |
+ |
+ // Never writable. |
+ waiter.Init(); |
+ hss = HandleSignalsState(); |
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, |
+ this->dpc()->ConsumerAddAwakable( |
+ &waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 12, &hss)); |
+ EXPECT_EQ(0u, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // Add waiter: not yet readable. |
+ waiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerAddAwakable( |
+ &waiter, MOJO_HANDLE_SIGNAL_READABLE, 34, nullptr)); |
+ |
+ // Write two elements. |
+ int32_t elements[2] = {123, 456}; |
+ uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerWriteData(UserPointer<const void>(elements), |
+ MakeUserPointer(&num_bytes), true)); |
+ |
+ // Wait for readability (needed for remote cases). |
+ context = 0; |
+ EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context)); |
+ EXPECT_EQ(34u, context); |
+ hss = HandleSignalsState(); |
+ this->dpc()->ConsumerRemoveAwakable(&waiter, &hss); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // Discard one element. |
+ num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerDiscardData( |
+ MakeUserPointer(&num_bytes), true)); |
+ EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); |
+ |
+ // Should still be readable. |
+ waiter.Init(); |
+ hss = HandleSignalsState(); |
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
+ this->dpc()->ConsumerAddAwakable( |
+ &waiter, MOJO_HANDLE_SIGNAL_READABLE, 78, &hss)); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // Peek one element. |
+ elements[0] = -1; |
+ elements[1] = -1; |
+ num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData( |
+ UserPointer<void>(elements), |
+ MakeUserPointer(&num_bytes), true, true)); |
+ EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); |
+ EXPECT_EQ(456, elements[0]); |
+ EXPECT_EQ(-1, elements[1]); |
+ |
+ // Should still be readable. |
+ waiter.Init(); |
+ hss = HandleSignalsState(); |
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
+ this->dpc()->ConsumerAddAwakable( |
+ &waiter, MOJO_HANDLE_SIGNAL_READABLE, 78, &hss)); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // Read one element. |
+ elements[0] = -1; |
+ elements[1] = -1; |
+ num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData( |
+ UserPointer<void>(elements), |
+ MakeUserPointer(&num_bytes), true, false)); |
+ EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); |
+ EXPECT_EQ(456, elements[0]); |
+ EXPECT_EQ(-1, elements[1]); |
+ |
+ // Adding a waiter should now succeed. |
+ waiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerAddAwakable( |
+ &waiter, MOJO_HANDLE_SIGNAL_READABLE, 90, nullptr)); |
+ |
+ // Write one element. |
+ elements[0] = 789; |
+ elements[1] = -1; |
+ num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerWriteData(UserPointer<const void>(elements), |
+ MakeUserPointer(&num_bytes), true)); |
+ |
+ // Waiting should now succeed. |
+ context = 0; |
+ EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context)); |
+ EXPECT_EQ(90u, context); |
+ hss = HandleSignalsState(); |
+ this->dpc()->ConsumerRemoveAwakable(&waiter, &hss); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // We'll want to wait for the peer closed signal to propagate. |
+ waiter.Init(); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerAddAwakable( |
+ &waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 12, nullptr)); |
+ |
+ // Close the producer. |
+ this->ProducerClose(); |
+ |
+ // Should still be readable, even if the peer closed signal hasn't propagated |
+ // yet. |
+ waiter2.Init(); |
+ hss = HandleSignalsState(); |
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
+ this->dpc()->ConsumerAddAwakable( |
+ &waiter2, MOJO_HANDLE_SIGNAL_READABLE, 34, &hss)); |
+ // We don't know if the peer closed signal has propagated yet (for the remote |
+ // cases). |
+ EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE)); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // Wait for the peer closed signal. |
+ context = 0; |
+ EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context)); |
+ EXPECT_EQ(12u, context); |
+ hss = HandleSignalsState(); |
+ this->dpc()->ConsumerRemoveAwakable(&waiter, &hss); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // Read one element. |
+ elements[0] = -1; |
+ elements[1] = -1; |
+ num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData( |
+ UserPointer<void>(elements), |
+ MakeUserPointer(&num_bytes), true, false)); |
+ EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); |
+ EXPECT_EQ(789, elements[0]); |
+ EXPECT_EQ(-1, elements[1]); |
+ |
+ // Should be never-readable. |
+ waiter.Init(); |
+ hss = HandleSignalsState(); |
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, |
+ this->dpc()->ConsumerAddAwakable( |
+ &waiter, MOJO_HANDLE_SIGNAL_READABLE, 56, &hss)); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); |
+ |
+ this->ConsumerClose(); |
+} |
+ |
+// Test with two-phase APIs and also closing the producer with an active |
+// consumer waiter. |
+TYPED_TEST(DataPipeImplTest, ConsumerWaitingTwoPhase) { |
+ const MojoCreateDataPipeOptions options = { |
+ kSizeOfOptions, // |struct_size|. |
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. |
+ static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. |
+ 1000 * sizeof(int32_t) // |capacity_num_bytes|. |
+ }; |
+ this->Create(options); |
+ this->DoTransfer(); |
+ |
+ Waiter waiter; |
+ HandleSignalsState hss; |
+ uint32_t context; |
+ |
+ // Add waiter: not yet readable. |
+ waiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerAddAwakable( |
+ &waiter, MOJO_HANDLE_SIGNAL_READABLE, 12, nullptr)); |
+ |
+ // Write two elements. |
+ int32_t* elements = nullptr; |
+ void* buffer = nullptr; |
+ // Request room for three (but we'll only write two). |
+ uint32_t num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0])); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerBeginWriteData( |
+ MakeUserPointer(&buffer), MakeUserPointer(&num_bytes), true)); |
+ EXPECT_TRUE(buffer); |
+ EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0]))); |
+ elements = static_cast<int32_t*>(buffer); |
+ elements[0] = 123; |
+ elements[1] = 456; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerEndWriteData( |
+ static_cast<uint32_t>(2u * sizeof(elements[0])))); |
+ |
+ // Wait for readability (needed for remote cases). |
+ context = 0; |
+ EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context)); |
+ EXPECT_EQ(12u, context); |
+ hss = HandleSignalsState(); |
+ this->dpc()->ConsumerRemoveAwakable(&waiter, &hss); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // Read one element. |
+ // Request two in all-or-none mode, but only read one. |
+ const void* read_buffer = nullptr; |
+ num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0])); |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerBeginReadData( |
+ MakeUserPointer(&read_buffer), |
+ MakeUserPointer(&num_bytes), true)); |
+ EXPECT_TRUE(read_buffer); |
+ EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes); |
+ const int32_t* read_elements = static_cast<const int32_t*>(read_buffer); |
+ EXPECT_EQ(123, read_elements[0]); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerEndReadData( |
+ static_cast<uint32_t>(1u * sizeof(elements[0])))); |
+ |
+ // Should still be readable. |
+ waiter.Init(); |
+ hss = HandleSignalsState(); |
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
+ this->dpc()->ConsumerAddAwakable( |
+ &waiter, MOJO_HANDLE_SIGNAL_READABLE, 34, &hss)); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // Read one element. |
+ // Request three, but not in all-or-none mode. |
+ read_buffer = nullptr; |
+ num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0])); |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerBeginReadData( |
+ MakeUserPointer(&read_buffer), |
+ MakeUserPointer(&num_bytes), false)); |
+ EXPECT_TRUE(read_buffer); |
+ EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes); |
+ read_elements = static_cast<const int32_t*>(read_buffer); |
+ EXPECT_EQ(456, read_elements[0]); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerEndReadData( |
+ static_cast<uint32_t>(1u * sizeof(elements[0])))); |
+ |
+ // Adding a waiter should now succeed. |
+ waiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerAddAwakable( |
+ &waiter, MOJO_HANDLE_SIGNAL_READABLE, 56, nullptr)); |
+ |
+ // Close the producer. |
+ this->ProducerClose(); |
+ |
+ // Should be never-readable. |
+ context = 0; |
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, |
+ waiter.Wait(test::TinyDeadline(), &context)); |
+ EXPECT_EQ(56u, context); |
+ hss = HandleSignalsState(); |
+ this->dpc()->ConsumerRemoveAwakable(&waiter, &hss); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); |
+ |
+ this->ConsumerClose(); |
+} |
+ |
+// Tests that data pipes aren't writable/readable during two-phase writes/reads. |
+TYPED_TEST(DataPipeImplTest, BasicTwoPhaseWaiting) { |
+ const MojoCreateDataPipeOptions options = { |
+ kSizeOfOptions, // |struct_size|. |
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. |
+ static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. |
+ 1000 * sizeof(int32_t) // |capacity_num_bytes|. |
+ }; |
+ this->Create(options); |
+ this->DoTransfer(); |
+ |
+ Waiter pwaiter; // For producer. |
+ Waiter cwaiter; // For consumer. |
+ HandleSignalsState hss; |
+ |
+ // It should be writable. |
+ pwaiter.Init(); |
+ hss = HandleSignalsState(); |
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
+ this->dpp()->ProducerAddAwakable( |
+ &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss)); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t)); |
+ void* write_ptr = nullptr; |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpp()->ProducerBeginWriteData( |
+ MakeUserPointer(&write_ptr), |
+ MakeUserPointer(&num_bytes), false)); |
+ EXPECT_TRUE(write_ptr); |
+ EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t))); |
+ |
+ // At this point, it shouldn't be writable. |
+ pwaiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerAddAwakable( |
+ &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 1, nullptr)); |
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr)); |
+ hss = HandleSignalsState(); |
+ this->dpp()->ProducerRemoveAwakable(&pwaiter, &hss); |
+ EXPECT_EQ(0u, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // It shouldn't be readable yet either (we'll wait later). |
+ cwaiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerAddAwakable( |
+ &cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 2, nullptr)); |
+ |
+ static_cast<int32_t*>(write_ptr)[0] = 123; |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpp()->ProducerEndWriteData( |
+ static_cast<uint32_t>(1u * sizeof(int32_t)))); |
+ |
+ // It should immediately be writable again. |
+ pwaiter.Init(); |
+ hss = HandleSignalsState(); |
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
+ this->dpp()->ProducerAddAwakable( |
+ &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 3, &hss)); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // It should become readable. |
+ EXPECT_EQ(MOJO_RESULT_OK, cwaiter.Wait(test::TinyDeadline(), nullptr)); |
+ hss = HandleSignalsState(); |
+ this->dpc()->ConsumerRemoveAwakable(&cwaiter, &hss); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // Start another two-phase write and check that it's readable even in the |
+ // middle of it. |
+ num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t)); |
+ write_ptr = nullptr; |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpp()->ProducerBeginWriteData( |
+ MakeUserPointer(&write_ptr), |
+ MakeUserPointer(&num_bytes), false)); |
+ EXPECT_TRUE(write_ptr); |
+ EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t))); |
+ |
+ // It should be readable. |
+ cwaiter.Init(); |
+ hss = HandleSignalsState(); |
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
+ this->dpc()->ConsumerAddAwakable( |
+ &cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 5, &hss)); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // End the two-phase write without writing anything. |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpp()->ProducerEndWriteData(0u)); |
+ |
+ // Start a two-phase read. |
+ num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t)); |
+ const void* read_ptr = nullptr; |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerBeginReadData( |
+ MakeUserPointer(&read_ptr), |
+ MakeUserPointer(&num_bytes), false)); |
+ EXPECT_TRUE(read_ptr); |
+ EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes); |
+ |
+ // At this point, it should still be writable. |
+ pwaiter.Init(); |
+ hss = HandleSignalsState(); |
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
+ this->dpp()->ProducerAddAwakable( |
+ &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 6, &hss)); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // But not readable. |
+ cwaiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerAddAwakable( |
+ &cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 7, nullptr)); |
+ EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, cwaiter.Wait(0, nullptr)); |
+ hss = HandleSignalsState(); |
+ this->dpc()->ConsumerRemoveAwakable(&cwaiter, &hss); |
+ EXPECT_EQ(0u, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // End the two-phase read without reading anything. |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerEndReadData(0u)); |
+ |
+ // It should be readable again. |
+ cwaiter.Init(); |
+ hss = HandleSignalsState(); |
+ EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, |
+ this->dpc()->ConsumerAddAwakable( |
+ &cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 8, &hss)); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ this->ProducerClose(); |
+ this->ConsumerClose(); |
+} |
+ |
+void Seq(int32_t start, size_t count, int32_t* out) { |
+ for (size_t i = 0; i < count; i++) |
+ out[i] = start + static_cast<int32_t>(i); |
+} |
+ |
+TYPED_TEST(DataPipeImplTest, AllOrNone) { |
+ const MojoCreateDataPipeOptions options = { |
+ kSizeOfOptions, // |struct_size|. |
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. |
+ static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. |
+ 10 * sizeof(int32_t) // |capacity_num_bytes|. |
+ }; |
+ this->Create(options); |
+ this->DoTransfer(); |
+ |
+ Waiter waiter; |
+ HandleSignalsState hss; |
+ |
+ // Try writing way too much. |
+ uint32_t num_bytes = 20u * sizeof(int32_t); |
+ int32_t buffer[100]; |
+ Seq(0, arraysize(buffer), buffer); |
+ EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, |
+ this->dpp()->ProducerWriteData(UserPointer<const void>(buffer), |
+ MakeUserPointer(&num_bytes), true)); |
+ |
+ // Should still be empty. |
+ num_bytes = ~0u; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes))); |
+ EXPECT_EQ(0u, num_bytes); |
+ |
+ // Add waiter. |
+ waiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerAddAwakable( |
+ &waiter, MOJO_HANDLE_SIGNAL_READABLE, 1, nullptr)); |
+ |
+ // Write some data. |
+ num_bytes = 5u * sizeof(int32_t); |
+ Seq(100, arraysize(buffer), buffer); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerWriteData(UserPointer<const void>(buffer), |
+ MakeUserPointer(&num_bytes), true)); |
+ EXPECT_EQ(5u * sizeof(int32_t), num_bytes); |
+ |
+ // Wait for data. |
+ // TODO(vtl): There's no real guarantee that all the data will become |
+ // available at once (except that in current implementations, with reasonable |
+ // limits, it will). Eventually, we'll be able to wait for a specified amount |
+ // of data to become available. |
+ EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr)); |
+ hss = HandleSignalsState(); |
+ this->dpc()->ConsumerRemoveAwakable(&waiter, &hss); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // Half full. |
+ num_bytes = 0u; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes))); |
+ EXPECT_EQ(5u * sizeof(int32_t), num_bytes); |
+ |
+ // Too much. |
+ num_bytes = 6u * sizeof(int32_t); |
+ Seq(200, arraysize(buffer), buffer); |
+ EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, |
+ this->dpp()->ProducerWriteData(UserPointer<const void>(buffer), |
+ MakeUserPointer(&num_bytes), true)); |
+ |
+ // Try reading too much. |
+ num_bytes = 11u * sizeof(int32_t); |
+ memset(buffer, 0xab, sizeof(buffer)); |
+ EXPECT_EQ( |
+ MOJO_RESULT_OUT_OF_RANGE, |
+ this->dpc()->ConsumerReadData(UserPointer<void>(buffer), |
+ MakeUserPointer(&num_bytes), true, false)); |
+ int32_t expected_buffer[100]; |
+ memset(expected_buffer, 0xab, sizeof(expected_buffer)); |
+ EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); |
+ |
+ // Try discarding too much. |
+ num_bytes = 11u * sizeof(int32_t); |
+ EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, this->dpc()->ConsumerDiscardData( |
+ MakeUserPointer(&num_bytes), true)); |
+ |
+ // Just a little. |
+ num_bytes = 2u * sizeof(int32_t); |
+ Seq(300, arraysize(buffer), buffer); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerWriteData(UserPointer<const void>(buffer), |
+ MakeUserPointer(&num_bytes), true)); |
+ EXPECT_EQ(2u * sizeof(int32_t), num_bytes); |
+ |
+ // Just right. |
+ num_bytes = 3u * sizeof(int32_t); |
+ Seq(400, arraysize(buffer), buffer); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerWriteData(UserPointer<const void>(buffer), |
+ MakeUserPointer(&num_bytes), true)); |
+ EXPECT_EQ(3u * sizeof(int32_t), num_bytes); |
+ |
+ // TODO(vtl): Hack (see also the TODO above): We can't currently wait for a |
+ // specified amount of data to be available, so poll. |
+ const size_t kMaxPoll = 100; |
+ for (size_t i = 0; i < kMaxPoll; i++) { |
+ num_bytes = 0u; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes))); |
+ if (num_bytes >= 10u * sizeof(int32_t)) |
+ break; |
+ |
+ base::PlatformThread::Sleep(test::EpsilonTimeout()); |
+ } |
+ EXPECT_EQ(10u * sizeof(int32_t), num_bytes); |
+ |
+ // Read half. |
+ num_bytes = 5u * sizeof(int32_t); |
+ memset(buffer, 0xab, sizeof(buffer)); |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData( |
+ UserPointer<void>(buffer), |
+ MakeUserPointer(&num_bytes), true, false)); |
+ EXPECT_EQ(5u * sizeof(int32_t), num_bytes); |
+ memset(expected_buffer, 0xab, sizeof(expected_buffer)); |
+ Seq(100, 5, expected_buffer); |
+ EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); |
+ |
+ // Try reading too much again. |
+ num_bytes = 6u * sizeof(int32_t); |
+ memset(buffer, 0xab, sizeof(buffer)); |
+ EXPECT_EQ( |
+ MOJO_RESULT_OUT_OF_RANGE, |
+ this->dpc()->ConsumerReadData(UserPointer<void>(buffer), |
+ MakeUserPointer(&num_bytes), true, false)); |
+ memset(expected_buffer, 0xab, sizeof(expected_buffer)); |
+ EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); |
+ |
+ // Try discarding too much again. |
+ num_bytes = 6u * sizeof(int32_t); |
+ EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, this->dpc()->ConsumerDiscardData( |
+ MakeUserPointer(&num_bytes), true)); |
+ |
+ // Discard a little. |
+ num_bytes = 2u * sizeof(int32_t); |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerDiscardData( |
+ MakeUserPointer(&num_bytes), true)); |
+ EXPECT_EQ(2u * sizeof(int32_t), num_bytes); |
+ |
+ // Three left. |
+ num_bytes = 0u; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes))); |
+ EXPECT_EQ(3u * sizeof(int32_t), num_bytes); |
+ |
+ // We'll need to wait for the peer closed to propagate. |
+ waiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerAddAwakable( |
+ &waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2, nullptr)); |
+ |
+ // Close the producer, then test producer-closed cases. |
+ this->ProducerClose(); |
+ |
+ // Wait. |
+ EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr)); |
+ hss = HandleSignalsState(); |
+ this->dpc()->ConsumerRemoveAwakable(&waiter, &hss); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // Try reading too much; "failed precondition" since the producer is closed. |
+ num_bytes = 4u * sizeof(int32_t); |
+ memset(buffer, 0xab, sizeof(buffer)); |
+ EXPECT_EQ( |
+ MOJO_RESULT_FAILED_PRECONDITION, |
+ this->dpc()->ConsumerReadData(UserPointer<void>(buffer), |
+ MakeUserPointer(&num_bytes), true, false)); |
+ memset(expected_buffer, 0xab, sizeof(expected_buffer)); |
+ EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); |
+ |
+ // Try discarding too much; "failed precondition" again. |
+ num_bytes = 4u * sizeof(int32_t); |
+ EXPECT_EQ( |
+ MOJO_RESULT_FAILED_PRECONDITION, |
+ this->dpc()->ConsumerDiscardData(MakeUserPointer(&num_bytes), true)); |
+ |
+ // Read a little. |
+ num_bytes = 2u * sizeof(int32_t); |
+ memset(buffer, 0xab, sizeof(buffer)); |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData( |
+ UserPointer<void>(buffer), |
+ MakeUserPointer(&num_bytes), true, false)); |
+ EXPECT_EQ(2u * sizeof(int32_t), num_bytes); |
+ memset(expected_buffer, 0xab, sizeof(expected_buffer)); |
+ Seq(400, 2, expected_buffer); |
+ EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer))); |
+ |
+ // Discard the remaining element. |
+ num_bytes = 1u * sizeof(int32_t); |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerDiscardData( |
+ MakeUserPointer(&num_bytes), true)); |
+ EXPECT_EQ(1u * sizeof(int32_t), num_bytes); |
+ |
+ // Empty again. |
+ num_bytes = ~0u; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes))); |
+ EXPECT_EQ(0u, num_bytes); |
+ |
+ this->ConsumerClose(); |
+} |
+ |
+TYPED_TEST(DataPipeImplTest, TwoPhaseAllOrNone) { |
+ const MojoCreateDataPipeOptions options = { |
+ kSizeOfOptions, // |struct_size|. |
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|. |
+ static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|. |
+ 10 * sizeof(int32_t) // |capacity_num_bytes|. |
+ }; |
+ this->Create(options); |
+ this->DoTransfer(); |
+ |
+ Waiter waiter; |
+ HandleSignalsState hss; |
+ |
+ // Try writing way too much (two-phase). |
+ uint32_t num_bytes = 20u * sizeof(int32_t); |
+ void* write_ptr = nullptr; |
+ EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, this->dpp()->ProducerBeginWriteData( |
+ MakeUserPointer(&write_ptr), |
+ MakeUserPointer(&num_bytes), true)); |
+ |
+ // Try writing an amount which isn't a multiple of the element size |
+ // (two-phase). |
+ static_assert(sizeof(int32_t) > 1u, "Wow! int32_t's have size 1"); |
+ num_bytes = 1u; |
+ write_ptr = nullptr; |
+ EXPECT_EQ( |
+ MOJO_RESULT_INVALID_ARGUMENT, |
+ this->dpp()->ProducerBeginWriteData(MakeUserPointer(&write_ptr), |
+ MakeUserPointer(&num_bytes), true)); |
+ |
+ // Try reading way too much (two-phase). |
+ num_bytes = 20u * sizeof(int32_t); |
+ const void* read_ptr = nullptr; |
+ EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, |
+ this->dpc()->ConsumerBeginReadData( |
+ MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true)); |
+ |
+ // Add waiter. |
+ waiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerAddAwakable( |
+ &waiter, MOJO_HANDLE_SIGNAL_READABLE, 1, nullptr)); |
+ |
+ // Write half (two-phase). |
+ num_bytes = 5u * sizeof(int32_t); |
+ write_ptr = nullptr; |
+ EXPECT_EQ(MOJO_RESULT_OK, this->dpp()->ProducerBeginWriteData( |
+ MakeUserPointer(&write_ptr), |
+ MakeUserPointer(&num_bytes), true)); |
+ // May provide more space than requested. |
+ EXPECT_GE(num_bytes, 5u * sizeof(int32_t)); |
+ EXPECT_TRUE(write_ptr); |
+ Seq(0, 5, static_cast<int32_t*>(write_ptr)); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpp()->ProducerEndWriteData(5u * sizeof(int32_t))); |
+ |
+ // Wait for data. |
+ // TODO(vtl): (See corresponding TODO in AllOrNone.) |
+ EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr)); |
+ hss = HandleSignalsState(); |
+ this->dpc()->ConsumerRemoveAwakable(&waiter, &hss); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // Try reading an amount which isn't a multiple of the element size |
+ // (two-phase). |
+ num_bytes = 1u; |
+ read_ptr = nullptr; |
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, |
+ this->dpc()->ConsumerBeginReadData( |
+ MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true)); |
+ |
+ // Read one (two-phase). |
+ num_bytes = 1u * sizeof(int32_t); |
+ read_ptr = nullptr; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerBeginReadData( |
+ MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true)); |
+ EXPECT_GE(num_bytes, 1u * sizeof(int32_t)); |
+ EXPECT_EQ(0, static_cast<const int32_t*>(read_ptr)[0]); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerEndReadData(1u * sizeof(int32_t))); |
+ |
+ // We should have four left, leaving room for six. |
+ num_bytes = 0u; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes))); |
+ EXPECT_EQ(4u * sizeof(int32_t), num_bytes); |
+ |
+ // Assuming a tight circular buffer of the specified capacity, we can't do a |
+ // two-phase write of six now. |
+ num_bytes = 6u * sizeof(int32_t); |
+ write_ptr = nullptr; |
+ EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, this->dpp()->ProducerBeginWriteData( |
+ MakeUserPointer(&write_ptr), |
+ MakeUserPointer(&num_bytes), true)); |
+ |
+ // TODO(vtl): Hack (see also the TODO above): We can't currently wait for a |
+ // specified amount of space to be available, so poll. |
+ const size_t kMaxPoll = 100; |
+ for (size_t i = 0; i < kMaxPoll; i++) { |
+ // Write six elements (simple), filling the buffer. |
+ num_bytes = 6u * sizeof(int32_t); |
+ int32_t buffer[100]; |
+ Seq(100, 6, buffer); |
+ MojoResult result = this->dpp()->ProducerWriteData( |
+ UserPointer<const void>(buffer), MakeUserPointer(&num_bytes), true); |
+ if (result == MOJO_RESULT_OK) |
+ break; |
+ EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, result); |
+ |
+ base::PlatformThread::Sleep(test::EpsilonTimeout()); |
+ } |
+ EXPECT_EQ(6u * sizeof(int32_t), num_bytes); |
+ |
+ // TODO(vtl): Hack: poll again. |
+ for (size_t i = 0; i < kMaxPoll; i++) { |
+ // We have ten. |
+ num_bytes = 0u; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes))); |
+ if (num_bytes >= 10u * sizeof(int32_t)) |
+ break; |
+ |
+ base::PlatformThread::Sleep(test::EpsilonTimeout()); |
+ } |
+ EXPECT_EQ(10u * sizeof(int32_t), num_bytes); |
+ |
+ // Note: Whether a two-phase read of ten would fail here or not is |
+ // implementation-dependent. |
+ |
+ // Add waiter. |
+ waiter.Init(); |
+ ASSERT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerAddAwakable( |
+ &waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2, nullptr)); |
+ |
+ // Close the producer. |
+ this->ProducerClose(); |
+ |
+ // A two-phase read of nine should work. |
+ num_bytes = 9u * sizeof(int32_t); |
+ read_ptr = nullptr; |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerBeginReadData( |
+ MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true)); |
+ EXPECT_GE(num_bytes, 9u * sizeof(int32_t)); |
+ EXPECT_EQ(1, static_cast<const int32_t*>(read_ptr)[0]); |
+ EXPECT_EQ(2, static_cast<const int32_t*>(read_ptr)[1]); |
+ EXPECT_EQ(3, static_cast<const int32_t*>(read_ptr)[2]); |
+ EXPECT_EQ(4, static_cast<const int32_t*>(read_ptr)[3]); |
+ EXPECT_EQ(100, static_cast<const int32_t*>(read_ptr)[4]); |
+ EXPECT_EQ(101, static_cast<const int32_t*>(read_ptr)[5]); |
+ EXPECT_EQ(102, static_cast<const int32_t*>(read_ptr)[6]); |
+ EXPECT_EQ(103, static_cast<const int32_t*>(read_ptr)[7]); |
+ EXPECT_EQ(104, static_cast<const int32_t*>(read_ptr)[8]); |
+ EXPECT_EQ(MOJO_RESULT_OK, |
+ this->dpc()->ConsumerEndReadData(9u * sizeof(int32_t))); |
+ |
+ // Wait for peer closed. |
+ EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr)); |
+ hss = HandleSignalsState(); |
+ this->dpc()->ConsumerRemoveAwakable(&waiter, &hss); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfied_signals); |
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
+ hss.satisfiable_signals); |
+ |
+ // A two-phase read of two should fail, with "failed precondition". |
+ num_bytes = 2u * sizeof(int32_t); |
+ read_ptr = nullptr; |
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, |
+ this->dpc()->ConsumerBeginReadData( |
+ MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true)); |
+ |
+ this->ConsumerClose(); |
+} |
+ |
+} // namespace |
+} // namespace system |
+} // namespace mojo |