Index: third_party/mojo/src/mojo/edk/system/remote_data_pipe_impl_unittest.cc |
diff --git a/third_party/mojo/src/mojo/edk/system/remote_data_pipe_impl_unittest.cc b/third_party/mojo/src/mojo/edk/system/remote_data_pipe_impl_unittest.cc |
deleted file mode 100644 |
index 1aef0fc54bd0270259c36bbc918e39f20a6f7bcc..0000000000000000000000000000000000000000 |
--- a/third_party/mojo/src/mojo/edk/system/remote_data_pipe_impl_unittest.cc |
+++ /dev/null |
@@ -1,509 +0,0 @@ |
-// Copyright 2015 The Chromium Authors. All rights reserved. |
-// Use of this source code is governed by a BSD-style license that can be |
-// found in the LICENSE file. |
- |
-// This file tests both |RemoteProducerDataPipeImpl| and |
-// |RemoteConsumerDataPipeImpl|. |
- |
-#include <stdint.h> |
- |
-#include "base/bind.h" |
-#include "base/location.h" |
-#include "base/logging.h" |
-#include "base/message_loop/message_loop.h" |
-#include "base/test/test_io_thread.h" |
-#include "mojo/public/cpp/system/macros.h" |
-#include "testing/gtest/include/gtest/gtest.h" |
-#include "third_party/mojo/src/mojo/edk/embedder/platform_channel_pair.h" |
-#include "third_party/mojo/src/mojo/edk/embedder/simple_platform_support.h" |
-#include "third_party/mojo/src/mojo/edk/system/channel.h" |
-#include "third_party/mojo/src/mojo/edk/system/channel_endpoint.h" |
-#include "third_party/mojo/src/mojo/edk/system/data_pipe.h" |
-#include "third_party/mojo/src/mojo/edk/system/data_pipe_consumer_dispatcher.h" |
-#include "third_party/mojo/src/mojo/edk/system/data_pipe_producer_dispatcher.h" |
-#include "third_party/mojo/src/mojo/edk/system/memory.h" |
-#include "third_party/mojo/src/mojo/edk/system/message_pipe.h" |
-#include "third_party/mojo/src/mojo/edk/system/raw_channel.h" |
-#include "third_party/mojo/src/mojo/edk/system/test_utils.h" |
-#include "third_party/mojo/src/mojo/edk/system/waiter.h" |
- |
-namespace mojo { |
-namespace system { |
-namespace { |
- |
-const MojoHandleSignals kAllSignals = MOJO_HANDLE_SIGNAL_READABLE | |
- MOJO_HANDLE_SIGNAL_WRITABLE | |
- MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
- |
-class RemoteDataPipeImplTest : public testing::Test { |
- public: |
- RemoteDataPipeImplTest() : io_thread_(base::TestIOThread::kAutoStart) {} |
- ~RemoteDataPipeImplTest() override {} |
- |
- void SetUp() override { |
- scoped_refptr<ChannelEndpoint> ep[2]; |
- message_pipes_[0] = MessagePipe::CreateLocalProxy(&ep[0]); |
- message_pipes_[1] = MessagePipe::CreateLocalProxy(&ep[1]); |
- |
- io_thread_.PostTaskAndWait( |
- FROM_HERE, base::Bind(&RemoteDataPipeImplTest::SetUpOnIOThread, |
- base::Unretained(this), ep[0], ep[1])); |
- } |
- |
- void TearDown() override { |
- EnsureMessagePipeClosed(0); |
- EnsureMessagePipeClosed(1); |
- io_thread_.PostTaskAndWait( |
- FROM_HERE, base::Bind(&RemoteDataPipeImplTest::TearDownOnIOThread, |
- base::Unretained(this))); |
- } |
- |
- protected: |
- static DataPipe* CreateLocal(size_t element_size, size_t num_elements) { |
- const MojoCreateDataPipeOptions options = { |
- static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)), |
- MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, |
- static_cast<uint32_t>(element_size), |
- static_cast<uint32_t>(num_elements * element_size)}; |
- MojoCreateDataPipeOptions validated_options = {}; |
- CHECK_EQ(DataPipe::ValidateCreateOptions(MakeUserPointer(&options), |
- &validated_options), |
- MOJO_RESULT_OK); |
- return DataPipe::CreateLocal(validated_options); |
- } |
- |
- scoped_refptr<MessagePipe> message_pipe(size_t i) { |
- return message_pipes_[i]; |
- } |
- |
- void EnsureMessagePipeClosed(size_t i) { |
- if (!message_pipes_[i]) |
- return; |
- message_pipes_[i]->Close(0); |
- message_pipes_[i] = nullptr; |
- } |
- |
- private: |
- void SetUpOnIOThread(scoped_refptr<ChannelEndpoint> ep0, |
- scoped_refptr<ChannelEndpoint> ep1) { |
- CHECK_EQ(base::MessageLoop::current(), io_thread_.message_loop()); |
- |
- embedder::PlatformChannelPair channel_pair; |
- channels_[0] = new Channel(&platform_support_); |
- channels_[0]->Init(RawChannel::Create(channel_pair.PassServerHandle())); |
- channels_[0]->SetBootstrapEndpoint(ep0); |
- channels_[1] = new Channel(&platform_support_); |
- channels_[1]->Init(RawChannel::Create(channel_pair.PassClientHandle())); |
- channels_[1]->SetBootstrapEndpoint(ep1); |
- } |
- |
- void TearDownOnIOThread() { |
- CHECK_EQ(base::MessageLoop::current(), io_thread_.message_loop()); |
- |
- if (channels_[0]) { |
- channels_[0]->Shutdown(); |
- channels_[0] = nullptr; |
- } |
- if (channels_[1]) { |
- channels_[1]->Shutdown(); |
- channels_[1] = nullptr; |
- } |
- } |
- |
- embedder::SimplePlatformSupport platform_support_; |
- base::TestIOThread io_thread_; |
- scoped_refptr<Channel> channels_[2]; |
- scoped_refptr<MessagePipe> message_pipes_[2]; |
- |
- MOJO_DISALLOW_COPY_AND_ASSIGN(RemoteDataPipeImplTest); |
-}; |
- |
-// These tests are heavier-weight than ideal. They test remote data pipes by |
-// passing data pipe (producer/consumer) dispatchers over remote message pipes. |
-// Make sure that the test fixture works properly (i.e., that the message pipe |
-// works properly, and that things are shut down correctly). |
-// TODO(vtl): Make lighter-weight tests. Ideally, we'd have tests for remote |
-// data pipes which don't involve message pipes (or even data pipe dispatchers). |
-TEST_F(RemoteDataPipeImplTest, Sanity) { |
- static const char kHello[] = "hello"; |
- char read_buffer[100] = {}; |
- uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); |
- Waiter waiter; |
- HandleSignalsState hss; |
- uintptr_t context = 0; |
- |
- // Write on MP 0 (port 0). Wait and receive on MP 1 (port 0). (Add the waiter |
- // first, to avoid any handling the case where it's already readable.) |
- waiter.Init(); |
- ASSERT_EQ(MOJO_RESULT_OK, |
- message_pipe(1)->AddAwakable( |
- 0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr)); |
- EXPECT_EQ(MOJO_RESULT_OK, |
- message_pipe(0)->WriteMessage(0, UserPointer<const void>(kHello), |
- sizeof(kHello), nullptr, |
- MOJO_WRITE_MESSAGE_FLAG_NONE)); |
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context)); |
- EXPECT_EQ(123u, context); |
- hss = HandleSignalsState(); |
- message_pipe(1)->RemoveAwakable(0, &waiter, &hss); |
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, |
- hss.satisfied_signals); |
- EXPECT_EQ(kAllSignals, hss.satisfiable_signals); |
- EXPECT_EQ(MOJO_RESULT_OK, message_pipe(1)->ReadMessage( |
- 0, UserPointer<void>(read_buffer), |
- MakeUserPointer(&read_buffer_size), nullptr, |
- nullptr, MOJO_READ_MESSAGE_FLAG_NONE)); |
- EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size)); |
- EXPECT_STREQ(kHello, read_buffer); |
-} |
- |
-TEST_F(RemoteDataPipeImplTest, SendConsumerWithClosedProducer) { |
- char read_buffer[100] = {}; |
- uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); |
- DispatcherVector read_dispatchers; |
- uint32_t read_num_dispatchers = 10; // Maximum to get. |
- Waiter waiter; |
- HandleSignalsState hss; |
- uintptr_t context = 0; |
- |
- scoped_refptr<DataPipe> dp(CreateLocal(sizeof(int32_t), 1000)); |
- // This is the consumer dispatcher we'll send. |
- scoped_refptr<DataPipeConsumerDispatcher> consumer = |
- DataPipeConsumerDispatcher::Create(); |
- consumer->Init(dp); |
- |
- // Write to the producer and close it, before sending the consumer. |
- int32_t elements[10] = {123}; |
- uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); |
- EXPECT_EQ(MOJO_RESULT_OK, |
- dp->ProducerWriteData(UserPointer<const void>(elements), |
- MakeUserPointer(&num_bytes), false)); |
- EXPECT_EQ(1u * sizeof(elements[0]), num_bytes); |
- dp->ProducerClose(); |
- |
- // Write the consumer to MP 0 (port 0). Wait and receive on MP 1 (port 0). |
- // (Add the waiter first, to avoid any handling the case where it's already |
- // readable.) |
- waiter.Init(); |
- ASSERT_EQ(MOJO_RESULT_OK, |
- message_pipe(1)->AddAwakable( |
- 0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr)); |
- { |
- DispatcherTransport transport( |
- test::DispatcherTryStartTransport(consumer.get())); |
- EXPECT_TRUE(transport.is_valid()); |
- |
- std::vector<DispatcherTransport> transports; |
- transports.push_back(transport); |
- EXPECT_EQ(MOJO_RESULT_OK, message_pipe(0)->WriteMessage( |
- 0, NullUserPointer(), 0, &transports, |
- MOJO_WRITE_MESSAGE_FLAG_NONE)); |
- transport.End(); |
- |
- // |consumer| should have been closed. This is |DCHECK()|ed when it is |
- // destroyed. |
- EXPECT_TRUE(consumer->HasOneRef()); |
- consumer = nullptr; |
- } |
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context)); |
- EXPECT_EQ(123u, context); |
- hss = HandleSignalsState(); |
- message_pipe(1)->RemoveAwakable(0, &waiter, &hss); |
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, |
- hss.satisfied_signals); |
- EXPECT_EQ(kAllSignals, hss.satisfiable_signals); |
- EXPECT_EQ(MOJO_RESULT_OK, |
- message_pipe(1)->ReadMessage( |
- 0, UserPointer<void>(read_buffer), |
- MakeUserPointer(&read_buffer_size), &read_dispatchers, |
- &read_num_dispatchers, MOJO_READ_MESSAGE_FLAG_NONE)); |
- EXPECT_EQ(0u, static_cast<size_t>(read_buffer_size)); |
- EXPECT_EQ(1u, read_dispatchers.size()); |
- EXPECT_EQ(1u, read_num_dispatchers); |
- ASSERT_TRUE(read_dispatchers[0]); |
- EXPECT_TRUE(read_dispatchers[0]->HasOneRef()); |
- |
- EXPECT_EQ(Dispatcher::Type::DATA_PIPE_CONSUMER, |
- read_dispatchers[0]->GetType()); |
- consumer = |
- static_cast<DataPipeConsumerDispatcher*>(read_dispatchers[0].get()); |
- read_dispatchers.clear(); |
- |
- waiter.Init(); |
- hss = HandleSignalsState(); |
- MojoResult result = |
- consumer->AddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, &hss); |
- if (result == MOJO_RESULT_OK) { |
- context = 0; |
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context)); |
- EXPECT_EQ(456u, context); |
- consumer->RemoveAwakable(&waiter, &hss); |
- } else { |
- ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS, result); |
- } |
- // We don't know if the fact that the producer has been closed is known yet. |
- EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE)); |
- EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE)); |
- |
- // Read one element. |
- elements[0] = -1; |
- elements[1] = -1; |
- num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); |
- EXPECT_EQ(MOJO_RESULT_OK, consumer->ReadData(UserPointer<void>(elements), |
- MakeUserPointer(&num_bytes), |
- MOJO_READ_DATA_FLAG_NONE)); |
- EXPECT_EQ(1u * sizeof(elements[0]), num_bytes); |
- EXPECT_EQ(123, elements[0]); |
- EXPECT_EQ(-1, elements[1]); |
- |
- waiter.Init(); |
- hss = HandleSignalsState(); |
- result = |
- consumer->AddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, &hss); |
- if (result == MOJO_RESULT_OK) { |
- context = 0; |
- EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, |
- waiter.Wait(test::ActionDeadline(), &context)); |
- EXPECT_EQ(789u, context); |
- consumer->RemoveAwakable(&waiter, &hss); |
- } else { |
- ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result); |
- } |
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals); |
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals); |
- |
- consumer->Close(); |
-} |
- |
-TEST_F(RemoteDataPipeImplTest, SendConsumerDuringTwoPhaseWrite) { |
- char read_buffer[100] = {}; |
- uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); |
- DispatcherVector read_dispatchers; |
- uint32_t read_num_dispatchers = 10; // Maximum to get. |
- Waiter waiter; |
- HandleSignalsState hss; |
- uintptr_t context = 0; |
- |
- scoped_refptr<DataPipe> dp(CreateLocal(sizeof(int32_t), 1000)); |
- // This is the consumer dispatcher we'll send. |
- scoped_refptr<DataPipeConsumerDispatcher> consumer = |
- DataPipeConsumerDispatcher::Create(); |
- consumer->Init(dp); |
- |
- void* write_ptr = nullptr; |
- uint32_t num_bytes = 0u; |
- EXPECT_EQ(MOJO_RESULT_OK, |
- dp->ProducerBeginWriteData(MakeUserPointer(&write_ptr), |
- MakeUserPointer(&num_bytes))); |
- ASSERT_GE(num_bytes, 1u * sizeof(int32_t)); |
- |
- // Write the consumer to MP 0 (port 0). Wait and receive on MP 1 (port 0). |
- // (Add the waiter first, to avoid any handling the case where it's already |
- // readable.) |
- waiter.Init(); |
- ASSERT_EQ(MOJO_RESULT_OK, |
- message_pipe(1)->AddAwakable( |
- 0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr)); |
- { |
- DispatcherTransport transport( |
- test::DispatcherTryStartTransport(consumer.get())); |
- EXPECT_TRUE(transport.is_valid()); |
- |
- std::vector<DispatcherTransport> transports; |
- transports.push_back(transport); |
- EXPECT_EQ(MOJO_RESULT_OK, message_pipe(0)->WriteMessage( |
- 0, NullUserPointer(), 0, &transports, |
- MOJO_WRITE_MESSAGE_FLAG_NONE)); |
- transport.End(); |
- |
- // |consumer| should have been closed. This is |DCHECK()|ed when it is |
- // destroyed. |
- EXPECT_TRUE(consumer->HasOneRef()); |
- consumer = nullptr; |
- } |
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context)); |
- EXPECT_EQ(123u, context); |
- hss = HandleSignalsState(); |
- message_pipe(1)->RemoveAwakable(0, &waiter, &hss); |
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, |
- hss.satisfied_signals); |
- EXPECT_EQ(kAllSignals, hss.satisfiable_signals); |
- EXPECT_EQ(MOJO_RESULT_OK, |
- message_pipe(1)->ReadMessage( |
- 0, UserPointer<void>(read_buffer), |
- MakeUserPointer(&read_buffer_size), &read_dispatchers, |
- &read_num_dispatchers, MOJO_READ_MESSAGE_FLAG_NONE)); |
- EXPECT_EQ(0u, static_cast<size_t>(read_buffer_size)); |
- EXPECT_EQ(1u, read_dispatchers.size()); |
- EXPECT_EQ(1u, read_num_dispatchers); |
- ASSERT_TRUE(read_dispatchers[0]); |
- EXPECT_TRUE(read_dispatchers[0]->HasOneRef()); |
- |
- EXPECT_EQ(Dispatcher::Type::DATA_PIPE_CONSUMER, |
- read_dispatchers[0]->GetType()); |
- consumer = |
- static_cast<DataPipeConsumerDispatcher*>(read_dispatchers[0].get()); |
- read_dispatchers.clear(); |
- |
- // Now actually write the data, complete the two-phase write, and close the |
- // producer. |
- *static_cast<int32_t*>(write_ptr) = 123456; |
- EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData( |
- static_cast<uint32_t>(1u * sizeof(int32_t)))); |
- dp->ProducerClose(); |
- |
- // Wait for the consumer to be readable. |
- waiter.Init(); |
- hss = HandleSignalsState(); |
- MojoResult result = |
- consumer->AddAwakable(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, &hss); |
- if (result == MOJO_RESULT_OK) { |
- context = 0; |
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context)); |
- EXPECT_EQ(456u, context); |
- consumer->RemoveAwakable(&waiter, &hss); |
- } else { |
- ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS, result); |
- } |
- // We don't know if the fact that the producer has been closed is known yet. |
- EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE)); |
- EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE)); |
- |
- // Read one element. |
- int32_t elements[10] = {-1, -1}; |
- num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0])); |
- EXPECT_EQ(MOJO_RESULT_OK, consumer->ReadData(UserPointer<void>(elements), |
- MakeUserPointer(&num_bytes), |
- MOJO_READ_DATA_FLAG_NONE)); |
- EXPECT_EQ(1u * sizeof(elements[0]), num_bytes); |
- EXPECT_EQ(123456, elements[0]); |
- EXPECT_EQ(-1, elements[1]); |
- |
- consumer->Close(); |
-} |
- |
-// Like |SendConsumerDuringTwoPhaseWrite|, but transfers the consumer during the |
-// second two-phase write (to try to test that the offset in circular buffer is |
-// properly preserved). |
-TEST_F(RemoteDataPipeImplTest, SendConsumerDuringSecondTwoPhaseWrite) { |
- char read_buffer[100] = {}; |
- uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer)); |
- DispatcherVector read_dispatchers; |
- uint32_t read_num_dispatchers = 10; // Maximum to get. |
- Waiter waiter; |
- HandleSignalsState hss; |
- uintptr_t context = 0; |
- |
- scoped_refptr<DataPipe> dp(CreateLocal(sizeof(int32_t), 1000)); |
- // This is the consumer dispatcher we'll send. |
- scoped_refptr<DataPipeConsumerDispatcher> consumer = |
- DataPipeConsumerDispatcher::Create(); |
- consumer->Init(dp); |
- |
- void* write_ptr = nullptr; |
- uint32_t num_bytes = 0u; |
- EXPECT_EQ(MOJO_RESULT_OK, |
- dp->ProducerBeginWriteData(MakeUserPointer(&write_ptr), |
- MakeUserPointer(&num_bytes))); |
- ASSERT_GE(num_bytes, 1u * sizeof(int32_t)); |
- *static_cast<int32_t*>(write_ptr) = 123456; |
- EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData( |
- static_cast<uint32_t>(1u * sizeof(int32_t)))); |
- |
- write_ptr = nullptr; |
- num_bytes = 0u; |
- EXPECT_EQ(MOJO_RESULT_OK, |
- dp->ProducerBeginWriteData(MakeUserPointer(&write_ptr), |
- MakeUserPointer(&num_bytes))); |
- ASSERT_GE(num_bytes, 1u * sizeof(int32_t)); |
- |
- // Write the consumer to MP 0 (port 0). Wait and receive on MP 1 (port 0). |
- // (Add the waiter first, to avoid any handling the case where it's already |
- // readable.) |
- waiter.Init(); |
- ASSERT_EQ(MOJO_RESULT_OK, |
- message_pipe(1)->AddAwakable( |
- 0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr)); |
- { |
- DispatcherTransport transport( |
- test::DispatcherTryStartTransport(consumer.get())); |
- EXPECT_TRUE(transport.is_valid()); |
- |
- std::vector<DispatcherTransport> transports; |
- transports.push_back(transport); |
- EXPECT_EQ(MOJO_RESULT_OK, message_pipe(0)->WriteMessage( |
- 0, NullUserPointer(), 0, &transports, |
- MOJO_WRITE_MESSAGE_FLAG_NONE)); |
- transport.End(); |
- |
- // |consumer| should have been closed. This is |DCHECK()|ed when it is |
- // destroyed. |
- EXPECT_TRUE(consumer->HasOneRef()); |
- consumer = nullptr; |
- } |
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context)); |
- EXPECT_EQ(123u, context); |
- hss = HandleSignalsState(); |
- message_pipe(1)->RemoveAwakable(0, &waiter, &hss); |
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE, |
- hss.satisfied_signals); |
- EXPECT_EQ(kAllSignals, hss.satisfiable_signals); |
- EXPECT_EQ(MOJO_RESULT_OK, |
- message_pipe(1)->ReadMessage( |
- 0, UserPointer<void>(read_buffer), |
- MakeUserPointer(&read_buffer_size), &read_dispatchers, |
- &read_num_dispatchers, MOJO_READ_MESSAGE_FLAG_NONE)); |
- EXPECT_EQ(0u, static_cast<size_t>(read_buffer_size)); |
- EXPECT_EQ(1u, read_dispatchers.size()); |
- EXPECT_EQ(1u, read_num_dispatchers); |
- ASSERT_TRUE(read_dispatchers[0]); |
- EXPECT_TRUE(read_dispatchers[0]->HasOneRef()); |
- |
- EXPECT_EQ(Dispatcher::Type::DATA_PIPE_CONSUMER, |
- read_dispatchers[0]->GetType()); |
- consumer = |
- static_cast<DataPipeConsumerDispatcher*>(read_dispatchers[0].get()); |
- read_dispatchers.clear(); |
- |
- // Now actually write the data, complete the two-phase write, and close the |
- // producer. |
- *static_cast<int32_t*>(write_ptr) = 789012; |
- EXPECT_EQ(MOJO_RESULT_OK, dp->ProducerEndWriteData( |
- static_cast<uint32_t>(1u * sizeof(int32_t)))); |
- dp->ProducerClose(); |
- |
- // Wait for the consumer to know that the producer is closed. |
- waiter.Init(); |
- hss = HandleSignalsState(); |
- MojoResult result = |
- consumer->AddAwakable(&waiter, MOJO_HANDLE_SIGNAL_PEER_CLOSED, 456, &hss); |
- if (result == MOJO_RESULT_OK) { |
- context = 0; |
- EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(test::ActionDeadline(), &context)); |
- EXPECT_EQ(456u, context); |
- consumer->RemoveAwakable(&waiter, &hss); |
- } else { |
- ASSERT_EQ(MOJO_RESULT_ALREADY_EXISTS, result); |
- } |
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
- hss.satisfied_signals); |
- EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED, |
- hss.satisfiable_signals); |
- |
- // Read some elements. |
- int32_t elements[10] = {}; |
- num_bytes = static_cast<uint32_t>(sizeof(elements)); |
- EXPECT_EQ(MOJO_RESULT_OK, consumer->ReadData(UserPointer<void>(elements), |
- MakeUserPointer(&num_bytes), |
- MOJO_READ_DATA_FLAG_NONE)); |
- EXPECT_EQ(2u * sizeof(elements[0]), num_bytes); |
- EXPECT_EQ(123456, elements[0]); |
- EXPECT_EQ(789012, elements[1]); |
- EXPECT_EQ(0, elements[2]); |
- |
- consumer->Close(); |
-} |
- |
-} // namespace |
-} // namespace system |
-} // namespace mojo |