| Index: mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| index 68c9e0e038875f75385c383b3bb018b985d020f2..8cc3d7ba345ef441f33903c4334c4c869d76906a 100644
|
| --- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| +++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| @@ -16,49 +16,42 @@
|
| #include "mojo/edk/embedder/embedder_internal.h"
|
| #include "mojo/edk/embedder/platform_shared_buffer.h"
|
| #include "mojo/edk/embedder/platform_support.h"
|
| -#include "mojo/edk/system/data_pipe.h"
|
|
|
| namespace mojo {
|
| namespace edk {
|
|
|
| -struct SharedMemoryHeader {
|
| - uint32_t data_size;
|
| - uint32_t read_buffer_size;
|
| -};
|
| -
|
| void DataPipeConsumerDispatcher::Init(
|
| - ScopedPlatformHandle message_pipe,
|
| - char* serialized_read_buffer, size_t serialized_read_buffer_size) {
|
| - if (message_pipe.is_valid()) {
|
| - channel_ = RawChannel::Create(std::move(message_pipe));
|
| - channel_->SetSerializedData(
|
| - serialized_read_buffer, serialized_read_buffer_size, nullptr, 0u,
|
| - nullptr, nullptr);
|
| - internal::g_io_thread_task_runner->PostTask(
|
| - FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::InitOnIO, this));
|
| - } else {
|
| - // The data pipe consumer could have read all the data and the producer
|
| - // closed its end subsequently (before the consumer was sent). In that case
|
| - // when we deserialize the consumer we must make sure to set error_ or
|
| - // otherwise the peer-closed signal will never be satisfied.
|
| - error_ = true;
|
| + ScopedPlatformHandle channel_handle,
|
| + scoped_refptr<PlatformSharedBuffer> shared_buffer) {
|
| + CHECK(shared_buffer);
|
| + if (channel_handle.is_valid()) {
|
| + RawChannel* channel = RawChannel::Create(std::move(channel_handle));
|
| + data_pipe_->set_channel(channel);
|
| }
|
| + data_pipe_->set_shared_buffer(shared_buffer);
|
| + InitInternal();
|
| +}
|
| +
|
| +void DataPipeConsumerDispatcher::InitInternal() {
|
| + //LOG(ERROR) << "KEK";
|
| + peer_closed_ = data_pipe_->channel() == nullptr;
|
| + internal::g_io_thread_task_runner->PostTask(
|
| + FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::InitOnIO, this));
|
| + data_pipe_->Init();
|
| }
|
|
|
| void DataPipeConsumerDispatcher::InitOnIO() {
|
| base::AutoLock locker(lock());
|
| calling_init_ = true;
|
| - if (channel_)
|
| - channel_->Init(this);
|
| + RawChannel* channel = data_pipe_->channel();
|
| + if (channel)
|
| + channel->Init(this);
|
| calling_init_ = false;
|
| }
|
|
|
| void DataPipeConsumerDispatcher::CloseOnIO() {
|
| base::AutoLock locker(lock());
|
| - if (channel_) {
|
| - channel_->Shutdown();
|
| - channel_ = nullptr;
|
| - }
|
| + data_pipe_->Shutdown();
|
| }
|
|
|
| Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
|
| @@ -70,61 +63,32 @@ DataPipeConsumerDispatcher::Deserialize(
|
| const void* source,
|
| size_t size,
|
| PlatformHandleVector* platform_handles) {
|
| - MojoCreateDataPipeOptions options;
|
| - ScopedPlatformHandle shared_memory_handle;
|
| - size_t shared_memory_size = 0;
|
| -
|
| - ScopedPlatformHandle platform_handle =
|
| - DataPipe::Deserialize(source, size, platform_handles, &options,
|
| - &shared_memory_handle, &shared_memory_size);
|
| -
|
| - scoped_refptr<DataPipeConsumerDispatcher> rv(Create(options));
|
| -
|
| - char* serialized_read_buffer = nullptr;
|
| - size_t serialized_read_buffer_size = 0;
|
| - scoped_refptr<PlatformSharedBuffer> shared_buffer;
|
| - scoped_ptr<PlatformSharedBufferMapping> mapping;
|
| - if (shared_memory_size) {
|
| - shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle(
|
| - shared_memory_size, std::move(shared_memory_handle));
|
| - mapping = shared_buffer->Map(0, shared_memory_size);
|
| - char* buffer = static_cast<char*>(mapping->GetBase());
|
| - SharedMemoryHeader* header = reinterpret_cast<SharedMemoryHeader*>(buffer);
|
| - buffer += sizeof(SharedMemoryHeader);
|
| - if (header->data_size) {
|
| - rv->data_.assign(buffer, buffer + header->data_size);
|
| - buffer += header->data_size;
|
| - }
|
| -
|
| - if (header->read_buffer_size) {
|
| - serialized_read_buffer = buffer;
|
| - serialized_read_buffer_size = header->read_buffer_size;
|
| - buffer += header->read_buffer_size;
|
| - }
|
| - }
|
| -
|
| - rv->Init(std::move(platform_handle), serialized_read_buffer,
|
| - serialized_read_buffer_size);
|
| + scoped_refptr<DataPipe> data_pipe =
|
| + DataPipe::Deserialize(source, size, platform_handles);
|
| + scoped_refptr<DataPipeConsumerDispatcher> rv(
|
| + new DataPipeConsumerDispatcher(data_pipe));
|
| + rv->InitInternal();
|
| return rv;
|
| }
|
|
|
| DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
|
| const MojoCreateDataPipeOptions& options)
|
| - : options_(options),
|
| - channel_(nullptr),
|
| + : DataPipeConsumerDispatcher(new DataPipe(options)) {}
|
| +
|
| +DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
|
| + scoped_refptr<DataPipe> data_pipe)
|
| + : data_pipe_(data_pipe),
|
| calling_init_(false),
|
| + peer_closed_(false),
|
| in_two_phase_read_(false),
|
| - two_phase_max_bytes_read_(0),
|
| - error_(false),
|
| - serialized_(false) {
|
| -}
|
| + two_phase_max_bytes_read_(0u) {}
|
|
|
| DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
|
| // See comment in ~MessagePipeDispatcher.
|
| - if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
|
| - channel_->Shutdown();
|
| + if (internal::g_io_thread_task_runner->RunsTasksOnCurrentThread())
|
| + data_pipe_->Shutdown();
|
| else
|
| - DCHECK(!channel_);
|
| + DCHECK(!data_pipe_->channel());
|
| }
|
|
|
| void DataPipeConsumerDispatcher::CancelAllAwakablesNoLock() {
|
| @@ -142,13 +106,10 @@ scoped_refptr<Dispatcher>
|
| DataPipeConsumerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
|
| lock().AssertAcquired();
|
|
|
| - SerializeInternal();
|
| + scoped_refptr<DataPipeConsumerDispatcher> rv = Create(data_pipe_->options());
|
| + data_pipe_->CreateEquivalentAndClose(rv->data_pipe_.get());
|
|
|
| - scoped_refptr<DataPipeConsumerDispatcher> rv = Create(options_);
|
| - data_.swap(rv->data_);
|
| - serialized_read_buffer_.swap(rv->serialized_read_buffer_);
|
| - rv->serialized_platform_handle_ = std::move(serialized_platform_handle_);
|
| - rv->serialized_ = true;
|
| + DCHECK(!in_two_phase_read_);
|
|
|
| return scoped_refptr<Dispatcher>(rv.get());
|
| }
|
| @@ -158,8 +119,6 @@ MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock(
|
| uint32_t* num_bytes,
|
| MojoReadDataFlags flags) {
|
| lock().AssertAcquired();
|
| - if (channel_)
|
| - channel_->EnsureLazyInitialized();
|
| if (in_two_phase_read_)
|
| return MOJO_RESULT_BUSY;
|
|
|
| @@ -168,45 +127,57 @@ MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock(
|
| (flags & MOJO_READ_DATA_FLAG_DISCARD))
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
| DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above.
|
| - DVLOG_IF(2, elements)
|
| - << "Query mode: ignoring non-null |elements|";
|
| - *num_bytes = static_cast<uint32_t>(data_.size());
|
| + DVLOG_IF(2, elements) << "Query mode: ignoring non-null |elements|";
|
| + *num_bytes = data_pipe_->GetReadableBytes();
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| bool discard = false;
|
| if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
|
| - // These flags are mutally exclusive.
|
| + // These flags are mutually exclusive.
|
| if (flags & MOJO_READ_DATA_FLAG_PEEK)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
| - DVLOG_IF(2, elements)
|
| - << "Discard mode: ignoring non-null |elements|";
|
| + DVLOG_IF(2, elements) << "Discard mode: ignoring non-null |elements|";
|
| discard = true;
|
| }
|
|
|
| uint32_t max_num_bytes_to_read = *num_bytes;
|
| - if (max_num_bytes_to_read % options_.element_num_bytes != 0)
|
| + if (max_num_bytes_to_read % data_pipe_->options().element_num_bytes != 0)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
|
| - uint32_t min_num_bytes_to_read =
|
| - all_or_none ? max_num_bytes_to_read : 0;
|
| + uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0;
|
|
|
| - if (min_num_bytes_to_read > data_.size())
|
| - return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE;
|
| + uint32_t readable_bytes = data_pipe_->GetReadableBytes();
|
| + if (min_num_bytes_to_read > readable_bytes)
|
| + return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
|
| + : MOJO_RESULT_OUT_OF_RANGE;
|
|
|
| - uint32_t bytes_to_read = std::min(max_num_bytes_to_read,
|
| - static_cast<uint32_t>(data_.size()));
|
| + uint32_t bytes_to_read = std::min(max_num_bytes_to_read, readable_bytes);
|
| if (bytes_to_read == 0)
|
| - return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT;
|
| + return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
|
| + : MOJO_RESULT_SHOULD_WAIT;
|
| +
|
| + // |ReadDataFromSharedBuffer| failing means we haven't got the shared buffer
|
| + // yet, so we should wait.
|
| + if (!discard &&
|
| + !data_pipe_->ReadDataFromSharedBuffer(elements, bytes_to_read)) {
|
| + return MOJO_RESULT_SHOULD_WAIT;
|
| + }
|
|
|
| - if (!discard)
|
| - memcpy(elements, &data_[0], bytes_to_read);
|
| *num_bytes = bytes_to_read;
|
|
|
| - bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK);
|
| - if (discard || !peek)
|
| - data_.erase(data_.begin(), data_.begin() + bytes_to_read);
|
| + HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
|
| + bool should_update = !(flags & MOJO_READ_DATA_FLAG_PEEK) || discard;
|
| + if (should_update)
|
| + data_pipe_->UpdateFromRead(bytes_to_read);
|
| + HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
|
| + if (!new_state.equals(old_state))
|
| + awakable_list_.AwakeForStateChange(new_state);
|
| +
|
| + // Deal with state changes due to peer being closed in OnError.
|
| + if (should_update && !data_pipe_->NotifyRead(bytes_to_read))
|
| + peer_closed_ = true;
|
|
|
| return MOJO_RESULT_OK;
|
| }
|
| @@ -216,8 +187,6 @@ MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock(
|
| uint32_t* buffer_num_bytes,
|
| MojoReadDataFlags flags) {
|
| lock().AssertAcquired();
|
| - if (channel_)
|
| - channel_->EnsureLazyInitialized();
|
| if (in_two_phase_read_)
|
| return MOJO_RESULT_BUSY;
|
|
|
| @@ -227,12 +196,16 @@ MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock(
|
| (flags & MOJO_READ_DATA_FLAG_PEEK))
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| - uint32_t max_num_bytes_to_read = static_cast<uint32_t>(data_.size());
|
| + uint32_t readable_bytes;
|
| + const void* temp_buf = data_pipe_->GetReadBuffer(&readable_bytes);
|
| +
|
| + uint32_t max_num_bytes_to_read = readable_bytes;
|
| if (max_num_bytes_to_read == 0)
|
| - return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT;
|
| + return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
|
| + : MOJO_RESULT_SHOULD_WAIT;
|
|
|
| in_two_phase_read_ = true;
|
| - *buffer = &data_[0];
|
| + *buffer = temp_buf;
|
| *buffer_num_bytes = max_num_bytes_to_read;
|
| two_phase_max_bytes_read_ = max_num_bytes_to_read;
|
|
|
| @@ -246,32 +219,24 @@ MojoResult DataPipeConsumerDispatcher::EndReadDataImplNoLock(
|
| return MOJO_RESULT_FAILED_PRECONDITION;
|
|
|
| HandleSignalsState old_state = GetHandleSignalsStateImplNoLock();
|
| - MojoResult rv;
|
| + in_two_phase_read_ = false;
|
| +
|
| if (num_bytes_read > two_phase_max_bytes_read_ ||
|
| - num_bytes_read % options_.element_num_bytes != 0) {
|
| - rv = MOJO_RESULT_INVALID_ARGUMENT;
|
| - } else {
|
| - rv = MOJO_RESULT_OK;
|
| - data_.erase(data_.begin(), data_.begin() + num_bytes_read);
|
| + num_bytes_read % data_pipe_->options().element_num_bytes != 0) {
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| }
|
|
|
| - in_two_phase_read_ = false;
|
| - two_phase_max_bytes_read_ = 0;
|
| - if (!data_received_during_two_phase_read_.empty()) {
|
| - if (data_.empty()) {
|
| - data_received_during_two_phase_read_.swap(data_);
|
| - } else {
|
| - data_.insert(data_.end(), data_received_during_two_phase_read_.begin(),
|
| - data_received_during_two_phase_read_.end());
|
| - data_received_during_two_phase_read_.clear();
|
| - }
|
| - }
|
| + data_pipe_->UpdateFromRead(num_bytes_read);
|
|
|
| HandleSignalsState new_state = GetHandleSignalsStateImplNoLock();
|
| if (!new_state.equals(old_state))
|
| awakable_list_.AwakeForStateChange(new_state);
|
|
|
| - return rv;
|
| + // Deal with state changes due to peer being closed in OnError.
|
| + if (!data_pipe_->NotifyRead(num_bytes_read))
|
| + peer_closed_ = true;
|
| +
|
| + return MOJO_RESULT_OK;
|
| }
|
|
|
| HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateImplNoLock()
|
| @@ -279,16 +244,19 @@ HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateImplNoLock()
|
| lock().AssertAcquired();
|
|
|
| HandleSignalsState rv;
|
| - if (!data_.empty()) {
|
| + if (data_pipe_->GetReadableBytes()) {
|
| if (!in_two_phase_read_)
|
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
|
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
|
| - } else if (!error_) {
|
| - rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
|
| }
|
|
|
| - if (error_)
|
| + if (peer_closed_) {
|
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| + } else {
|
| + // We could become readable in the future.
|
| + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
|
| + }
|
| +
|
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
|
| return rv;
|
| }
|
| @@ -299,8 +267,6 @@ MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock(
|
| uintptr_t context,
|
| HandleSignalsState* signals_state) {
|
| lock().AssertAcquired();
|
| - if (channel_)
|
| - channel_->EnsureLazyInitialized();
|
| HandleSignalsState state = GetHandleSignalsStateImplNoLock();
|
| if (state.satisfies(signals)) {
|
| if (signals_state)
|
| @@ -329,57 +295,14 @@ void DataPipeConsumerDispatcher::RemoveAwakableImplNoLock(
|
| void DataPipeConsumerDispatcher::StartSerializeImplNoLock(
|
| size_t* max_size,
|
| size_t* max_platform_handles) {
|
| - if (!serialized_) {
|
| - // Handles the case where we have messages read off RawChannel but not ready
|
| - // by MojoReadMessage.
|
| - SerializeInternal();
|
| - }
|
| -
|
| - DataPipe::StartSerialize(serialized_platform_handle_.is_valid(),
|
| - !data_.empty() || !serialized_read_buffer_.empty(),
|
| - max_size, max_platform_handles);
|
| + data_pipe_->StartSerialize(max_size, max_platform_handles);
|
| }
|
|
|
| bool DataPipeConsumerDispatcher::EndSerializeAndCloseImplNoLock(
|
| void* destination,
|
| size_t* actual_size,
|
| PlatformHandleVector* platform_handles) {
|
| - ScopedPlatformHandle shared_memory_handle;
|
| - size_t shared_memory_size = data_.size() + serialized_read_buffer_.size();
|
| - if (shared_memory_size) {
|
| - shared_memory_size += sizeof(SharedMemoryHeader);
|
| - SharedMemoryHeader header;
|
| - header.data_size = static_cast<uint32_t>(data_.size());
|
| - header.read_buffer_size =
|
| - static_cast<uint32_t>(serialized_read_buffer_.size());
|
| -
|
| - scoped_refptr<PlatformSharedBuffer> shared_buffer(
|
| - internal::g_platform_support->CreateSharedBuffer(
|
| - shared_memory_size));
|
| - scoped_ptr<PlatformSharedBufferMapping> mapping(
|
| - shared_buffer->Map(0, shared_memory_size));
|
| -
|
| - char* start = static_cast<char*>(mapping->GetBase());
|
| - memcpy(start, &header, sizeof(SharedMemoryHeader));
|
| - start += sizeof(SharedMemoryHeader);
|
| -
|
| - if (!data_.empty()) {
|
| - memcpy(start, &data_[0], data_.size());
|
| - start += data_.size();
|
| - }
|
| -
|
| - if (!serialized_read_buffer_.empty()) {
|
| - memcpy(start, &serialized_read_buffer_[0],
|
| - serialized_read_buffer_.size());
|
| - start += serialized_read_buffer_.size();
|
| - }
|
| -
|
| - shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release());
|
| - }
|
| -
|
| - DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_),
|
| - std::move(shared_memory_handle), shared_memory_size,
|
| - destination, actual_size, platform_handles);
|
| + data_pipe_->EndSerialize(destination, actual_size, platform_handles);
|
| CloseImplNoLock();
|
| return true;
|
| }
|
| @@ -397,7 +320,7 @@ void DataPipeConsumerDispatcher::TransportEnded() {
|
| // for.
|
| // TODO(jam): should we care about only alerting if it was empty before
|
| // TransportStarted?
|
| - if (!data_.empty())
|
| + if (data_pipe_->GetReadableBytes())
|
| awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| }
|
|
|
| @@ -406,34 +329,43 @@ bool DataPipeConsumerDispatcher::IsBusyNoLock() const {
|
| return in_two_phase_read_;
|
| }
|
|
|
| +bool DataPipeConsumerDispatcher::ProcessCommand(
|
| + const DataPipeCommandHeader& command,
|
| + ScopedPlatformHandleVectorPtr platform_handles) {
|
| + // Handles write/read case and shared buffer becoming available case.
|
| + return data_pipe_->ProcessCommand(command, std::move(platform_handles));
|
| +}
|
| +
|
| void DataPipeConsumerDispatcher::OnReadMessage(
|
| const MessageInTransit::View& message_view,
|
| ScopedPlatformHandleVectorPtr platform_handles) {
|
| - const char* bytes_start = static_cast<const char*>(message_view.bytes());
|
| - const char* bytes_end = bytes_start + message_view.num_bytes();
|
| + const DataPipeCommandHeader* command =
|
| + static_cast<const DataPipeCommandHeader*>(message_view.bytes());
|
| + DCHECK(message_view.num_bytes() == sizeof(DataPipeCommandHeader));
|
| +
|
| if (started_transport_.Try()) {
|
| // We're not in the middle of being sent.
|
|
|
| - // Can get synchronously called back in Init if there was initial data.
|
| + // Can get synchronously called back from RawChannel::Init in InitOnIO if
|
| + // there was initial data. InitOnIO locks, so don't lock twice.
|
| scoped_ptr<base::AutoLock> locker;
|
| if (!calling_init_) {
|
| locker.reset(new base::AutoLock(lock()));
|
| }
|
|
|
| - if (in_two_phase_read_) {
|
| - data_received_during_two_phase_read_.insert(
|
| - data_received_during_two_phase_read_.end(), bytes_start, bytes_end);
|
| - } else {
|
| - bool was_empty = data_.empty();
|
| - data_.insert(data_.end(), bytes_start, bytes_end);
|
| - if (was_empty)
|
| - awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| + if (ProcessCommand(*command, std::move(platform_handles))) {
|
| + awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| }
|
| started_transport_.Release();
|
| } else {
|
| - // See comment in MessagePipeDispatcher about why we can't and don't need
|
| - // to lock here.
|
| - data_.insert(data_.end(), bytes_start, bytes_end);
|
| + // DataPipe::Serialize calls ReleaseHandle on the channel, which
|
| + // acquires RawChannel's read_lock_. The function OnReadMessage is only
|
| + // called while read_lock_ is acquired, and not after ReleaseHandle has been
|
| + // called. This means this function will only be called before Serialize
|
| + // calls ReleaseHandle, meaning the serialisation will not have started yet.
|
| + // We only notify awakables if we're not in the process of being
|
| + // transported.
|
| + ProcessCommand(*command, std::move(platform_handles));
|
| }
|
| }
|
|
|
| @@ -444,7 +376,10 @@ void DataPipeConsumerDispatcher::OnError(Error error) {
|
| DVLOG(1) << "DataPipeConsumerDispatcher read error (shutdown)";
|
| break;
|
| case ERROR_READ_BROKEN:
|
| - LOG(ERROR) << "DataPipeConsumerDispatcher read error (connection broken)";
|
| + // It's okay for the other side to close the connection without reading
|
| + // our updates about how much we've read.
|
| + DLOG(ERROR)
|
| + << "DataPipeConsumerDispatcher read error (connection broken)";
|
| break;
|
| case ERROR_READ_BAD_MESSAGE:
|
| // Receiving a bad message means either a bug, data corruption, or
|
| @@ -456,21 +391,20 @@ void DataPipeConsumerDispatcher::OnError(Error error) {
|
| LOG(ERROR) << "DataPipeConsumerDispatcher read error (unknown)";
|
| break;
|
| case ERROR_WRITE:
|
| - LOG(ERROR) << "DataPipeConsumerDispatcher shouldn't write messages";
|
| + LOG(ERROR) << "DataPipeConsumerDispatcher write error";
|
| break;
|
| }
|
|
|
| - error_ = true;
|
| + peer_closed_ = true;
|
| if (started_transport_.Try()) {
|
| base::AutoLock locker(lock());
|
| // We can get two OnError callbacks before the post task below completes.
|
| // Although RawChannel still has a pointer to this object until Shutdown is
|
| // called, that is safe since this class always does a PostTask to the IO
|
| // thread to self destruct.
|
| - if (channel_) {
|
| + if (data_pipe_->channel()) {
|
| awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock());
|
| - channel_->Shutdown();
|
| - channel_ = nullptr;
|
| + data_pipe_->Shutdown();
|
| }
|
| started_transport_.Release();
|
| } else {
|
| @@ -478,26 +412,5 @@ void DataPipeConsumerDispatcher::OnError(Error error) {
|
| }
|
| }
|
|
|
| -void DataPipeConsumerDispatcher::SerializeInternal() {
|
| - DCHECK(!in_two_phase_read_);
|
| - // We need to stop watching handle immediately, even though not on IO thread,
|
| - // so that other messages aren't read after this.
|
| - if (channel_) {
|
| - std::vector<char> serialized_write_buffer;
|
| - std::vector<int> fds;
|
| - bool write_error = false;
|
| - serialized_platform_handle_ = channel_->ReleaseHandle(
|
| - &serialized_read_buffer_, &serialized_write_buffer, &fds, &fds,
|
| - &write_error);
|
| - CHECK(serialized_write_buffer.empty());
|
| - CHECK(fds.empty());
|
| - CHECK(!write_error) << "DataPipeConsumerDispatcher doesn't write.";
|
| -
|
| - channel_ = nullptr;
|
| - }
|
| -
|
| - serialized_ = true;
|
| -}
|
| -
|
| } // namespace edk
|
| } // namespace mojo
|
|
|