| Index: mojo/edk/system/data_pipe_impl_unittest.cc
|
| diff --git a/mojo/edk/system/data_pipe_impl_unittest.cc b/mojo/edk/system/data_pipe_impl_unittest.cc
|
| index d181255ef5c7a0bd5d1346730a96c6d605d5ca8b..a783ad94e63b2c6ffadd4b4986704f6dea42ad1b 100644
|
| --- a/mojo/edk/system/data_pipe_impl_unittest.cc
|
| +++ b/mojo/edk/system/data_pipe_impl_unittest.cc
|
| @@ -40,9 +40,6 @@ namespace mojo {
|
| namespace system {
|
| namespace {
|
|
|
| -const MojoHandleSignals kAllSignals = MOJO_HANDLE_SIGNAL_READABLE |
|
| - MOJO_HANDLE_SIGNAL_WRITABLE |
|
| - MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| const uint32_t kSizeOfOptions =
|
| static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions));
|
|
|
| @@ -148,6 +145,12 @@ class DataPipeImplTest : public testing::Test {
|
| }
|
|
|
| void ConsumerClose() { helper_->ConsumerClose(); }
|
| + MojoResult ConsumerSetOptions(uint32_t read_threshold_num_bytes) {
|
| + return dpc()->ConsumerSetOptions(read_threshold_num_bytes);
|
| + }
|
| + void ConsumerGetOptions(uint32_t* read_threshold_num_bytes) {
|
| + dpc()->ConsumerGetOptions(read_threshold_num_bytes);
|
| + }
|
| MojoResult ConsumerReadData(UserPointer<void> elements,
|
| UserPointer<uint32_t> num_bytes,
|
| bool all_or_none,
|
| @@ -288,7 +291,9 @@ class RemoteDataPipeImplTestHelper : public DataPipeImplTestHelper {
|
| message_pipe(dest_i)->RemoveAwakable(0, &waiter, &hss);
|
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
|
| hss.satisfied_signals);
|
| - EXPECT_EQ(kAllSignals, hss.satisfiable_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
|
| + MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| char read_buffer[100] = {};
|
| uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
|
| DispatcherVector read_dispatchers;
|
| @@ -661,8 +666,10 @@ TYPED_TEST(DataPipeImplTest, SimpleReadWrite) {
|
| EXPECT_EQ(123u, context);
|
| hss = HandleSignalsState();
|
| this->ConsumerRemoveAwakable(&waiter, &hss);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // Query.
|
| @@ -814,8 +821,10 @@ TYPED_TEST(DataPipeImplTest, BasicProducerWaiting) {
|
| EXPECT_EQ(1234u, context);
|
| hss = HandleSignalsState();
|
| this->ConsumerRemoveAwakable(&cwaiter, &hss);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // Peek one element.
|
| @@ -1035,7 +1044,8 @@ TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
|
| this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_WRITABLE, 12,
|
| &hss));
|
| EXPECT_EQ(0u, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // Add waiter: not yet readable.
|
| @@ -1057,8 +1067,10 @@ TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
|
| EXPECT_EQ(34u, context);
|
| hss = HandleSignalsState();
|
| this->ConsumerRemoveAwakable(&waiter, &hss);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // Discard one element.
|
| @@ -1073,8 +1085,10 @@ TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
|
| EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 78,
|
| &hss));
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // Peek one element.
|
| @@ -1094,8 +1108,10 @@ TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
|
| EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 78,
|
| &hss));
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // Read one element.
|
| @@ -1129,8 +1145,10 @@ TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
|
| EXPECT_EQ(90u, context);
|
| hss = HandleSignalsState();
|
| this->ConsumerRemoveAwakable(&waiter, &hss);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // We'll want to wait for the peer closed signal to propagate.
|
| @@ -1152,7 +1170,8 @@ TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
|
| // We don't know if the peer closed signal has propagated yet (for the remote
|
| // cases).
|
| EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // Wait for the peer closed signal.
|
| @@ -1161,9 +1180,11 @@ TYPED_TEST(DataPipeImplTest, BasicConsumerWaiting) {
|
| EXPECT_EQ(12u, context);
|
| hss = HandleSignalsState();
|
| this->ConsumerRemoveAwakable(&waiter, &hss);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // Read one element.
|
| @@ -1232,8 +1253,10 @@ TYPED_TEST(DataPipeImplTest, ConsumerWaitingTwoPhase) {
|
| EXPECT_EQ(12u, context);
|
| hss = HandleSignalsState();
|
| this->ConsumerRemoveAwakable(&waiter, &hss);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // Read one element.
|
| @@ -1255,8 +1278,10 @@ TYPED_TEST(DataPipeImplTest, ConsumerWaitingTwoPhase) {
|
| EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| this->ConsumerAddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 34,
|
| &hss));
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // Read one element.
|
| @@ -1364,8 +1389,10 @@ TYPED_TEST(DataPipeImplTest, BasicTwoPhaseWaiting) {
|
| EXPECT_EQ(MOJO_RESULT_OK, cwaiter.Wait(test::TinyTimeout(), nullptr));
|
| hss = HandleSignalsState();
|
| this->ConsumerRemoveAwakable(&cwaiter, &hss);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // Start another two-phase write and check that it's readable even in the
|
| @@ -1384,8 +1411,10 @@ TYPED_TEST(DataPipeImplTest, BasicTwoPhaseWaiting) {
|
| EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 5,
|
| &hss));
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // End the two-phase write without writing anything.
|
| @@ -1419,7 +1448,8 @@ TYPED_TEST(DataPipeImplTest, BasicTwoPhaseWaiting) {
|
| hss = HandleSignalsState();
|
| this->ConsumerRemoveAwakable(&cwaiter, &hss);
|
| EXPECT_EQ(0u, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // End the two-phase read without reading anything.
|
| @@ -1431,8 +1461,10 @@ TYPED_TEST(DataPipeImplTest, BasicTwoPhaseWaiting) {
|
| EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| this->ConsumerAddAwakable(&cwaiter, MOJO_HANDLE_SIGNAL_READABLE, 8,
|
| &hss));
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| this->ProducerClose();
|
| @@ -1493,8 +1525,10 @@ TYPED_TEST(DataPipeImplTest, AllOrNone) {
|
| EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
|
| hss = HandleSignalsState();
|
| this->ConsumerRemoveAwakable(&waiter, &hss);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // Half full.
|
| @@ -1604,9 +1638,11 @@ TYPED_TEST(DataPipeImplTest, AllOrNone) {
|
| EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
|
| hss = HandleSignalsState();
|
| this->ConsumerRemoveAwakable(&waiter, &hss);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // Try reading too much; "failed precondition" since the producer is closed.
|
| @@ -1694,8 +1730,10 @@ TYPED_TEST(DataPipeImplTest, WrapAround) {
|
| EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
|
| hss = HandleSignalsState();
|
| this->ConsumerRemoveAwakable(&waiter, &hss);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // Read 10 bytes.
|
| @@ -1906,8 +1944,10 @@ TYPED_TEST(DataPipeImplTest, TwoPhaseWriteReadCloseConsumer) {
|
| EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
|
| hss = HandleSignalsState();
|
| this->ConsumerRemoveAwakable(&waiter, &hss);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // Start two-phase read.
|
| @@ -2024,9 +2064,11 @@ TYPED_TEST(DataPipeImplTest, WriteCloseProducerReadNoData) {
|
| EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
|
| hss = HandleSignalsState();
|
| this->ConsumerRemoveAwakable(&waiter, &hss);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // Peek that data.
|
| @@ -2103,8 +2145,10 @@ TYPED_TEST(DataPipeImplTest, WriteReadCloseProducerWaitNoData) {
|
| EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
|
| hss = HandleSignalsState();
|
| this->ConsumerRemoveAwakable(&waiter, &hss);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // Read that data.
|
| @@ -2172,8 +2216,10 @@ TYPED_TEST(DataPipeImplTest, BeginReadCloseProducerWaitEndReadNoData) {
|
| EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
|
| hss = HandleSignalsState();
|
| this->ConsumerRemoveAwakable(&waiter, &hss);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // Start a two-phase read.
|
| @@ -2199,7 +2245,8 @@ TYPED_TEST(DataPipeImplTest, BeginReadCloseProducerWaitEndReadNoData) {
|
| hss = HandleSignalsState();
|
| this->ConsumerRemoveAwakable(&waiter, &hss);
|
| EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // Add waiter (for the consumer to become readable).
|
| @@ -2382,8 +2429,10 @@ TYPED_TEST(DataPipeImplTest, TwoPhaseMoreInvalidArguments) {
|
| EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
|
| hss = HandleSignalsState();
|
| this->ConsumerRemoveAwakable(&waiter, &hss);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| - EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| hss.satisfiable_signals);
|
|
|
| // One element available.
|
| @@ -2500,6 +2549,209 @@ TYPED_TEST(DataPipeImplTest, WriteCloseProducerTwoPhaseReadAllData) {
|
| this->ConsumerClose();
|
| }
|
|
|
| +TYPED_TEST(DataPipeImplTest, ReadThreshold) {
|
| + const MojoCreateDataPipeOptions options = {
|
| + kSizeOfOptions, // |struct_size|.
|
| + MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
|
| + 1u, // |element_num_bytes|.
|
| + 1000u // |capacity_num_bytes|.
|
| + };
|
| + this->Create(options);
|
| + this->DoTransfer();
|
| +
|
| + // The default read threshold should be 0 (which means "default", i.e., one
|
| + // element).
|
| + uint32_t read_threshold_num_bytes = 123u;
|
| + this->ConsumerGetOptions(&read_threshold_num_bytes);
|
| + EXPECT_EQ(0u, read_threshold_num_bytes);
|
| +
|
| + Waiter waiter;
|
| + HandleSignalsState hss;
|
| +
|
| + // Add a waiter.
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, nullptr));
|
| + // Trivial wait: it shouldn't have the read threshold signal.
|
| + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
|
| + hss = HandleSignalsState();
|
| + this->ConsumerRemoveAwakable(&waiter, &hss);
|
| + EXPECT_EQ(0u, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Add a waiter.
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, nullptr));
|
| +
|
| + // Write a byte.
|
| + const char kTestData[] = {'x'};
|
| + const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
|
| + uint32_t num_bytes = kTestDataSize;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->ProducerWriteData(UserPointer<const void>(kTestData),
|
| + MakeUserPointer(&num_bytes), false));
|
| + EXPECT_EQ(kTestDataSize, num_bytes);
|
| +
|
| + // Wait for the read threshold signal.
|
| + EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
|
| + hss = HandleSignalsState();
|
| + this->ConsumerRemoveAwakable(&waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Set the read threshold to 1.
|
| + this->ConsumerSetOptions(1);
|
| + read_threshold_num_bytes = 123u;
|
| + this->ConsumerGetOptions(&read_threshold_num_bytes);
|
| + EXPECT_EQ(1u, read_threshold_num_bytes);
|
| +
|
| + // Try to add a waiter: it should (still) already have the read threshold
|
| + // signal.
|
| + waiter.Init();
|
| + hss = HandleSignalsState();
|
| + ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| + this->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, &hss));
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Set the read threshold to 2.
|
| + this->ConsumerSetOptions(2);
|
| + read_threshold_num_bytes = 123u;
|
| + this->ConsumerGetOptions(&read_threshold_num_bytes);
|
| + EXPECT_EQ(2u, read_threshold_num_bytes);
|
| +
|
| + // Add a waiter.
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, nullptr));
|
| +
|
| + // Write another byte.
|
| + num_bytes = kTestDataSize;
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->ProducerWriteData(UserPointer<const void>(kTestData),
|
| + MakeUserPointer(&num_bytes), false));
|
| + EXPECT_EQ(kTestDataSize, num_bytes);
|
| +
|
| + // Wait for the read threshold signal.
|
| + EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::TinyTimeout(), nullptr));
|
| + hss = HandleSignalsState();
|
| + this->ConsumerRemoveAwakable(&waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Read one byte.
|
| + char read_byte = 'a';
|
| + num_bytes = sizeof(read_byte);
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->ConsumerReadData(UserPointer<void>(&read_byte),
|
| + MakeUserPointer(&num_bytes), true, false));
|
| + EXPECT_EQ(1u, num_bytes);
|
| + EXPECT_EQ(kTestData[0], read_byte);
|
| +
|
| + // Add a waiter.
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, nullptr));
|
| + // Trivial wait: it shouldn't have the read threshold signal.
|
| + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
|
| + hss = HandleSignalsState();
|
| + this->ConsumerRemoveAwakable(&waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Add a waiter.
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, nullptr));
|
| + // Trivial wait: it shouldn't have the read threshold signal.
|
| + EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, waiter.Wait(0, nullptr));
|
| + hss = HandleSignalsState();
|
| + this->ConsumerRemoveAwakable(&waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Add a waiter.
|
| + waiter.Init();
|
| + ASSERT_EQ(MOJO_RESULT_OK,
|
| + this->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, nullptr));
|
| +
|
| + // Close the producer.
|
| + this->ProducerClose();
|
| +
|
| + // Wait; the current read threshold becomes never satisfiable.
|
| + EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
|
| + waiter.Wait(test::TinyTimeout(), nullptr));
|
| + hss = HandleSignalsState();
|
| + this->ConsumerRemoveAwakable(&waiter, &hss);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Set the read threshold back to zero to 0.
|
| + this->ConsumerSetOptions(0);
|
| + read_threshold_num_bytes = 123u;
|
| + this->ConsumerGetOptions(&read_threshold_num_bytes);
|
| + // "Get options" should preserve 0 (and not set it to the element size).
|
| + EXPECT_EQ(0u, read_threshold_num_bytes);
|
| +
|
| + // Try to add a waiter: it should have the read threshold signal.
|
| + waiter.Init();
|
| + hss = HandleSignalsState();
|
| + ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS,
|
| + this->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, &hss));
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
| + MOJO_HANDLE_SIGNAL_READ_THRESHOLD,
|
| + hss.satisfiable_signals);
|
| +
|
| + // Read the other byte.
|
| + read_byte = 'a';
|
| + num_bytes = sizeof(read_byte);
|
| + EXPECT_EQ(MOJO_RESULT_OK,
|
| + this->ConsumerReadData(UserPointer<void>(&read_byte),
|
| + MakeUserPointer(&num_bytes), true, false));
|
| + EXPECT_EQ(1u, num_bytes);
|
| + EXPECT_EQ(kTestData[0], read_byte);
|
| +
|
| + // Try to add a waiter: the read threshold signal should be unsatisfiable.
|
| + waiter.Init();
|
| + hss = HandleSignalsState();
|
| + ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
|
| + this->ConsumerAddAwakable(
|
| + &waiter, MOJO_HANDLE_SIGNAL_READ_THRESHOLD, 0, &hss));
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
|
| + EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
|
| +
|
| + this->ConsumerClose();
|
| +}
|
| +
|
| } // namespace
|
| } // namespace system
|
| } // namespace mojo
|
|
|