Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(628)

Unified Diff: third_party/mojo/src/mojo/edk/system/data_pipe_impl_unittest.cc

Issue 1019173002: Update mojo sdk to rev 7214b7ec7d27563b2666afad86cf1c5895c56c18 (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Keep permission service alive if embedder drops requests Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/mojo/src/mojo/edk/system/data_pipe_impl_unittest.cc
diff --git a/third_party/mojo/src/mojo/edk/system/data_pipe_impl_unittest.cc b/third_party/mojo/src/mojo/edk/system/data_pipe_impl_unittest.cc
index b14d3af4c4c7d2c78c3873b3bb6f082bc7e468e7..17021f35ca7450e1589349c79ba59bc49cff5c4e 100644
--- a/third_party/mojo/src/mojo/edk/system/data_pipe_impl_unittest.cc
+++ b/third_party/mojo/src/mojo/edk/system/data_pipe_impl_unittest.cc
@@ -13,6 +13,7 @@
#include "base/location.h"
#include "base/logging.h"
#include "base/macros.h"
+#include "base/memory/scoped_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/test/test_io_thread.h"
#include "base/threading/platform_thread.h" // For |Sleep()|.
@@ -40,6 +41,12 @@ const MojoHandleSignals kAllSignals = MOJO_HANDLE_SIGNAL_READABLE |
const uint32_t kSizeOfOptions =
static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions));
+// In various places, we have to poll (since, e.g., we can't yet wait for a
+// certain amount of data to be available). This is the maximum number of
+// iterations (separated by a short sleep).
+// TODO(vtl): Get rid of this.
+const size_t kMaxPoll = 100;
+
// DataPipeImplTestHelper ------------------------------------------------------
class DataPipeImplTestHelper {
@@ -51,13 +58,21 @@ class DataPipeImplTestHelper {
virtual void Create(const MojoCreateDataPipeOptions& validated_options) = 0;
+ // Returns true if the producer and consumer exhibit the behavior that you'd
+ // expect from a pure circular buffer implementation (reflected to two-phase
+ // reads and writes).
+ virtual bool IsStrictCircularBuffer() const = 0;
+
// Possibly transfers the producer/consumer.
virtual void DoTransfer() = 0;
// Returns the |DataPipe| object for the producer and consumer, respectively.
- virtual DataPipe* dpp() = 0;
- virtual DataPipe* dpc() = 0;
+ virtual DataPipe* DataPipeForProducer() = 0;
+ virtual DataPipe* DataPipeForConsumer() = 0;
+ // Closes the producer and consumer, respectively. (Other operations go
+ // through the above accessors; closing is special since it may require that a
+ // dispatcher be closed.)
virtual void ProducerClose() = 0;
virtual void ConsumerClose() = 0;
@@ -76,8 +91,8 @@ class DataPipeImplTest : public testing::Test {
DataPipeImplTest() {}
~DataPipeImplTest() override {}
- void SetUp() override { helper_.SetUp(); }
- void TearDown() override { helper_.TearDown(); }
+ void SetUp() override { Reset(); }
+ void TearDown() override { helper_->TearDown(); }
protected:
void Create(const MojoCreateDataPipeOptions& options) {
@@ -85,19 +100,88 @@ class DataPipeImplTest : public testing::Test {
ASSERT_EQ(MOJO_RESULT_OK,
DataPipe::ValidateCreateOptions(MakeUserPointer(&options),
&validated_options));
- helper_.Create(validated_options);
+ helper_->Create(validated_options);
+ }
+
+ bool IsStrictCircularBuffer() const {
+ return helper_->IsStrictCircularBuffer();
}
- void DoTransfer() { return helper_.DoTransfer(); }
+ void DoTransfer() { return helper_->DoTransfer(); }
+
+ void Reset() {
+ if (helper_)
+ helper_->TearDown();
- DataPipe* dpp() { return helper_.dpp(); }
- DataPipe* dpc() { return helper_.dpc(); }
+ helper_.reset(new Helper());
+ helper_->SetUp();
+ }
- void ProducerClose() { helper_.ProducerClose(); }
- void ConsumerClose() { helper_.ConsumerClose(); }
+ void ProducerClose() { helper_->ProducerClose(); }
+ MojoResult ProducerWriteData(UserPointer<const void> elements,
+ UserPointer<uint32_t> num_bytes,
+ bool all_or_none) {
+ return dpp()->ProducerWriteData(elements, num_bytes, all_or_none);
+ }
+ MojoResult ProducerBeginWriteData(UserPointer<void*> buffer,
+ UserPointer<uint32_t> buffer_num_bytes,
+ bool all_or_none) {
+ return dpp()->ProducerBeginWriteData(buffer, buffer_num_bytes, all_or_none);
+ }
+ MojoResult ProducerEndWriteData(uint32_t num_bytes_written) {
+ return dpp()->ProducerEndWriteData(num_bytes_written);
+ }
+ MojoResult ProducerAddAwakable(Awakable* awakable,
+ MojoHandleSignals signals,
+ uint32_t context,
+ HandleSignalsState* signals_state) {
+ return dpp()->ProducerAddAwakable(awakable, signals, context,
+ signals_state);
+ }
+ void ProducerRemoveAwakable(Awakable* awakable,
+ HandleSignalsState* signals_state) {
+ return dpp()->ProducerRemoveAwakable(awakable, signals_state);
+ }
+
+ void ConsumerClose() { helper_->ConsumerClose(); }
+ MojoResult ConsumerReadData(UserPointer<void> elements,
+ UserPointer<uint32_t> num_bytes,
+ bool all_or_none,
+ bool peek) {
+ return dpc()->ConsumerReadData(elements, num_bytes, all_or_none, peek);
+ }
+ MojoResult ConsumerDiscardData(UserPointer<uint32_t> num_bytes,
+ bool all_or_none) {
+ return dpc()->ConsumerDiscardData(num_bytes, all_or_none);
+ }
+ MojoResult ConsumerQueryData(UserPointer<uint32_t> num_bytes) {
+ return dpc()->ConsumerQueryData(num_bytes);
+ }
+ MojoResult ConsumerBeginReadData(UserPointer<const void*> buffer,
+ UserPointer<uint32_t> buffer_num_bytes,
+ bool all_or_none) {
+ return dpc()->ConsumerBeginReadData(buffer, buffer_num_bytes, all_or_none);
+ }
+ MojoResult ConsumerEndReadData(uint32_t num_bytes_read) {
+ return dpc()->ConsumerEndReadData(num_bytes_read);
+ }
+ MojoResult ConsumerAddAwakable(Awakable* awakable,
+ MojoHandleSignals signals,
+ uint32_t context,
+ HandleSignalsState* signals_state) {
+ return dpc()->ConsumerAddAwakable(awakable, signals, context,
+ signals_state);
+ }
+ void ConsumerRemoveAwakable(Awakable* awakable,
+ HandleSignalsState* signals_state) {
+ return dpc()->ConsumerRemoveAwakable(awakable, signals_state);
+ }
private:
- Helper helper_;
+ DataPipe* dpp() { return helper_->DataPipeForProducer(); }
+ DataPipe* dpc() { return helper_->DataPipeForConsumer(); }
+
+ scoped_ptr<Helper> helper_;
DISALLOW_COPY_AND_ASSIGN(DataPipeImplTest);
};
@@ -117,11 +201,13 @@ class LocalDataPipeImplTestHelper : public DataPipeImplTestHelper {
dp_ = DataPipe::CreateLocal(validated_options);
}
+ bool IsStrictCircularBuffer() const override { return true; }
+
void DoTransfer() override {}
// Returns the |DataPipe| object for the producer and consumer, respectively.
- DataPipe* dpp() override { return dp_.get(); }
- DataPipe* dpc() override { return dp_.get(); }
+ DataPipe* DataPipeForProducer() override { return dp_.get(); }
+ DataPipe* DataPipeForConsumer() override { return dp_.get(); }
void ProducerClose() override { dp_->ProducerClose(); }
void ConsumerClose() override { dp_->ConsumerClose(); }
@@ -163,6 +249,8 @@ class RemoteDataPipeImplTestHelper : public DataPipeImplTestHelper {
dp_ = DataPipe::CreateLocal(validated_options);
}
+ bool IsStrictCircularBuffer() const override { return false; }
+
protected:
void SendDispatcher(size_t source_i,
scoped_refptr<Dispatcher> to_send,
@@ -293,12 +381,12 @@ class RemoteProducerDataPipeImplTestHelper
static_cast<DataPipeProducerDispatcher*>(to_receive.get());
}
- DataPipe* dpp() override {
+ DataPipe* DataPipeForProducer() override {
if (producer_dispatcher_)
return producer_dispatcher_->GetDataPipeForTest();
return dp().get();
}
- DataPipe* dpc() override { return dp().get(); }
+ DataPipe* DataPipeForConsumer() override { return dp().get(); }
void ProducerClose() override {
if (producer_dispatcher_)
@@ -343,8 +431,8 @@ class RemoteConsumerDataPipeImplTestHelper
static_cast<DataPipeConsumerDispatcher*>(to_receive.get());
}
- DataPipe* dpp() override { return dp().get(); }
- DataPipe* dpc() override {
+ DataPipe* DataPipeForProducer() override { return dp().get(); }
+ DataPipe* DataPipeForConsumer() override {
if (consumer_dispatcher_)
return consumer_dispatcher_->GetDataPipeForTest();
return dp().get();
@@ -471,6 +559,42 @@ TYPED_TEST_CASE(DataPipeImplTest, HelperTypes);
// Tests -----------------------------------------------------------------------
+// Tests creation (and possibly also transferring) of data pipes with various
+// (valid) options.
+TYPED_TEST(DataPipeImplTest, CreateAndMaybeTransfer) {
+ MojoCreateDataPipeOptions test_options[] = {
+ // Default options -- we'll initialize this below.
+ {},
+ // Trivial element size, non-default capacity.
+ {kSizeOfOptions, // |struct_size|.
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
+ 1, // |element_num_bytes|.
+ 1000}, // |capacity_num_bytes|.
+ // Nontrivial element size, non-default capacity.
+ {kSizeOfOptions, // |struct_size|.
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
+ 4, // |element_num_bytes|.
+ 4000}, // |capacity_num_bytes|.
+ // Nontrivial element size, default capacity.
+ {kSizeOfOptions, // |struct_size|.
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
+ 100, // |element_num_bytes|.
+ 0} // |capacity_num_bytes|.
+ };
+
+ // Initialize the first element of |test_options| to the default options.
+ EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateCreateOptions(NullUserPointer(),
+ &test_options[0]));
+
+ for (size_t i = 0; i < arraysize(test_options); i++) {
+ this->Create(test_options[i]);
+ this->DoTransfer();
+ this->ProducerClose();
+ this->ConsumerClose();
+ this->Reset();
+ }
+}
+
TYPED_TEST(DataPipeImplTest, SimpleReadWrite) {
const MojoCreateDataPipeOptions options = {
kSizeOfOptions, // |struct_size|.
@@ -490,42 +614,40 @@ TYPED_TEST(DataPipeImplTest, SimpleReadWrite) {
// Try reading; nothing there yet.
num_bytes = static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0]));
- EXPECT_EQ(
- MOJO_RESULT_SHOULD_WAIT,
- this->dpc()->ConsumerReadData(UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), false, false));
+ EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
+ this->ConsumerReadData(UserPointer<void>(elements),
+ MakeUserPointer(&num_bytes), false, false));
// Query; nothing there yet.
num_bytes = 0;
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
EXPECT_EQ(0u, num_bytes);
// Discard; nothing there yet.
num_bytes = static_cast<uint32_t>(5u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, this->dpc()->ConsumerDiscardData(
- MakeUserPointer(&num_bytes), false));
+ EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
+ this->ConsumerDiscardData(MakeUserPointer(&num_bytes), false));
// Read with invalid |num_bytes|.
num_bytes = sizeof(elements[0]) + 1;
- EXPECT_EQ(
- MOJO_RESULT_INVALID_ARGUMENT,
- this->dpc()->ConsumerReadData(UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), false, false));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ this->ConsumerReadData(UserPointer<void>(elements),
+ MakeUserPointer(&num_bytes), false, false));
// For remote data pipes, we'll have to wait; add the waiter before writing.
waiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerAddAwakable(
- &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 123,
+ nullptr));
// Write two elements.
elements[0] = 123;
elements[1] = 456;
num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerWriteData(UserPointer<const void>(elements),
- MakeUserPointer(&num_bytes), false));
+ this->ProducerWriteData(UserPointer<const void>(elements),
+ MakeUserPointer(&num_bytes), false));
// It should have written everything (even without "all or none").
EXPECT_EQ(2u * sizeof(elements[0]), num_bytes);
@@ -534,7 +656,7 @@ TYPED_TEST(DataPipeImplTest, SimpleReadWrite) {
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context));
EXPECT_EQ(123u, context);
hss = HandleSignalsState();
- this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
+ this->ConsumerRemoveAwakable(&waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -546,16 +668,16 @@ TYPED_TEST(DataPipeImplTest, SimpleReadWrite) {
// or |2 * ...|.)
num_bytes = 0;
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
EXPECT_EQ(2 * sizeof(elements[0]), num_bytes);
// Read one element.
elements[0] = -1;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
- UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), false, false));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerReadData(UserPointer<void>(elements),
+ MakeUserPointer(&num_bytes), false, false));
EXPECT_EQ(1u * sizeof(elements[0]), num_bytes);
EXPECT_EQ(123, elements[0]);
EXPECT_EQ(-1, elements[1]);
@@ -565,16 +687,16 @@ TYPED_TEST(DataPipeImplTest, SimpleReadWrite) {
// should get 1 here.)
num_bytes = 0;
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
EXPECT_EQ(1 * sizeof(elements[0]), num_bytes);
// Peek one element.
elements[0] = -1;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
- UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), false, true));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerReadData(UserPointer<void>(elements),
+ MakeUserPointer(&num_bytes), false, true));
EXPECT_EQ(1u * sizeof(elements[0]), num_bytes);
EXPECT_EQ(456, elements[0]);
EXPECT_EQ(-1, elements[1]);
@@ -582,17 +704,16 @@ TYPED_TEST(DataPipeImplTest, SimpleReadWrite) {
// Query. Still has 1 element remaining.
num_bytes = 0;
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
EXPECT_EQ(1 * sizeof(elements[0]), num_bytes);
// Try to read two elements, with "all or none".
elements[0] = -1;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
- EXPECT_EQ(
- MOJO_RESULT_OUT_OF_RANGE,
- this->dpc()->ConsumerReadData(UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), true, false));
+ EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
+ this->ConsumerReadData(UserPointer<void>(elements),
+ MakeUserPointer(&num_bytes), true, false));
EXPECT_EQ(-1, elements[0]);
EXPECT_EQ(-1, elements[1]);
@@ -600,9 +721,9 @@ TYPED_TEST(DataPipeImplTest, SimpleReadWrite) {
elements[0] = -1;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
- UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), false, false));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerReadData(UserPointer<void>(elements),
+ MakeUserPointer(&num_bytes), false, false));
EXPECT_EQ(1u * sizeof(elements[0]), num_bytes);
EXPECT_EQ(456, elements[0]);
EXPECT_EQ(-1, elements[1]);
@@ -610,7 +731,7 @@ TYPED_TEST(DataPipeImplTest, SimpleReadWrite) {
// Query.
num_bytes = 0;
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
EXPECT_EQ(0u, num_bytes);
this->ProducerClose();
@@ -643,8 +764,8 @@ TYPED_TEST(DataPipeImplTest, BasicProducerWaiting) {
pwaiter.Init();
hss = HandleSignalsState();
EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- this->dpp()->ProducerAddAwakable(
- &pwaiter, MOJO_HANDLE_SIGNAL_READABLE, 12, &hss));
+ this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_READABLE, 12,
+ &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -653,32 +774,32 @@ TYPED_TEST(DataPipeImplTest, BasicProducerWaiting) {
pwaiter.Init();
hss = HandleSignalsState();
EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->dpp()->ProducerAddAwakable(
- &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 34, &hss));
+ this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 34,
+ &hss));
// We'll need to wait for readability for the remote cases.
cwaiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerAddAwakable(
- &cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 1234, nullptr));
+ this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE,
+ 1234, nullptr));
// Write two elements.
int32_t elements[2] = {123, 456};
uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerWriteData(UserPointer<const void>(elements),
- MakeUserPointer(&num_bytes), true));
+ this->ProducerWriteData(UserPointer<const void>(elements),
+ MakeUserPointer(&num_bytes), true));
EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
// Adding a waiter should now succeed.
pwaiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerAddAwakable(
- &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 56, nullptr));
+ this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 56,
+ nullptr));
// And it shouldn't be writable yet.
EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr));
hss = HandleSignalsState();
- this->dpp()->ProducerRemoveAwakable(&pwaiter, &hss);
+ this->ProducerRemoveAwakable(&pwaiter, &hss);
EXPECT_EQ(0u, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -688,7 +809,7 @@ TYPED_TEST(DataPipeImplTest, BasicProducerWaiting) {
EXPECT_EQ(MOJO_RESULT_OK, cwaiter.Wait(test::TinyDeadline(), &context));
EXPECT_EQ(1234u, context);
hss = HandleSignalsState();
- this->dpc()->ConsumerRemoveAwakable(&cwaiter, &hss);
+ this->ConsumerRemoveAwakable(&cwaiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -697,9 +818,9 @@ TYPED_TEST(DataPipeImplTest, BasicProducerWaiting) {
elements[0] = -1;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
- UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), true, true));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerReadData(UserPointer<void>(elements),
+ MakeUserPointer(&num_bytes), true, true));
EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
EXPECT_EQ(123, elements[0]);
EXPECT_EQ(-1, elements[1]);
@@ -707,12 +828,12 @@ TYPED_TEST(DataPipeImplTest, BasicProducerWaiting) {
// Add a waiter.
pwaiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerAddAwakable(
- &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 56, nullptr));
+ this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 56,
+ nullptr));
// And it still shouldn't be writable yet.
EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr));
hss = HandleSignalsState();
- this->dpp()->ProducerRemoveAwakable(&pwaiter, &hss);
+ this->ProducerRemoveAwakable(&pwaiter, &hss);
EXPECT_EQ(0u, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -720,16 +841,16 @@ TYPED_TEST(DataPipeImplTest, BasicProducerWaiting) {
// Do it again.
pwaiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerAddAwakable(
- &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 78, nullptr));
+ this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 78,
+ nullptr));
// Read one element.
elements[0] = -1;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
- UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), true, false));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerReadData(UserPointer<void>(elements),
+ MakeUserPointer(&num_bytes), true, false));
EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
EXPECT_EQ(123, elements[0]);
EXPECT_EQ(-1, elements[1]);
@@ -739,7 +860,7 @@ TYPED_TEST(DataPipeImplTest, BasicProducerWaiting) {
EXPECT_EQ(MOJO_RESULT_OK, pwaiter.Wait(test::TinyDeadline(), &context));
EXPECT_EQ(78u, context);
hss = HandleSignalsState();
- this->dpp()->ProducerRemoveAwakable(&pwaiter, &hss);
+ this->ProducerRemoveAwakable(&pwaiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -748,44 +869,42 @@ TYPED_TEST(DataPipeImplTest, BasicProducerWaiting) {
void* buffer = nullptr;
num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerBeginWriteData(
- MakeUserPointer(&buffer), MakeUserPointer(&num_bytes), false));
+ this->ProducerBeginWriteData(MakeUserPointer(&buffer),
+ MakeUserPointer(&num_bytes), false));
EXPECT_TRUE(buffer);
EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
static_cast<int32_t*>(buffer)[0] = 789;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerEndWriteData(
- static_cast<uint32_t>(1u * sizeof(elements[0]))));
+ EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(static_cast<uint32_t>(
+ 1u * sizeof(elements[0]))));
// Add a waiter.
pwaiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerAddAwakable(
- &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 90, nullptr));
+ this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 90,
+ nullptr));
// Read one element, using a two-phase read.
const void* read_buffer = nullptr;
num_bytes = 0u;
- EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerBeginReadData(
- MakeUserPointer(&read_buffer),
- MakeUserPointer(&num_bytes), false));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerBeginReadData(MakeUserPointer(&read_buffer),
+ MakeUserPointer(&num_bytes), false));
EXPECT_TRUE(read_buffer);
// Since we only read one element (after having written three in all), the
// two-phase read should only allow us to read one. This checks an
// implementation detail!
EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
EXPECT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]);
- EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerEndReadData(
- static_cast<uint32_t>(1u * sizeof(elements[0]))));
+ EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(static_cast<uint32_t>(
+ 1u * sizeof(elements[0]))));
// Waiting should succeed.
context = 0;
EXPECT_EQ(MOJO_RESULT_OK, pwaiter.Wait(test::TinyDeadline(), &context));
EXPECT_EQ(90u, context);
hss = HandleSignalsState();
- this->dpp()->ProducerRemoveAwakable(&pwaiter, &hss);
+ this->ProducerRemoveAwakable(&pwaiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -794,15 +913,15 @@ TYPED_TEST(DataPipeImplTest, BasicProducerWaiting) {
elements[0] = 123;
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerWriteData(UserPointer<const void>(elements),
- MakeUserPointer(&num_bytes), false));
+ this->ProducerWriteData(UserPointer<const void>(elements),
+ MakeUserPointer(&num_bytes), false));
EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
// Add a waiter.
pwaiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerAddAwakable(
- &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 12, nullptr));
+ this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 12,
+ nullptr));
// Close the consumer.
this->ConsumerClose();
@@ -813,7 +932,7 @@ TYPED_TEST(DataPipeImplTest, BasicProducerWaiting) {
pwaiter.Wait(test::TinyDeadline(), &context));
EXPECT_EQ(12u, context);
hss = HandleSignalsState();
- this->dpp()->ProducerRemoveAwakable(&pwaiter, &hss);
+ this->ProducerRemoveAwakable(&pwaiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
@@ -837,8 +956,8 @@ TYPED_TEST(DataPipeImplTest, PeerClosedProducerWaiting) {
// Add a waiter.
waiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerAddAwakable(
- &waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 12, nullptr));
+ this->ProducerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ 12, nullptr));
// Close the consumer.
this->ConsumerClose();
@@ -848,7 +967,7 @@ TYPED_TEST(DataPipeImplTest, PeerClosedProducerWaiting) {
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context));
EXPECT_EQ(12u, context);
hss = HandleSignalsState();
- this->dpp()->ProducerRemoveAwakable(&waiter, &hss);
+ this->ProducerRemoveAwakable(&waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
@@ -872,8 +991,8 @@ TYPED_TEST(DataPipeImplTest, PeerClosedConsumerWaiting) {
// Add a waiter.
waiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerAddAwakable(
- &waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 12, nullptr));
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ 12, nullptr));
// Close the producer.
this->ProducerClose();
@@ -883,7 +1002,7 @@ TYPED_TEST(DataPipeImplTest, PeerClosedConsumerWaiting) {
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context));
EXPECT_EQ(12u, context);
hss = HandleSignalsState();
- this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
+ this->ConsumerRemoveAwakable(&waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
@@ -909,8 +1028,8 @@ TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
waiter.Init();
hss = HandleSignalsState();
EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- this->dpc()->ConsumerAddAwakable(
- &waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 12, &hss));
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 12,
+ &hss));
EXPECT_EQ(0u, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -918,38 +1037,38 @@ TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
// Add waiter: not yet readable.
waiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerAddAwakable(
- &waiter, MOJO_HANDLE_SIGNAL_READABLE, 34, nullptr));
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 34,
+ nullptr));
// Write two elements.
int32_t elements[2] = {123, 456};
uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerWriteData(UserPointer<const void>(elements),
- MakeUserPointer(&num_bytes), true));
+ this->ProducerWriteData(UserPointer<const void>(elements),
+ MakeUserPointer(&num_bytes), true));
// Wait for readability (needed for remote cases).
context = 0;
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context));
EXPECT_EQ(34u, context);
hss = HandleSignalsState();
- this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
+ this->ConsumerRemoveAwakable(&waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// Discard one element.
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerDiscardData(
- MakeUserPointer(&num_bytes), true));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
// Should still be readable.
waiter.Init();
hss = HandleSignalsState();
EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->dpc()->ConsumerAddAwakable(
- &waiter, MOJO_HANDLE_SIGNAL_READABLE, 78, &hss));
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 78,
+ &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -958,9 +1077,9 @@ TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
elements[0] = -1;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
- UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), true, true));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerReadData(UserPointer<void>(elements),
+ MakeUserPointer(&num_bytes), true, true));
EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
EXPECT_EQ(456, elements[0]);
EXPECT_EQ(-1, elements[1]);
@@ -969,8 +1088,8 @@ TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
waiter.Init();
hss = HandleSignalsState();
EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->dpc()->ConsumerAddAwakable(
- &waiter, MOJO_HANDLE_SIGNAL_READABLE, 78, &hss));
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 78,
+ &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -979,9 +1098,9 @@ TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
elements[0] = -1;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
- UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), true, false));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerReadData(UserPointer<void>(elements),
+ MakeUserPointer(&num_bytes), true, false));
EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
EXPECT_EQ(456, elements[0]);
EXPECT_EQ(-1, elements[1]);
@@ -989,23 +1108,23 @@ TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
// Adding a waiter should now succeed.
waiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerAddAwakable(
- &waiter, MOJO_HANDLE_SIGNAL_READABLE, 90, nullptr));
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 90,
+ nullptr));
// Write one element.
elements[0] = 789;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerWriteData(UserPointer<const void>(elements),
- MakeUserPointer(&num_bytes), true));
+ this->ProducerWriteData(UserPointer<const void>(elements),
+ MakeUserPointer(&num_bytes), true));
// Waiting should now succeed.
context = 0;
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context));
EXPECT_EQ(90u, context);
hss = HandleSignalsState();
- this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
+ this->ConsumerRemoveAwakable(&waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -1013,8 +1132,8 @@ TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
// We'll want to wait for the peer closed signal to propagate.
waiter.Init();
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerAddAwakable(
- &waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 12, nullptr));
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ 12, nullptr));
// Close the producer.
this->ProducerClose();
@@ -1024,8 +1143,8 @@ TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
waiter2.Init();
hss = HandleSignalsState();
EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->dpc()->ConsumerAddAwakable(
- &waiter2, MOJO_HANDLE_SIGNAL_READABLE, 34, &hss));
+ this->ConsumerAddAwakable(&waiter2, MOJO_HANDLE_SIGNAL_READABLE, 34,
+ &hss));
// We don't know if the peer closed signal has propagated yet (for the remote
// cases).
EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
@@ -1037,7 +1156,7 @@ TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context));
EXPECT_EQ(12u, context);
hss = HandleSignalsState();
- this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
+ this->ConsumerRemoveAwakable(&waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
@@ -1047,9 +1166,9 @@ TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
elements[0] = -1;
elements[1] = -1;
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
- UserPointer<void>(elements),
- MakeUserPointer(&num_bytes), true, false));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerReadData(UserPointer<void>(elements),
+ MakeUserPointer(&num_bytes), true, false));
EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
EXPECT_EQ(789, elements[0]);
EXPECT_EQ(-1, elements[1]);
@@ -1058,8 +1177,8 @@ TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
waiter.Init();
hss = HandleSignalsState();
EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- this->dpc()->ConsumerAddAwakable(
- &waiter, MOJO_HANDLE_SIGNAL_READABLE, 56, &hss));
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 56,
+ &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
@@ -1085,8 +1204,8 @@ TYPED_TEST(DataPipeImplTest, ConsumerWaitingTwoPhase) {
// Add waiter: not yet readable.
waiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerAddAwakable(
- &waiter, MOJO_HANDLE_SIGNAL_READABLE, 12, nullptr));
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 12,
+ nullptr));
// Write two elements.
int32_t* elements = nullptr;
@@ -1094,23 +1213,22 @@ TYPED_TEST(DataPipeImplTest, ConsumerWaitingTwoPhase) {
// Request room for three (but we'll only write two).
uint32_t num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerBeginWriteData(
- MakeUserPointer(&buffer), MakeUserPointer(&num_bytes), true));
+ this->ProducerBeginWriteData(MakeUserPointer(&buffer),
+ MakeUserPointer(&num_bytes), true));
EXPECT_TRUE(buffer);
EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0])));
elements = static_cast<int32_t*>(buffer);
elements[0] = 123;
elements[1] = 456;
- EXPECT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerEndWriteData(
- static_cast<uint32_t>(2u * sizeof(elements[0]))));
+ EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(static_cast<uint32_t>(
+ 2u * sizeof(elements[0]))));
// Wait for readability (needed for remote cases).
context = 0;
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), &context));
EXPECT_EQ(12u, context);
hss = HandleSignalsState();
- this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
+ this->ConsumerRemoveAwakable(&waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -1119,23 +1237,22 @@ TYPED_TEST(DataPipeImplTest, ConsumerWaitingTwoPhase) {
// Request two in all-or-none mode, but only read one.
const void* read_buffer = nullptr;
num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerBeginReadData(
- MakeUserPointer(&read_buffer),
- MakeUserPointer(&num_bytes), true));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerBeginReadData(MakeUserPointer(&read_buffer),
+ MakeUserPointer(&num_bytes), true));
EXPECT_TRUE(read_buffer);
EXPECT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
const int32_t* read_elements = static_cast<const int32_t*>(read_buffer);
EXPECT_EQ(123, read_elements[0]);
- EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerEndReadData(
- static_cast<uint32_t>(1u * sizeof(elements[0]))));
+ EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(static_cast<uint32_t>(
+ 1u * sizeof(elements[0]))));
// Should still be readable.
waiter.Init();
hss = HandleSignalsState();
EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->dpc()->ConsumerAddAwakable(
- &waiter, MOJO_HANDLE_SIGNAL_READABLE, 34, &hss));
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 34,
+ &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -1144,22 +1261,21 @@ TYPED_TEST(DataPipeImplTest, ConsumerWaitingTwoPhase) {
// Request three, but not in all-or-none mode.
read_buffer = nullptr;
num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
- EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerBeginReadData(
- MakeUserPointer(&read_buffer),
- MakeUserPointer(&num_bytes), false));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerBeginReadData(MakeUserPointer(&read_buffer),
+ MakeUserPointer(&num_bytes), false));
EXPECT_TRUE(read_buffer);
EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
read_elements = static_cast<const int32_t*>(read_buffer);
EXPECT_EQ(456, read_elements[0]);
- EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerEndReadData(
- static_cast<uint32_t>(1u * sizeof(elements[0]))));
+ EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(static_cast<uint32_t>(
+ 1u * sizeof(elements[0]))));
// Adding a waiter should now succeed.
waiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerAddAwakable(
- &waiter, MOJO_HANDLE_SIGNAL_READABLE, 56, nullptr));
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 56,
+ nullptr));
// Close the producer.
this->ProducerClose();
@@ -1170,7 +1286,7 @@ TYPED_TEST(DataPipeImplTest, ConsumerWaitingTwoPhase) {
waiter.Wait(test::TinyDeadline(), &context));
EXPECT_EQ(56u, context);
hss = HandleSignalsState();
- this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
+ this->ConsumerRemoveAwakable(&waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
@@ -1196,28 +1312,28 @@ TYPED_TEST(DataPipeImplTest, BasicTwoPhaseWaiting) {
pwaiter.Init();
hss = HandleSignalsState();
EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->dpp()->ProducerAddAwakable(
- &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
+ this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 0,
+ &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
void* write_ptr = nullptr;
- EXPECT_EQ(MOJO_RESULT_OK, this->dpp()->ProducerBeginWriteData(
- MakeUserPointer(&write_ptr),
- MakeUserPointer(&num_bytes), false));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
+ MakeUserPointer(&num_bytes), false));
EXPECT_TRUE(write_ptr);
EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
// At this point, it shouldn't be writable.
pwaiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerAddAwakable(
- &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 1, nullptr));
+ this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 1,
+ nullptr));
EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, pwaiter.Wait(0, nullptr));
hss = HandleSignalsState();
- this->dpp()->ProducerRemoveAwakable(&pwaiter, &hss);
+ this->ProducerRemoveAwakable(&pwaiter, &hss);
EXPECT_EQ(0u, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -1225,19 +1341,19 @@ TYPED_TEST(DataPipeImplTest, BasicTwoPhaseWaiting) {
// It shouldn't be readable yet either (we'll wait later).
cwaiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerAddAwakable(
- &cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 2, nullptr));
+ this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 2,
+ nullptr));
static_cast<int32_t*>(write_ptr)[0] = 123;
- EXPECT_EQ(MOJO_RESULT_OK, this->dpp()->ProducerEndWriteData(
+ EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(
static_cast<uint32_t>(1u * sizeof(int32_t))));
// It should immediately be writable again.
pwaiter.Init();
hss = HandleSignalsState();
EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->dpp()->ProducerAddAwakable(
- &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 3, &hss));
+ this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 3,
+ &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -1245,7 +1361,7 @@ TYPED_TEST(DataPipeImplTest, BasicTwoPhaseWaiting) {
// It should become readable.
EXPECT_EQ(MOJO_RESULT_OK, cwaiter.Wait(test::TinyDeadline(), nullptr));
hss = HandleSignalsState();
- this->dpc()->ConsumerRemoveAwakable(&cwaiter, &hss);
+ this->ConsumerRemoveAwakable(&cwaiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -1254,9 +1370,9 @@ TYPED_TEST(DataPipeImplTest, BasicTwoPhaseWaiting) {
// middle of it.
num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
write_ptr = nullptr;
- EXPECT_EQ(MOJO_RESULT_OK, this->dpp()->ProducerBeginWriteData(
- MakeUserPointer(&write_ptr),
- MakeUserPointer(&num_bytes), false));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
+ MakeUserPointer(&num_bytes), false));
EXPECT_TRUE(write_ptr);
EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
@@ -1264,21 +1380,21 @@ TYPED_TEST(DataPipeImplTest, BasicTwoPhaseWaiting) {
cwaiter.Init();
hss = HandleSignalsState();
EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->dpc()->ConsumerAddAwakable(
- &cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 5, &hss));
+ this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 5,
+ &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// End the two-phase write without writing anything.
- EXPECT_EQ(MOJO_RESULT_OK, this->dpp()->ProducerEndWriteData(0u));
+ EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(0u));
// Start a two-phase read.
num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
const void* read_ptr = nullptr;
- EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerBeginReadData(
- MakeUserPointer(&read_ptr),
- MakeUserPointer(&num_bytes), false));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerBeginReadData(MakeUserPointer(&read_ptr),
+ MakeUserPointer(&num_bytes), false));
EXPECT_TRUE(read_ptr);
EXPECT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes);
@@ -1286,8 +1402,8 @@ TYPED_TEST(DataPipeImplTest, BasicTwoPhaseWaiting) {
pwaiter.Init();
hss = HandleSignalsState();
EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->dpp()->ProducerAddAwakable(
- &pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 6, &hss));
+ this->ProducerAddAwakable(&pwaiter, MOJO_HANDLE_SIGNAL_WRITABLE, 6,
+ &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -1295,24 +1411,24 @@ TYPED_TEST(DataPipeImplTest, BasicTwoPhaseWaiting) {
// But not readable.
cwaiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerAddAwakable(
- &cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 7, nullptr));
+ this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 7,
+ nullptr));
EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, cwaiter.Wait(0, nullptr));
hss = HandleSignalsState();
- this->dpc()->ConsumerRemoveAwakable(&cwaiter, &hss);
+ this->ConsumerRemoveAwakable(&cwaiter, &hss);
EXPECT_EQ(0u, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
// End the two-phase read without reading anything.
- EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerEndReadData(0u));
+ EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(0u));
// It should be readable again.
cwaiter.Init();
hss = HandleSignalsState();
EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
- this->dpc()->ConsumerAddAwakable(
- &cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 8, &hss));
+ this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 8,
+ &hss));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -1344,27 +1460,27 @@ TYPED_TEST(DataPipeImplTest, AllOrNone) {
int32_t buffer[100];
Seq(0, arraysize(buffer), buffer);
EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
- this->dpp()->ProducerWriteData(UserPointer<const void>(buffer),
- MakeUserPointer(&num_bytes), true));
+ this->ProducerWriteData(UserPointer<const void>(buffer),
+ MakeUserPointer(&num_bytes), true));
// Should still be empty.
num_bytes = ~0u;
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
EXPECT_EQ(0u, num_bytes);
// Add waiter.
waiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerAddAwakable(
- &waiter, MOJO_HANDLE_SIGNAL_READABLE, 1, nullptr));
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1,
+ nullptr));
// Write some data.
num_bytes = 5u * sizeof(int32_t);
Seq(100, arraysize(buffer), buffer);
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerWriteData(UserPointer<const void>(buffer),
- MakeUserPointer(&num_bytes), true));
+ this->ProducerWriteData(UserPointer<const void>(buffer),
+ MakeUserPointer(&num_bytes), true));
EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
// Wait for data.
@@ -1374,7 +1490,7 @@ TYPED_TEST(DataPipeImplTest, AllOrNone) {
// of data to become available.
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
hss = HandleSignalsState();
- this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
+ this->ConsumerRemoveAwakable(&waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -1382,55 +1498,53 @@ TYPED_TEST(DataPipeImplTest, AllOrNone) {
// Half full.
num_bytes = 0u;
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
// Too much.
num_bytes = 6u * sizeof(int32_t);
Seq(200, arraysize(buffer), buffer);
EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
- this->dpp()->ProducerWriteData(UserPointer<const void>(buffer),
- MakeUserPointer(&num_bytes), true));
+ this->ProducerWriteData(UserPointer<const void>(buffer),
+ MakeUserPointer(&num_bytes), true));
// Try reading too much.
num_bytes = 11u * sizeof(int32_t);
memset(buffer, 0xab, sizeof(buffer));
- EXPECT_EQ(
- MOJO_RESULT_OUT_OF_RANGE,
- this->dpc()->ConsumerReadData(UserPointer<void>(buffer),
- MakeUserPointer(&num_bytes), true, false));
+ EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
+ this->ConsumerReadData(UserPointer<void>(buffer),
+ MakeUserPointer(&num_bytes), true, false));
int32_t expected_buffer[100];
memset(expected_buffer, 0xab, sizeof(expected_buffer));
EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
// Try discarding too much.
num_bytes = 11u * sizeof(int32_t);
- EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, this->dpc()->ConsumerDiscardData(
- MakeUserPointer(&num_bytes), true));
+ EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
+ this->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
// Just a little.
num_bytes = 2u * sizeof(int32_t);
Seq(300, arraysize(buffer), buffer);
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerWriteData(UserPointer<const void>(buffer),
- MakeUserPointer(&num_bytes), true));
+ this->ProducerWriteData(UserPointer<const void>(buffer),
+ MakeUserPointer(&num_bytes), true));
EXPECT_EQ(2u * sizeof(int32_t), num_bytes);
// Just right.
num_bytes = 3u * sizeof(int32_t);
Seq(400, arraysize(buffer), buffer);
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerWriteData(UserPointer<const void>(buffer),
- MakeUserPointer(&num_bytes), true));
+ this->ProducerWriteData(UserPointer<const void>(buffer),
+ MakeUserPointer(&num_bytes), true));
EXPECT_EQ(3u * sizeof(int32_t), num_bytes);
// TODO(vtl): Hack (see also the TODO above): We can't currently wait for a
// specified amount of data to be available, so poll.
- const size_t kMaxPoll = 100;
for (size_t i = 0; i < kMaxPoll; i++) {
num_bytes = 0u;
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
if (num_bytes >= 10u * sizeof(int32_t))
break;
@@ -1441,9 +1555,9 @@ TYPED_TEST(DataPipeImplTest, AllOrNone) {
// Read half.
num_bytes = 5u * sizeof(int32_t);
memset(buffer, 0xab, sizeof(buffer));
- EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
- UserPointer<void>(buffer),
- MakeUserPointer(&num_bytes), true, false));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerReadData(UserPointer<void>(buffer),
+ MakeUserPointer(&num_bytes), true, false));
EXPECT_EQ(5u * sizeof(int32_t), num_bytes);
memset(expected_buffer, 0xab, sizeof(expected_buffer));
Seq(100, 5, expected_buffer);
@@ -1452,35 +1566,34 @@ TYPED_TEST(DataPipeImplTest, AllOrNone) {
// Try reading too much again.
num_bytes = 6u * sizeof(int32_t);
memset(buffer, 0xab, sizeof(buffer));
- EXPECT_EQ(
- MOJO_RESULT_OUT_OF_RANGE,
- this->dpc()->ConsumerReadData(UserPointer<void>(buffer),
- MakeUserPointer(&num_bytes), true, false));
+ EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
+ this->ConsumerReadData(UserPointer<void>(buffer),
+ MakeUserPointer(&num_bytes), true, false));
memset(expected_buffer, 0xab, sizeof(expected_buffer));
EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
// Try discarding too much again.
num_bytes = 6u * sizeof(int32_t);
- EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, this->dpc()->ConsumerDiscardData(
- MakeUserPointer(&num_bytes), true));
+ EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
+ this->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
// Discard a little.
num_bytes = 2u * sizeof(int32_t);
- EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerDiscardData(
- MakeUserPointer(&num_bytes), true));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
EXPECT_EQ(2u * sizeof(int32_t), num_bytes);
// Three left.
num_bytes = 0u;
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
EXPECT_EQ(3u * sizeof(int32_t), num_bytes);
// We'll need to wait for the peer closed to propagate.
waiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerAddAwakable(
- &waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2, nullptr));
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ 2, nullptr));
// Close the producer, then test producer-closed cases.
this->ProducerClose();
@@ -1488,7 +1601,7 @@ TYPED_TEST(DataPipeImplTest, AllOrNone) {
// Wait.
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
hss = HandleSignalsState();
- this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
+ this->ConsumerRemoveAwakable(&waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
@@ -1497,25 +1610,23 @@ TYPED_TEST(DataPipeImplTest, AllOrNone) {
// Try reading too much; "failed precondition" since the producer is closed.
num_bytes = 4u * sizeof(int32_t);
memset(buffer, 0xab, sizeof(buffer));
- EXPECT_EQ(
- MOJO_RESULT_FAILED_PRECONDITION,
- this->dpc()->ConsumerReadData(UserPointer<void>(buffer),
- MakeUserPointer(&num_bytes), true, false));
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ this->ConsumerReadData(UserPointer<void>(buffer),
+ MakeUserPointer(&num_bytes), true, false));
memset(expected_buffer, 0xab, sizeof(expected_buffer));
EXPECT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
// Try discarding too much; "failed precondition" again.
num_bytes = 4u * sizeof(int32_t);
- EXPECT_EQ(
- MOJO_RESULT_FAILED_PRECONDITION,
- this->dpc()->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ this->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
// Read a little.
num_bytes = 2u * sizeof(int32_t);
memset(buffer, 0xab, sizeof(buffer));
- EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerReadData(
- UserPointer<void>(buffer),
- MakeUserPointer(&num_bytes), true, false));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerReadData(UserPointer<void>(buffer),
+ MakeUserPointer(&num_bytes), true, false));
EXPECT_EQ(2u * sizeof(int32_t), num_bytes);
memset(expected_buffer, 0xab, sizeof(expected_buffer));
Seq(400, 2, expected_buffer);
@@ -1523,14 +1634,14 @@ TYPED_TEST(DataPipeImplTest, AllOrNone) {
// Discard the remaining element.
num_bytes = 1u * sizeof(int32_t);
- EXPECT_EQ(MOJO_RESULT_OK, this->dpc()->ConsumerDiscardData(
- MakeUserPointer(&num_bytes), true));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerDiscardData(MakeUserPointer(&num_bytes), true));
EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
// Empty again.
num_bytes = ~0u;
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
EXPECT_EQ(0u, num_bytes);
this->ConsumerClose();
@@ -1552,51 +1663,49 @@ TYPED_TEST(DataPipeImplTest, TwoPhaseAllOrNone) {
// Try writing way too much (two-phase).
uint32_t num_bytes = 20u * sizeof(int32_t);
void* write_ptr = nullptr;
- EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, this->dpp()->ProducerBeginWriteData(
- MakeUserPointer(&write_ptr),
- MakeUserPointer(&num_bytes), true));
+ EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
+ this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
+ MakeUserPointer(&num_bytes), true));
// Try writing an amount which isn't a multiple of the element size
// (two-phase).
static_assert(sizeof(int32_t) > 1u, "Wow! int32_t's have size 1");
num_bytes = 1u;
write_ptr = nullptr;
- EXPECT_EQ(
- MOJO_RESULT_INVALID_ARGUMENT,
- this->dpp()->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
- MakeUserPointer(&num_bytes), true));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
+ MakeUserPointer(&num_bytes), true));
// Try reading way too much (two-phase).
num_bytes = 20u * sizeof(int32_t);
const void* read_ptr = nullptr;
EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
- this->dpc()->ConsumerBeginReadData(
- MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true));
+ this->ConsumerBeginReadData(MakeUserPointer(&read_ptr),
+ MakeUserPointer(&num_bytes), true));
// Add waiter.
waiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerAddAwakable(
- &waiter, MOJO_HANDLE_SIGNAL_READABLE, 1, nullptr));
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1,
+ nullptr));
// Write half (two-phase).
num_bytes = 5u * sizeof(int32_t);
write_ptr = nullptr;
- EXPECT_EQ(MOJO_RESULT_OK, this->dpp()->ProducerBeginWriteData(
- MakeUserPointer(&write_ptr),
- MakeUserPointer(&num_bytes), true));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
+ MakeUserPointer(&num_bytes), true));
// May provide more space than requested.
EXPECT_GE(num_bytes, 5u * sizeof(int32_t));
EXPECT_TRUE(write_ptr);
Seq(0, 5, static_cast<int32_t*>(write_ptr));
- EXPECT_EQ(MOJO_RESULT_OK,
- this->dpp()->ProducerEndWriteData(5u * sizeof(int32_t)));
+ EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(5u * sizeof(int32_t)));
// Wait for data.
// TODO(vtl): (See corresponding TODO in AllOrNone.)
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
hss = HandleSignalsState();
- this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
+ this->ConsumerRemoveAwakable(&waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
@@ -1606,43 +1715,41 @@ TYPED_TEST(DataPipeImplTest, TwoPhaseAllOrNone) {
num_bytes = 1u;
read_ptr = nullptr;
EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
- this->dpc()->ConsumerBeginReadData(
- MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true));
+ this->ConsumerBeginReadData(MakeUserPointer(&read_ptr),
+ MakeUserPointer(&num_bytes), true));
// Read one (two-phase).
num_bytes = 1u * sizeof(int32_t);
read_ptr = nullptr;
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerBeginReadData(
- MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true));
+ this->ConsumerBeginReadData(MakeUserPointer(&read_ptr),
+ MakeUserPointer(&num_bytes), true));
EXPECT_GE(num_bytes, 1u * sizeof(int32_t));
EXPECT_EQ(0, static_cast<const int32_t*>(read_ptr)[0]);
- EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerEndReadData(1u * sizeof(int32_t)));
+ EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(1u * sizeof(int32_t)));
// We should have four left, leaving room for six.
num_bytes = 0u;
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
EXPECT_EQ(4u * sizeof(int32_t), num_bytes);
// Assuming a tight circular buffer of the specified capacity, we can't do a
// two-phase write of six now.
num_bytes = 6u * sizeof(int32_t);
write_ptr = nullptr;
- EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, this->dpp()->ProducerBeginWriteData(
- MakeUserPointer(&write_ptr),
- MakeUserPointer(&num_bytes), true));
+ EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE,
+ this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
+ MakeUserPointer(&num_bytes), true));
// TODO(vtl): Hack (see also the TODO above): We can't currently wait for a
// specified amount of space to be available, so poll.
- const size_t kMaxPoll = 100;
for (size_t i = 0; i < kMaxPoll; i++) {
// Write six elements (simple), filling the buffer.
num_bytes = 6u * sizeof(int32_t);
int32_t buffer[100];
Seq(100, 6, buffer);
- MojoResult result = this->dpp()->ProducerWriteData(
+ MojoResult result = this->ProducerWriteData(
UserPointer<const void>(buffer), MakeUserPointer(&num_bytes), true);
if (result == MOJO_RESULT_OK)
break;
@@ -1657,7 +1764,7 @@ TYPED_TEST(DataPipeImplTest, TwoPhaseAllOrNone) {
// We have ten.
num_bytes = 0u;
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
if (num_bytes >= 10u * sizeof(int32_t))
break;
@@ -1671,8 +1778,8 @@ TYPED_TEST(DataPipeImplTest, TwoPhaseAllOrNone) {
// Add waiter.
waiter.Init();
ASSERT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerAddAwakable(
- &waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 2, nullptr));
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ 2, nullptr));
// Close the producer.
this->ProducerClose();
@@ -1681,8 +1788,8 @@ TYPED_TEST(DataPipeImplTest, TwoPhaseAllOrNone) {
num_bytes = 9u * sizeof(int32_t);
read_ptr = nullptr;
EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerBeginReadData(
- MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true));
+ this->ConsumerBeginReadData(MakeUserPointer(&read_ptr),
+ MakeUserPointer(&num_bytes), true));
EXPECT_GE(num_bytes, 9u * sizeof(int32_t));
EXPECT_EQ(1, static_cast<const int32_t*>(read_ptr)[0]);
EXPECT_EQ(2, static_cast<const int32_t*>(read_ptr)[1]);
@@ -1693,13 +1800,12 @@ TYPED_TEST(DataPipeImplTest, TwoPhaseAllOrNone) {
EXPECT_EQ(102, static_cast<const int32_t*>(read_ptr)[6]);
EXPECT_EQ(103, static_cast<const int32_t*>(read_ptr)[7]);
EXPECT_EQ(104, static_cast<const int32_t*>(read_ptr)[8]);
- EXPECT_EQ(MOJO_RESULT_OK,
- this->dpc()->ConsumerEndReadData(9u * sizeof(int32_t)));
+ EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(9u * sizeof(int32_t)));
// Wait for peer closed.
EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
hss = HandleSignalsState();
- this->dpc()->ConsumerRemoveAwakable(&waiter, &hss);
+ this->ConsumerRemoveAwakable(&waiter, &hss);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
@@ -1709,9 +1815,582 @@ TYPED_TEST(DataPipeImplTest, TwoPhaseAllOrNone) {
num_bytes = 2u * sizeof(int32_t);
read_ptr = nullptr;
EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
- this->dpc()->ConsumerBeginReadData(
- MakeUserPointer(&read_ptr), MakeUserPointer(&num_bytes), true));
+ this->ConsumerBeginReadData(MakeUserPointer(&read_ptr),
+ MakeUserPointer(&num_bytes), true));
+
+ this->ConsumerClose();
+}
+
+// Tests that |ProducerWriteData()| and |ConsumerReadData()| writes and reads,
+// respectively, as much as possible, even if it may have to "wrap around" the
+// internal circular buffer. (Note that the two-phase write and read need not do
+// this.)
+TYPED_TEST(DataPipeImplTest, WrapAround) {
+ unsigned char test_data[1000];
+ for (size_t i = 0; i < arraysize(test_data); i++)
+ test_data[i] = static_cast<unsigned char>(i);
+
+ const MojoCreateDataPipeOptions options = {
+ kSizeOfOptions, // |struct_size|.
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
+ 1u, // |element_num_bytes|.
+ 100u // |capacity_num_bytes|.
+ };
+ MojoCreateDataPipeOptions validated_options = {};
+ // This test won't be valid if |ValidateCreateOptions()| decides to give the
+ // pipe more space.
+ EXPECT_EQ(MOJO_RESULT_OK, DataPipe::ValidateCreateOptions(
+ MakeUserPointer(&options), &validated_options));
+ ASSERT_EQ(100u, validated_options.capacity_num_bytes);
+ this->Create(options);
+ this->DoTransfer();
+
+ Waiter waiter;
+ HandleSignalsState hss;
+
+ // Add waiter.
+ waiter.Init();
+ ASSERT_EQ(MOJO_RESULT_OK,
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1,
+ nullptr));
+ // Write 20 bytes.
+ uint32_t num_bytes = 20u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ProducerWriteData(UserPointer<const void>(&test_data[0]),
+ MakeUserPointer(&num_bytes), false));
+ EXPECT_EQ(20u, num_bytes);
+
+ // Wait for data.
+ // TODO(vtl): (See corresponding TODO in AllOrNone.)
+ EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
+ hss = HandleSignalsState();
+ this->ConsumerRemoveAwakable(&waiter, &hss);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ hss.satisfiable_signals);
+
+ // Read 10 bytes.
+ unsigned char read_buffer[1000] = {0};
+ num_bytes = 10u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerReadData(UserPointer<void>(read_buffer),
+ MakeUserPointer(&num_bytes), false, false));
+ EXPECT_EQ(10u, num_bytes);
+ EXPECT_EQ(0, memcmp(read_buffer, &test_data[0], 10u));
+
+ if (this->IsStrictCircularBuffer()) {
+ // Check that a two-phase write can now only write (at most) 80 bytes. (This
+ // checks an implementation detail; this behavior is not guaranteed.)
+ void* write_buffer_ptr = nullptr;
+ num_bytes = 0u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr),
+ MakeUserPointer(&num_bytes), false));
+ EXPECT_TRUE(write_buffer_ptr);
+ EXPECT_EQ(80u, num_bytes);
+ EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(0u));
+ }
+
+ // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
+ size_t total_num_bytes = 0;
+ for (size_t i = 0; i < kMaxPoll; i++) {
+ // Write as much data as we can (using |ProducerWriteData()|). We should
+ // write 90 bytes (eventually).
+ num_bytes = 200u;
+ MojoResult result = this->ProducerWriteData(
+ UserPointer<const void>(&test_data[20 + total_num_bytes]),
+ MakeUserPointer(&num_bytes), false);
+ if (result == MOJO_RESULT_OK) {
+ total_num_bytes += num_bytes;
+ if (total_num_bytes >= 90u)
+ break;
+ } else {
+ EXPECT_EQ(MOJO_RESULT_OUT_OF_RANGE, result);
+ }
+
+ base::PlatformThread::Sleep(test::EpsilonTimeout());
+ }
+ EXPECT_EQ(90u, total_num_bytes);
+
+ // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
+ for (size_t i = 0; i < kMaxPoll; i++) {
+ // We have 100.
+ num_bytes = 0u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ if (num_bytes >= 100u)
+ break;
+
+ base::PlatformThread::Sleep(test::EpsilonTimeout());
+ }
+ EXPECT_EQ(100u, num_bytes);
+
+ if (this->IsStrictCircularBuffer()) {
+ // Check that a two-phase read can now only read (at most) 90 bytes. (This
+ // checks an implementation detail; this behavior is not guaranteed.)
+ const void* read_buffer_ptr = nullptr;
+ num_bytes = 0u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr),
+ MakeUserPointer(&num_bytes), false));
+ EXPECT_TRUE(read_buffer_ptr);
+ EXPECT_EQ(90u, num_bytes);
+ EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(0u));
+ }
+
+ // Read as much as possible (using |ConsumerReadData()|). We should read 100
+ // bytes.
+ num_bytes =
+ static_cast<uint32_t>(arraysize(read_buffer) * sizeof(read_buffer[0]));
+ memset(read_buffer, 0, num_bytes);
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerReadData(UserPointer<void>(read_buffer),
+ MakeUserPointer(&num_bytes), false, false));
+ EXPECT_EQ(100u, num_bytes);
+ EXPECT_EQ(0, memcmp(read_buffer, &test_data[10], 100u));
+
+ this->ProducerClose();
+ this->ConsumerClose();
+}
+
+// Tests the behavior of writing (simple and two-phase), closing the producer,
+// then reading (simple and two-phase).
+TYPED_TEST(DataPipeImplTest, WriteCloseProducerRead) {
+ const char kTestData[] = "hello world";
+ const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
+
+ const MojoCreateDataPipeOptions options = {
+ kSizeOfOptions, // |struct_size|.
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
+ 1u, // |element_num_bytes|.
+ 1000u // |capacity_num_bytes|.
+ };
+ this->Create(options);
+ this->DoTransfer();
+
+ // Write some data, so we'll have something to read.
+ uint32_t num_bytes = kTestDataSize;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ProducerWriteData(UserPointer<const void>(kTestData),
+ MakeUserPointer(&num_bytes), false));
+ EXPECT_EQ(kTestDataSize, num_bytes);
+
+ // Write it again, so we'll have something left over.
+ num_bytes = kTestDataSize;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ProducerWriteData(UserPointer<const void>(kTestData),
+ MakeUserPointer(&num_bytes), false));
+ EXPECT_EQ(kTestDataSize, num_bytes);
+
+ // Start two-phase write.
+ void* write_buffer_ptr = nullptr;
+ num_bytes = 0u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr),
+ MakeUserPointer(&num_bytes), false));
+ EXPECT_TRUE(write_buffer_ptr);
+ EXPECT_GT(num_bytes, 0u);
+
+ // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
+ for (size_t i = 0; i < kMaxPoll; i++) {
+ num_bytes = 0u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ if (num_bytes >= 2u * kTestDataSize)
+ break;
+
+ base::PlatformThread::Sleep(test::EpsilonTimeout());
+ }
+ EXPECT_EQ(2u * kTestDataSize, num_bytes);
+
+ // Start two-phase read.
+ const void* read_buffer_ptr = nullptr;
+ num_bytes = 0u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr),
+ MakeUserPointer(&num_bytes), false));
+ EXPECT_TRUE(read_buffer_ptr);
+ EXPECT_EQ(2u * kTestDataSize, num_bytes);
+
+ // Close the producer.
+ this->ProducerClose();
+
+ // The consumer can finish its two-phase read.
+ EXPECT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
+ EXPECT_EQ(MOJO_RESULT_OK, this->ConsumerEndReadData(kTestDataSize));
+
+ // And start another.
+ read_buffer_ptr = nullptr;
+ num_bytes = 0u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr),
+ MakeUserPointer(&num_bytes), false));
+ EXPECT_TRUE(read_buffer_ptr);
+ EXPECT_EQ(kTestDataSize, num_bytes);
+
+ // Close the consumer, which cancels the two-phase read.
+ this->ConsumerClose();
+}
+
+// Tests the behavior of interrupting a two-phase read and write by closing the
+// consumer.
+TYPED_TEST(DataPipeImplTest, TwoPhaseWriteReadCloseConsumer) {
+ const char kTestData[] = "hello world";
+ const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
+
+ const MojoCreateDataPipeOptions options = {
+ kSizeOfOptions, // |struct_size|.
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
+ 1u, // |element_num_bytes|.
+ 1000u // |capacity_num_bytes|.
+ };
+ this->Create(options);
+ this->DoTransfer();
+
+ Waiter waiter;
+ HandleSignalsState hss;
+
+ // Add waiter.
+ waiter.Init();
+ ASSERT_EQ(MOJO_RESULT_OK,
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1,
+ nullptr));
+
+ // Write some data, so we'll have something to read.
+ uint32_t num_bytes = kTestDataSize;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ProducerWriteData(UserPointer<const void>(kTestData),
+ MakeUserPointer(&num_bytes), false));
+ EXPECT_EQ(kTestDataSize, num_bytes);
+
+ // Start two-phase write.
+ void* write_buffer_ptr = nullptr;
+ num_bytes = 0u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr),
+ MakeUserPointer(&num_bytes), false));
+ EXPECT_TRUE(write_buffer_ptr);
+ ASSERT_GT(num_bytes, kTestDataSize);
+
+ // Wait for data.
+ // TODO(vtl): (See corresponding TODO in AllOrNone.)
+ EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
+ hss = HandleSignalsState();
+ this->ConsumerRemoveAwakable(&waiter, &hss);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ hss.satisfiable_signals);
+
+ // Start two-phase read.
+ const void* read_buffer_ptr = nullptr;
+ num_bytes = 0u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr),
+ MakeUserPointer(&num_bytes), false));
+ EXPECT_TRUE(read_buffer_ptr);
+ EXPECT_EQ(kTestDataSize, num_bytes);
+
+ // Add waiter.
+ waiter.Init();
+ ASSERT_EQ(MOJO_RESULT_OK,
+ this->ProducerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ 1, nullptr));
+
+ // Close the consumer.
+ this->ConsumerClose();
+
+ // Wait for producer to know that the consumer is closed.
+ EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
+ hss = HandleSignalsState();
+ this->ProducerRemoveAwakable(&waiter, &hss);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
+
+ // Actually write some data. (Note: Premature freeing of the buffer would
+ // probably only be detected under ASAN or similar.)
+ memcpy(write_buffer_ptr, kTestData, kTestDataSize);
+ // Note: Even though the consumer has been closed, ending the two-phase
+ // write will report success.
+ EXPECT_EQ(MOJO_RESULT_OK, this->ProducerEndWriteData(kTestDataSize));
+
+ // But trying to write should result in failure.
+ num_bytes = kTestDataSize;
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ this->ProducerWriteData(UserPointer<const void>(kTestData),
+ MakeUserPointer(&num_bytes), false));
+
+ // As will trying to start another two-phase write.
+ write_buffer_ptr = nullptr;
+ num_bytes = 0u;
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ this->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr),
+ MakeUserPointer(&num_bytes), false));
+
+ this->ProducerClose();
+}
+
+// Tests the behavior of "interrupting" a two-phase write by closing both the
+// producer and the consumer.
+TYPED_TEST(DataPipeImplTest, TwoPhaseWriteCloseBoth) {
+ const uint32_t kTestDataSize = 15u;
+
+ const MojoCreateDataPipeOptions options = {
+ kSizeOfOptions, // |struct_size|.
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
+ 1u, // |element_num_bytes|.
+ 1000u // |capacity_num_bytes|.
+ };
+ this->Create(options);
+ this->DoTransfer();
+
+ // Start two-phase write.
+ void* write_buffer_ptr = nullptr;
+ uint32_t num_bytes = 0u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ProducerBeginWriteData(MakeUserPointer(&write_buffer_ptr),
+ MakeUserPointer(&num_bytes), false));
+ EXPECT_TRUE(write_buffer_ptr);
+ ASSERT_GT(num_bytes, kTestDataSize);
+
+ this->ConsumerClose();
+ this->ProducerClose();
+}
+
+// Tests the behavior of writing, closing the producer, and then reading (with
+// and without data remaining).
+TYPED_TEST(DataPipeImplTest, WriteCloseProducerReadNoData) {
+ const char kTestData[] = "hello world";
+ const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
+
+ const MojoCreateDataPipeOptions options = {
+ kSizeOfOptions, // |struct_size|.
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
+ 1u, // |element_num_bytes|.
+ 1000u // |capacity_num_bytes|.
+ };
+ this->Create(options);
+ this->DoTransfer();
+
+ Waiter waiter;
+ HandleSignalsState hss;
+
+ // Write some data, so we'll have something to read.
+ uint32_t num_bytes = kTestDataSize;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ProducerWriteData(UserPointer<const void>(kTestData),
+ MakeUserPointer(&num_bytes), false));
+ EXPECT_EQ(kTestDataSize, num_bytes);
+
+ // Add waiter.
+ waiter.Init();
+ ASSERT_EQ(MOJO_RESULT_OK,
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ 1, nullptr));
+
+ // Close the producer.
+ this->ProducerClose();
+
+ // Wait. (Note that once the consumer knows that the producer is closed, it
+ // must also know about all the data that was sent.)
+ EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
+ hss = HandleSignalsState();
+ this->ConsumerRemoveAwakable(&waiter, &hss);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ hss.satisfiable_signals);
+
+ // Peek that data.
+ char buffer[1000];
+ num_bytes = static_cast<uint32_t>(sizeof(buffer));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerReadData(UserPointer<void>(buffer),
+ MakeUserPointer(&num_bytes), false, true));
+ EXPECT_EQ(kTestDataSize, num_bytes);
+ EXPECT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
+
+ // Read that data.
+ memset(buffer, 0, 1000);
+ num_bytes = static_cast<uint32_t>(sizeof(buffer));
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerReadData(UserPointer<void>(buffer),
+ MakeUserPointer(&num_bytes), false, false));
+ EXPECT_EQ(kTestDataSize, num_bytes);
+ EXPECT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
+
+ // A second read should fail.
+ num_bytes = static_cast<uint32_t>(sizeof(buffer));
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ this->ConsumerReadData(UserPointer<void>(buffer),
+ MakeUserPointer(&num_bytes), false, false));
+
+ // A two-phase read should also fail.
+ const void* read_buffer_ptr = nullptr;
+ num_bytes = 0u;
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ this->ConsumerBeginReadData(MakeUserPointer(&read_buffer_ptr),
+ MakeUserPointer(&num_bytes), false));
+
+ // Ditto for discard.
+ num_bytes = 10u;
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ this->ConsumerDiscardData(MakeUserPointer(&num_bytes), false));
+
+ this->ConsumerClose();
+}
+
+// Test that two-phase reads/writes behave correctly when given invalid
+// arguments.
+TYPED_TEST(DataPipeImplTest, TwoPhaseMoreInvalidArguments) {
+ const MojoCreateDataPipeOptions options = {
+ kSizeOfOptions, // |struct_size|.
+ MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
+ static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
+ 10 * sizeof(int32_t) // |capacity_num_bytes|.
+ };
+ this->Create(options);
+ this->DoTransfer();
+
+ Waiter waiter;
+ HandleSignalsState hss;
+
+ // No data.
+ uint32_t num_bytes = 1000u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ EXPECT_EQ(0u, num_bytes);
+
+ // Try "ending" a two-phase write when one isn't active.
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ this->ProducerEndWriteData(1u * sizeof(int32_t)));
+
+ // Wait a bit, to make sure that if a signal were (incorrectly) sent, it'd
+ // have time to propagate.
+ base::PlatformThread::Sleep(test::EpsilonTimeout());
+
+ // Still no data.
+ num_bytes = 1000u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ EXPECT_EQ(0u, num_bytes);
+
+ // Try ending a two-phase write with an invalid amount (too much).
+ num_bytes = 0u;
+ void* write_ptr = nullptr;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
+ MakeUserPointer(&num_bytes), false));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ this->ProducerEndWriteData(num_bytes +
+ static_cast<uint32_t>(sizeof(int32_t))));
+
+ // But the two-phase write still ended.
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, this->ProducerEndWriteData(0u));
+
+ // Wait a bit (as above).
+ base::PlatformThread::Sleep(test::EpsilonTimeout());
+
+ // Still no data.
+ num_bytes = 1000u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ EXPECT_EQ(0u, num_bytes);
+
+ // Try ending a two-phase write with an invalid amount (not a multiple of the
+ // element size).
+ num_bytes = 0u;
+ write_ptr = nullptr;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ProducerBeginWriteData(MakeUserPointer(&write_ptr),
+ MakeUserPointer(&num_bytes), false));
+ EXPECT_GE(num_bytes, 1u);
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, this->ProducerEndWriteData(1u));
+
+ // But the two-phase write still ended.
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, this->ProducerEndWriteData(0u));
+
+ // Wait a bit (as above).
+ base::PlatformThread::Sleep(test::EpsilonTimeout());
+
+ // Still no data.
+ num_bytes = 1000u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ EXPECT_EQ(0u, num_bytes);
+
+ // Add waiter.
+ waiter.Init();
+ ASSERT_EQ(MOJO_RESULT_OK,
+ this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 1,
+ nullptr));
+
+ // Now write some data, so we'll be able to try reading.
+ int32_t element = 123;
+ num_bytes = 1u * sizeof(int32_t);
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ProducerWriteData(UserPointer<const void>(&element),
+ MakeUserPointer(&num_bytes), false));
+
+ // Wait for data.
+ // TODO(vtl): (See corresponding TODO in AllOrNone.)
+ EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyDeadline(), nullptr));
+ hss = HandleSignalsState();
+ this->ConsumerRemoveAwakable(&waiter, &hss);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
+ EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
+ hss.satisfiable_signals);
+
+ // One element available.
+ num_bytes = 0u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
+
+ // Try "ending" a two-phase read when one isn't active.
+ EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
+ this->ConsumerEndReadData(1u * sizeof(int32_t)));
+
+ // Still one element available.
+ num_bytes = 0u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
+
+ // Try ending a two-phase read with an invalid amount (too much).
+ num_bytes = 0u;
+ const void* read_ptr = nullptr;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerBeginReadData(MakeUserPointer(&read_ptr),
+ MakeUserPointer(&num_bytes), false));
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
+ this->ConsumerEndReadData(num_bytes +
+ static_cast<uint32_t>(sizeof(int32_t))));
+
+ // Still one element available.
+ num_bytes = 0u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
+
+ // Try ending a two-phase read with an invalid amount (not a multiple of the
+ // element size).
+ num_bytes = 0u;
+ read_ptr = nullptr;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerBeginReadData(MakeUserPointer(&read_ptr),
+ MakeUserPointer(&num_bytes), false));
+ EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
+ EXPECT_EQ(123, static_cast<const int32_t*>(read_ptr)[0]);
+ EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT, this->ConsumerEndReadData(1u));
+
+ // Still one element available.
+ num_bytes = 0u;
+ EXPECT_EQ(MOJO_RESULT_OK,
+ this->ConsumerQueryData(MakeUserPointer(&num_bytes)));
+ EXPECT_EQ(1u * sizeof(int32_t), num_bytes);
+
+ this->ProducerClose();
this->ConsumerClose();
}

Powered by Google App Engine
This is Rietveld 408576698