| Index: mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| index b66ffe169b48d6b28621e0691bd0ffddc1d6f3d4..b35517878947e336d86c0f7504466f5a004bfdc4 100644
|
| --- a/mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| +++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| @@ -7,6 +7,7 @@
|
| #include <stddef.h>
|
| #include <stdint.h>
|
|
|
| +#include <algorithm>
|
| #include <utility>
|
|
|
| #include "base/bind.h"
|
| @@ -16,38 +17,42 @@
|
| #include "mojo/edk/embedder/platform_shared_buffer.h"
|
| #include "mojo/edk/embedder/platform_support.h"
|
| #include "mojo/edk/system/configuration.h"
|
| -#include "mojo/edk/system/data_pipe.h"
|
| +#include "mojo/edk/system/transport_data.h"
|
|
|
| namespace mojo {
|
| namespace edk {
|
|
|
| void DataPipeProducerDispatcher::Init(
|
| - ScopedPlatformHandle message_pipe,
|
| - char* serialized_write_buffer, size_t serialized_write_buffer_size) {
|
| - if (message_pipe.is_valid()) {
|
| - channel_ = RawChannel::Create(std::move(message_pipe));
|
| - channel_->SetSerializedData(
|
| - nullptr, 0u, serialized_write_buffer, serialized_write_buffer_size,
|
| - nullptr, nullptr);
|
| - internal::g_io_thread_task_runner->PostTask(
|
| - FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this));
|
| - } else {
|
| - error_ = true;
|
| + ScopedPlatformHandle channel_handle,
|
| + scoped_refptr<PlatformSharedBuffer> shared_buffer) {
|
| + CHECK(shared_buffer);
|
| + if (channel_handle.is_valid()) {
|
| + RawChannel* channel = RawChannel::Create(std::move(channel_handle));
|
| + data_pipe_->set_channel(channel);
|
| }
|
| + data_pipe_->set_shared_buffer(shared_buffer);
|
| + InitInternal();
|
| +}
|
| +
|
| +void DataPipeProducerDispatcher::InitInternal() {
|
| + peer_closed_ = data_pipe_->channel() == nullptr;
|
| + internal::g_io_thread_task_runner->PostTask(
|
| + FROM_HERE, base::Bind(&DataPipeProducerDispatcher::InitOnIO, this));
|
| + data_pipe_->Init();
|
| }
|
|
|
| void DataPipeProducerDispatcher::InitOnIO() {
|
| base::AutoLock locker(lock());
|
| - if (channel_)
|
| - channel_->Init(this);
|
| + calling_init_ = true;
|
| + RawChannel* channel = data_pipe_->channel();
|
| + if (channel)
|
| + channel->Init(this);
|
| + calling_init_ = false;
|
| }
|
|
|
| void DataPipeProducerDispatcher::CloseOnIO() {
|
| base::AutoLock locker(lock());
|
| - if (channel_) {
|
| - channel_->Shutdown();
|
| - channel_ = nullptr;
|
| - }
|
| + data_pipe_->Shutdown();
|
| }
|
|
|
| Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
|
| @@ -59,43 +64,32 @@ DataPipeProducerDispatcher::Deserialize(
|
| const void* source,
|
| size_t size,
|
| PlatformHandleVector* platform_handles) {
|
| - MojoCreateDataPipeOptions options;
|
| - ScopedPlatformHandle shared_memory_handle;
|
| - size_t shared_memory_size = 0;
|
| - ScopedPlatformHandle platform_handle =
|
| - DataPipe::Deserialize(source, size, platform_handles, &options,
|
| - &shared_memory_handle, &shared_memory_size);
|
| -
|
| - scoped_refptr<DataPipeProducerDispatcher> rv(Create(options));
|
| -
|
| - char* serialized_write_buffer = nullptr;
|
| - size_t serialized_write_buffer_size = 0;
|
| - scoped_refptr<PlatformSharedBuffer> shared_buffer;
|
| - scoped_ptr<PlatformSharedBufferMapping> mapping;
|
| - if (shared_memory_size) {
|
| - shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle(
|
| - shared_memory_size, std::move(shared_memory_handle));
|
| - mapping = shared_buffer->Map(0, shared_memory_size);
|
| - serialized_write_buffer = static_cast<char*>(mapping->GetBase());
|
| - serialized_write_buffer_size = shared_memory_size;
|
| - }
|
| -
|
| - rv->Init(std::move(platform_handle), serialized_write_buffer,
|
| - serialized_write_buffer_size);
|
| + scoped_refptr<DataPipe> data_pipe(
|
| + DataPipe::Deserialize(source, size, platform_handles));
|
| + scoped_refptr<DataPipeProducerDispatcher> rv(
|
| + new DataPipeProducerDispatcher(data_pipe));
|
| + rv->InitInternal();
|
| return rv;
|
| }
|
|
|
| DataPipeProducerDispatcher::DataPipeProducerDispatcher(
|
| const MojoCreateDataPipeOptions& options)
|
| - : options_(options), channel_(nullptr), error_(false), serialized_(false) {
|
| -}
|
| + : DataPipeProducerDispatcher(new DataPipe(options)) {}
|
| +
|
| +DataPipeProducerDispatcher::DataPipeProducerDispatcher(
|
| + scoped_refptr<DataPipe> data_pipe)
|
| + : data_pipe_(data_pipe),
|
| + calling_init_(false),
|
| + peer_closed_(false),
|
| + in_two_phase_write_(false),
|
| + two_phase_max_bytes_write_(0u) {}
|
|
|
| DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
|
| // See comment in ~MessagePipeDispatcher.
|
| - if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
|
| - channel_->Shutdown();
|
| + if (internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
|
| + data_pipe_->Shutdown();
|
| else
|
| - DCHECK(!channel_);
|
| + DCHECK(!data_pipe_->channel());
|
| }
|
|
|
| void DataPipeProducerDispatcher::CancelAllAwakablesNoLock() {
|
| @@ -111,14 +105,15 @@ void DataPipeProducerDispatcher::CloseImplNoLock() {
|
|
|
| scoped_refptr<Dispatcher>
|
| DataPipeProducerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
|
| + // This function is used by TransportData to make sure there are no references
|
| + // to the dispatcher it is trying to serialize and transport.
|
| lock().AssertAcquired();
|
|
|
| - SerializeInternal();
|
| + scoped_refptr<DataPipeProducerDispatcher> rv = Create(data_pipe_->options());
|
| + data_pipe_->CreateEquivalentAndClose(rv->data_pipe_.get());
|
| +
|
| + DCHECK(!in_two_phase_write_);
|
|
|
| - scoped_refptr<DataPipeProducerDispatcher> rv = Create(options_);
|
| - serialized_write_buffer_.swap(rv->serialized_write_buffer_);
|
| - rv->serialized_platform_handle_ = std::move(serialized_platform_handle_);
|
| - rv->serialized_ = true;
|
| return scoped_refptr<Dispatcher>(rv.get());
|
| }
|
|
|
| @@ -127,41 +122,54 @@ MojoResult DataPipeProducerDispatcher::WriteDataImplNoLock(
|
| uint32_t* num_bytes,
|
| MojoWriteDataFlags flags) {
|
| lock().AssertAcquired();
|
| - if (InTwoPhaseWrite())
|
| + if (in_two_phase_write_)
|
| return MOJO_RESULT_BUSY;
|
| - if (error_)
|
| + if (peer_closed_)
|
| return MOJO_RESULT_FAILED_PRECONDITION;
|
| - if (*num_bytes % options_.element_num_bytes != 0)
|
| + if (*num_bytes % data_pipe_->options().element_num_bytes != 0)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
| if (*num_bytes == 0)
|
| return MOJO_RESULT_OK; // Nothing to do.
|
|
|
| - // For now, we ignore options.capacity_num_bytes as a total of all pending
|
| - // writes (and just treat it per message). We will implement that later if
|
| - // we need to. All current uses want all their data to be sent, and it's not
|
| - // clear that this backpressure should be done at the mojo layer or at a
|
| - // higher application layer.
|
| + // Don't write non element sized chunks.
|
| + uint32_t writable = data_pipe_->GetWritableBytes();
|
| + writable -= writable % data_pipe_->options().element_num_bytes;
|
| +
|
| bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
|
| uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0;
|
| - if (min_num_bytes_to_write > options_.capacity_num_bytes) {
|
| + if (min_num_bytes_to_write > writable) {
|
| // Don't return "should wait" since you can't wait for a specified amount of
|
| // data.
|
| return MOJO_RESULT_OUT_OF_RANGE;
|
| }
|
|
|
| - uint32_t num_bytes_to_write =
|
| - std::min(*num_bytes, options_.capacity_num_bytes);
|
| - if (num_bytes_to_write == 0)
|
| + if (writable == 0)
|
| return MOJO_RESULT_SHOULD_WAIT;
|
|
|
| - HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
|
| + uint32_t num_bytes_to_write = std::min(*num_bytes, writable);
|
| +
|
| + // The failure case for |WriteDataIntoSharedBuffer| is the shared
|
| + // buffer not existing, so we should wait.
|
| + if (!data_pipe_->WriteDataIntoSharedBuffer(elements, num_bytes_to_write)) {
|
| + return MOJO_RESULT_SHOULD_WAIT;
|
| + }
|
| +
|
| + // If we can't tell the other end about the write, pretend this write didn't
|
| + // happen and mark the other end as closed. We deal with any state changes
|
| + // due to the other side being closed in OnError.
|
| + if (!data_pipe_->NotifyWrite(num_bytes_to_write)) {
|
| + peer_closed_ = true;
|
| + return MOJO_RESULT_FAILED_PRECONDITION;
|
| + }
|
|
|
| *num_bytes = num_bytes_to_write;
|
| - WriteDataIntoMessages(elements, num_bytes_to_write);
|
|
|
| + HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
|
| + data_pipe_->UpdateFromWrite(num_bytes_to_write);
|
| HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
|
| if (!new_state.equals(old_state))
|
| awakable_list_.AwakeForStateChange(new_state);
|
| +
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| @@ -170,21 +178,26 @@ MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock(
|
| uint32_t* buffer_num_bytes,
|
| MojoWriteDataFlags flags) {
|
| lock().AssertAcquired();
|
| - if (InTwoPhaseWrite())
|
| + if (in_two_phase_write_)
|
| return MOJO_RESULT_BUSY;
|
| - if (error_)
|
| + if (peer_closed_)
|
| return MOJO_RESULT_FAILED_PRECONDITION;
|
|
|
| - // See comment in WriteDataImplNoLock about ignoring capacity_num_bytes.
|
| + uint32_t max_num_bytes_to_write;
|
| + void* temp_buf = data_pipe_->GetWriteBuffer(&max_num_bytes_to_write);
|
| +
|
| + if (max_num_bytes_to_write == 0)
|
| + return MOJO_RESULT_SHOULD_WAIT;
|
| +
|
| if (*buffer_num_bytes == 0)
|
| - *buffer_num_bytes = options_.capacity_num_bytes;
|
| + *buffer_num_bytes = max_num_bytes_to_write;
|
|
|
| - two_phase_data_.resize(*buffer_num_bytes);
|
| - *buffer = &two_phase_data_[0];
|
| + // Don't promise more bytes than we have.
|
| + *buffer_num_bytes = std::min(max_num_bytes_to_write, *buffer_num_bytes);
|
|
|
| - // TODO: if buffer_num_bytes.Get() > GetConfiguration().max_message_num_bytes
|
| - // we can construct a MessageInTransit here. But then we need to make
|
| - // MessageInTransit support changing its data size later.
|
| + two_phase_max_bytes_write_ = *buffer_num_bytes;
|
| + *buffer = temp_buf;
|
| + in_two_phase_write_ = true;
|
|
|
| return MOJO_RESULT_OK;
|
| }
|
| @@ -192,28 +205,30 @@ MojoResult DataPipeProducerDispatcher::BeginWriteDataImplNoLock(
|
| MojoResult DataPipeProducerDispatcher::EndWriteDataImplNoLock(
|
| uint32_t num_bytes_written) {
|
| lock().AssertAcquired();
|
| - if (!InTwoPhaseWrite())
|
| + if (!in_two_phase_write_)
|
| return MOJO_RESULT_FAILED_PRECONDITION;
|
|
|
| - // Note: Allow successful completion of the two-phase write even if the other
|
| - // side has been closed.
|
| - MojoResult rv = MOJO_RESULT_OK;
|
| - if (num_bytes_written > two_phase_data_.size() ||
|
| - num_bytes_written % options_.element_num_bytes != 0) {
|
| - rv = MOJO_RESULT_INVALID_ARGUMENT;
|
| - } else if (channel_) {
|
| - WriteDataIntoMessages(&two_phase_data_[0], num_bytes_written);
|
| + HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
|
| + in_two_phase_write_ = false;
|
| +
|
| + if (num_bytes_written > two_phase_max_bytes_write_ ||
|
| + num_bytes_written % data_pipe_->options().element_num_bytes != 0) {
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| }
|
|
|
| - // Two-phase write ended even on failure.
|
| - two_phase_data_.clear();
|
| - // If we're now writable, we *became* writable (since we weren't writable
|
| - // during the two-phase write), so awake producer awakables.
|
| + data_pipe_->UpdateFromWrite(num_bytes_written);
|
| +
|
| HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
|
| - if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
|
| + if (!new_state.equals(old_state))
|
| awakable_list_.AwakeForStateChange(new_state);
|
|
|
| - return rv;
|
| + // Note: Allow successful completion of the two-phase write even if the other
|
| + // side has been closed.
|
| + // Deal with state changes due to peer being closed in OnError.
|
| + if (!data_pipe_->NotifyWrite(num_bytes_written))
|
| + peer_closed_ = true;
|
| +
|
| + return MOJO_RESULT_OK;
|
| }
|
|
|
| HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock()
|
| @@ -221,13 +236,14 @@ HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateImplNoLock()
|
| lock().AssertAcquired();
|
|
|
| HandleSignalsState rv;
|
| - if (!error_) {
|
| - if (!InTwoPhaseWrite())
|
| + if (!peer_closed_) {
|
| + if (!in_two_phase_write_ && data_pipe_->GetWritableBytes())
|
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| } else {
|
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| }
|
| +
|
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| return rv;
|
| }
|
| @@ -238,8 +254,6 @@ MojoResult DataPipeProducerDispatcher::AddAwakableImplNoLock(
|
| uintptr_t context,
|
| HandleSignalsState* signals_state) {
|
| lock().AssertAcquired();
|
| - if (channel_)
|
| - channel_->EnsureLazyInitialized();
|
| HandleSignalsState state = GetHandleSignalsStateImplNoLock();
|
| if (state.satisfies(signals)) {
|
| if (signals_state)
|
| @@ -268,34 +282,14 @@ void DataPipeProducerDispatcher::RemoveAwakableImplNoLock(
|
| void DataPipeProducerDispatcher::StartSerializeImplNoLock(
|
| size_t* max_size,
|
| size_t* max_platform_handles) {
|
| - if (!serialized_)
|
| - SerializeInternal();
|
| -
|
| - DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
|
| - !serialized_write_buffer_.empty(), max_size,
|
| - max_platform_handles);
|
| + data_pipe_->StartSerialize(max_size, max_platform_handles);
|
| }
|
|
|
| bool DataPipeProducerDispatcher::EndSerializeAndCloseImplNoLock(
|
| void* destination,
|
| size_t* actual_size,
|
| PlatformHandleVector* platform_handles) {
|
| - ScopedPlatformHandle shared_memory_handle;
|
| - size_t shared_memory_size = serialized_write_buffer_.size();
|
| - if (shared_memory_size) {
|
| - scoped_refptr<PlatformSharedBuffer> shared_buffer(
|
| - internal::g_platform_support->CreateSharedBuffer(
|
| - shared_memory_size));
|
| - scoped_ptr<PlatformSharedBufferMapping> mapping(
|
| - shared_buffer->Map(0, shared_memory_size));
|
| - memcpy(mapping->GetBase(), &serialized_write_buffer_[0],
|
| - shared_memory_size);
|
| - shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release());
|
| - }
|
| -
|
| - DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_),
|
| - std::move(shared_memory_handle), shared_memory_size,
|
| - destination, actual_size, platform_handles);
|
| + data_pipe_->EndSerialize(destination, actual_size, platform_handles);
|
| CloseImplNoLock();
|
| return true;
|
| }
|
| @@ -310,44 +304,82 @@ void DataPipeProducerDispatcher::TransportEnded() {
|
|
|
| bool DataPipeProducerDispatcher::IsBusyNoLock() const {
|
| lock().AssertAcquired();
|
| - return InTwoPhaseWrite();
|
| + return in_two_phase_write_;
|
| +}
|
| +
|
| +bool DataPipeProducerDispatcher::ProcessCommand(
|
| + const DataPipeCommandHeader& command,
|
| + ScopedPlatformHandleVectorPtr platform_handles) {
|
| + // Handles write/read case and shared buffer becoming available case.
|
| + return data_pipe_->ProcessCommand(command, std::move(platform_handles));
|
| }
|
|
|
| void DataPipeProducerDispatcher::OnReadMessage(
|
| const MessageInTransit::View& message_view,
|
| ScopedPlatformHandleVectorPtr platform_handles) {
|
| - CHECK(false) << "DataPipeProducerDispatcher shouldn't get any messages.";
|
| + const DataPipeCommandHeader* command =
|
| + static_cast<const DataPipeCommandHeader*>(message_view.bytes());
|
| + DCHECK(message_view.num_bytes() == sizeof(DataPipeCommandHeader));
|
| +
|
| + if (started_transport_.Try()) {
|
| + // We're not in the middle of being sent.
|
| +
|
| + // Can get synchronously called back from RawChannel::Init in InitOnIO if
|
| + // there was initial data. InitOnIO locks, so don't lock twice.
|
| + scoped_ptr<base::AutoLock> locker;
|
| + if (!calling_init_) {
|
| + locker.reset(new base::AutoLock(lock()));
|
| + }
|
| +
|
| + if (ProcessCommand(*command, std::move(platform_handles))) {
|
| + awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| + }
|
| + started_transport_.Release();
|
| + } else {
|
| + // DataPipe::Serialize calls ReleaseHandle on the channel, which
|
| + // acquires RawChannel's read_lock_. The function OnReadMessage is only
|
| + // called while read_lock_ is acquired, and not after ReleaseHandle has been
|
| + // called. This means this function will only be called before Serialize
|
| + // calls ReleaseHandle, meaning the serialisation will not have started yet.
|
| + // We only notify awakables if we're not in the process of being
|
| + // transported.
|
| + ProcessCommand(*command, std::move(platform_handles));
|
| + }
|
| }
|
|
|
| void DataPipeProducerDispatcher::OnError(Error error) {
|
| switch (error) {
|
| - case ERROR_READ_BROKEN:
|
| - case ERROR_READ_BAD_MESSAGE:
|
| - case ERROR_READ_UNKNOWN:
|
| - LOG(ERROR) << "DataPipeProducerDispatcher shouldn't get read error.";
|
| - break;
|
| case ERROR_READ_SHUTDOWN:
|
| // The other side was cleanly closed, so this isn't actually an error.
|
| DVLOG(1) << "DataPipeProducerDispatcher read error (shutdown)";
|
| break;
|
| + case ERROR_READ_BROKEN:
|
| + LOG(ERROR) << "DataPipeProducerDispatcher read error (connection broken)";
|
| + break;
|
| + case ERROR_READ_BAD_MESSAGE:
|
| + // Receiving a bad message means either a bug, data corruption, or
|
| + // malicious attack (probably due to some other bug).
|
| + LOG(ERROR) << "DataPipeProducerDispatcher read error (received bad "
|
| + << "message)";
|
| + break;
|
| + case ERROR_READ_UNKNOWN:
|
| + LOG(ERROR) << "DataPipeProducerDispatcher read error (unknown)";
|
| + break;
|
| case ERROR_WRITE:
|
| - // Write errors are slightly notable: they probably shouldn't happen under
|
| - // normal operation (but maybe the other side crashed).
|
| - LOG(WARNING) << "DataPipeProducerDispatcher write error";
|
| + LOG(ERROR) << "DataPipeProducerDispatcher write error";
|
| break;
|
| }
|
|
|
| - error_ = true;
|
| + peer_closed_ = true;
|
| if (started_transport_.Try()) {
|
| base::AutoLock locker(lock());
|
| // We can get two OnError callbacks before the post task below completes.
|
| // Although RawChannel still has a pointer to this object until Shutdown is
|
| // called, that is safe since this class always does a PostTask to the IO
|
| // thread to self destruct.
|
| - if (channel_) {
|
| + if (data_pipe_->channel()) {
|
| awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| - channel_->Shutdown();
|
| - channel_ = nullptr;
|
| + data_pipe_->Shutdown();
|
| }
|
| started_transport_.Release();
|
| } else {
|
| @@ -355,56 +387,5 @@ void DataPipeProducerDispatcher::OnError(Error error) {
|
| }
|
| }
|
|
|
| -bool DataPipeProducerDispatcher::InTwoPhaseWrite() const {
|
| - return !two_phase_data_.empty();
|
| -}
|
| -
|
| -bool DataPipeProducerDispatcher::WriteDataIntoMessages(
|
| - const void* elements,
|
| - uint32_t num_bytes) {
|
| - // The maximum amount of data to send per message (make it a multiple of the
|
| - // element size.
|
| - size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes;
|
| - max_message_num_bytes -= max_message_num_bytes % options_.element_num_bytes;
|
| - DCHECK_GT(max_message_num_bytes, 0u);
|
| -
|
| - uint32_t offset = 0;
|
| - while (offset < num_bytes) {
|
| - uint32_t message_num_bytes =
|
| - std::min(static_cast<uint32_t>(max_message_num_bytes),
|
| - num_bytes - offset);
|
| - scoped_ptr<MessageInTransit> message(new MessageInTransit(
|
| - MessageInTransit::Type::MESSAGE, message_num_bytes,
|
| - static_cast<const char*>(elements) + offset));
|
| - if (!channel_->WriteMessage(std::move(message))) {
|
| - error_ = true;
|
| - return false;
|
| - }
|
| -
|
| - offset += message_num_bytes;
|
| - }
|
| -
|
| - return true;
|
| -}
|
| -
|
| -void DataPipeProducerDispatcher::SerializeInternal() {
|
| - // We need to stop watching handle immediately, even though not on IO thread,
|
| - // so that other messages aren't read after this.
|
| - if (channel_) {
|
| - std::vector<char> serialized_read_buffer;
|
| - std::vector<int> fds;
|
| - bool write_error = false;
|
| - serialized_platform_handle_ = channel_->ReleaseHandle(
|
| - &serialized_read_buffer, &serialized_write_buffer_, &fds, &fds,
|
| - &write_error);
|
| - CHECK(serialized_read_buffer.empty());
|
| - CHECK(fds.empty());
|
| - if (write_error)
|
| - serialized_platform_handle_.reset();
|
| - channel_ = nullptr;
|
| - }
|
| - serialized_ = true;
|
| -}
|
| -
|
| } // namespace edk
|
| } // namespace mojo
|
|
|