Chromium Code Reviews| Index: mojo/edk/system/data_pipe_consumer_dispatcher.cc |
| diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
| index 68c9e0e038875f75385c383b3bb018b985d020f2..5180c6221f38244a0aa63cc2aec12c09ed1e7012 100644 |
| --- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
| +++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
| @@ -16,49 +16,41 @@ |
| #include "mojo/edk/embedder/embedder_internal.h" |
| #include "mojo/edk/embedder/platform_shared_buffer.h" |
| #include "mojo/edk/embedder/platform_support.h" |
| -#include "mojo/edk/system/data_pipe.h" |
| namespace mojo { |
| namespace edk { |
| -struct SharedMemoryHeader { |
| - uint32_t data_size; |
| - uint32_t read_buffer_size; |
| -}; |
| - |
| -void DataPipeConsumerDispatcher::Init( |
| - ScopedPlatformHandle message_pipe, |
| - char* serialized_read_buffer, size_t serialized_read_buffer_size) { |
| - if (message_pipe.is_valid()) { |
| - channel_ = RawChannel::Create(std::move(message_pipe)); |
| - channel_->SetSerializedData( |
| - serialized_read_buffer, serialized_read_buffer_size, nullptr, 0u, |
| - nullptr, nullptr); |
| - internal::g_io_thread_task_runner->PostTask( |
| - FROM_HERE, base::Bind(&DataPipeConsumerDispatcher::InitOnIO, this)); |
| - } else { |
| - // The data pipe consumer could have read all the data and the producer |
| - // closed its end subsequently (before the consumer was sent). In that case |
| - // when we deserialize the consumer we must make sure to set error_ or |
| - // otherwise the peer-closed signal will never be satisfied. |
| - error_ = true; |
| +void DataPipeConsumerDispatcher::Init(ScopedPlatformHandle message_pipe, |
| + char* serialized_write_buffer, |
| + size_t serialized_write_buffer_size, |
| + char* serialized_read_buffer, |
| + size_t serialized_read_buffer_size, |
| + ScopedPlatformHandle shared_buffer_handle, |
| + size_t ring_buffer_start, |
| + size_t ring_buffer_size) { |
| + if (!message_pipe.is_valid()) { |
| + peer_closed_ = true; |
| } |
| + |
| + data_pipe_->Init(std::move(message_pipe), serialized_write_buffer, |
| + serialized_write_buffer_size, serialized_read_buffer, |
| + serialized_read_buffer_size, std::move(shared_buffer_handle), |
| + ring_buffer_start, ring_buffer_size, false, |
|
Anand Mistry (off Chromium)
2016/01/08 03:28:02
ring_buffer_start, ring_buffer_size, false /* is_p
Eliot Courtney
2016/01/08 04:44:39
Done.
|
| + base::Bind(&DataPipeConsumerDispatcher::InitOnIO, this)); |
| } |
| void DataPipeConsumerDispatcher::InitOnIO() { |
| base::AutoLock locker(lock()); |
| calling_init_ = true; |
| - if (channel_) |
| - channel_->Init(this); |
| + RawChannel* channel = data_pipe_->GetChannel(); |
| + if (channel) |
| + channel->Init(this); |
| calling_init_ = false; |
| } |
| void DataPipeConsumerDispatcher::CloseOnIO() { |
| base::AutoLock locker(lock()); |
| - if (channel_) { |
| - channel_->Shutdown(); |
| - channel_ = nullptr; |
| - } |
| + data_pipe_->Shutdown(); |
| } |
| Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { |
| @@ -71,60 +63,55 @@ DataPipeConsumerDispatcher::Deserialize( |
| size_t size, |
| PlatformHandleVector* platform_handles) { |
| MojoCreateDataPipeOptions options; |
| - ScopedPlatformHandle shared_memory_handle; |
| - size_t shared_memory_size = 0; |
| + ScopedPlatformHandle channel_handle, channel_shared_handle, |
| + shared_buffer_handle; |
| + size_t serialized_read_buffer_size, serialized_write_buffer_size; |
| + size_t ring_buffer_start, ring_buffer_size; |
| - ScopedPlatformHandle platform_handle = |
| - DataPipe::Deserialize(source, size, platform_handles, &options, |
| - &shared_memory_handle, &shared_memory_size); |
| + ScopedPlatformHandle platform_handle = DataPipe::Deserialize( |
| + source, size, platform_handles, &options, &channel_shared_handle, |
| + &serialized_read_buffer_size, &serialized_write_buffer_size, |
| + &shared_buffer_handle, &ring_buffer_start, &ring_buffer_size); |
| scoped_refptr<DataPipeConsumerDispatcher> rv(Create(options)); |
| + size_t sz = serialized_write_buffer_size + serialized_read_buffer_size; |
| char* serialized_read_buffer = nullptr; |
| - size_t serialized_read_buffer_size = 0; |
| - scoped_refptr<PlatformSharedBuffer> shared_buffer; |
| + char* serialized_write_buffer = nullptr; |
| + scoped_refptr<PlatformSharedBuffer> channel_shared_buffer; |
| scoped_ptr<PlatformSharedBufferMapping> mapping; |
| - if (shared_memory_size) { |
| - shared_buffer = internal::g_platform_support->CreateSharedBufferFromHandle( |
| - shared_memory_size, std::move(shared_memory_handle)); |
| - mapping = shared_buffer->Map(0, shared_memory_size); |
| - char* buffer = static_cast<char*>(mapping->GetBase()); |
| - SharedMemoryHeader* header = reinterpret_cast<SharedMemoryHeader*>(buffer); |
| - buffer += sizeof(SharedMemoryHeader); |
| - if (header->data_size) { |
| - rv->data_.assign(buffer, buffer + header->data_size); |
| - buffer += header->data_size; |
| - } |
| - |
| - if (header->read_buffer_size) { |
| - serialized_read_buffer = buffer; |
| - serialized_read_buffer_size = header->read_buffer_size; |
| - buffer += header->read_buffer_size; |
| - } |
| + if (channel_shared_handle.is_valid()) { |
| + channel_shared_buffer = |
| + internal::g_platform_support->CreateSharedBufferFromHandle( |
| + sz, std::move(channel_shared_handle)); |
| + mapping = channel_shared_buffer->Map(0, sz); |
| + |
| + serialized_read_buffer = static_cast<char*>(mapping->GetBase()); |
| + serialized_write_buffer = |
| + static_cast<char*>(mapping->GetBase()) + serialized_read_buffer_size; |
| } |
| rv->Init(std::move(platform_handle), serialized_read_buffer, |
| - serialized_read_buffer_size); |
| + serialized_read_buffer_size, serialized_write_buffer, |
| + serialized_write_buffer_size, std::move(shared_buffer_handle), |
| + ring_buffer_start, ring_buffer_size); |
| return rv; |
| } |
| DataPipeConsumerDispatcher::DataPipeConsumerDispatcher( |
| const MojoCreateDataPipeOptions& options) |
| - : options_(options), |
| - channel_(nullptr), |
| + : data_pipe_(new DataPipe(options)), |
| calling_init_(false), |
| + peer_closed_(false), |
| in_two_phase_read_(false), |
| - two_phase_max_bytes_read_(0), |
| - error_(false), |
| - serialized_(false) { |
| -} |
| + two_phase_max_bytes_read_(0) {} |
| DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() { |
| // See comment in ~MessagePipeDispatcher. |
| - if (channel_ && internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) |
| - channel_->Shutdown(); |
| + if (internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()) |
| + data_pipe_->Shutdown(); |
| else |
| - DCHECK(!channel_); |
| + DCHECK(!data_pipe_->GetChannel()); |
| } |
| void DataPipeConsumerDispatcher::CancelAllAwakablesNoLock() { |
| @@ -142,13 +129,11 @@ scoped_refptr<Dispatcher> |
| DataPipeConsumerDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() { |
| lock().AssertAcquired(); |
| - SerializeInternal(); |
| + scoped_refptr<DataPipeConsumerDispatcher> rv = |
| + Create(data_pipe_->GetOptions()); |
| + data_pipe_->CreateEquivalentAndClose(rv->data_pipe_.get()); |
| - scoped_refptr<DataPipeConsumerDispatcher> rv = Create(options_); |
| - data_.swap(rv->data_); |
| - serialized_read_buffer_.swap(rv->serialized_read_buffer_); |
| - rv->serialized_platform_handle_ = std::move(serialized_platform_handle_); |
| - rv->serialized_ = true; |
| + DCHECK(!in_two_phase_read_); |
| return scoped_refptr<Dispatcher>(rv.get()); |
| } |
| @@ -158,8 +143,6 @@ MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock( |
| uint32_t* num_bytes, |
| MojoReadDataFlags flags) { |
| lock().AssertAcquired(); |
| - if (channel_) |
| - channel_->EnsureLazyInitialized(); |
| if (in_two_phase_read_) |
| return MOJO_RESULT_BUSY; |
| @@ -168,45 +151,57 @@ MojoResult DataPipeConsumerDispatcher::ReadDataImplNoLock( |
| (flags & MOJO_READ_DATA_FLAG_DISCARD)) |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above. |
| - DVLOG_IF(2, elements) |
| - << "Query mode: ignoring non-null |elements|"; |
| - *num_bytes = static_cast<uint32_t>(data_.size()); |
| + DVLOG_IF(2, elements) << "Query mode: ignoring non-null |elements|"; |
| + *num_bytes = uint32_t(data_pipe_->GetReadableBytes()); |
| return MOJO_RESULT_OK; |
| } |
| bool discard = false; |
| if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) { |
| - // These flags are mutally exclusive. |
| + // These flags are mutually exclusive. |
| if (flags & MOJO_READ_DATA_FLAG_PEEK) |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| - DVLOG_IF(2, elements) |
| - << "Discard mode: ignoring non-null |elements|"; |
| + DVLOG_IF(2, elements) << "Discard mode: ignoring non-null |elements|"; |
| discard = true; |
| } |
| uint32_t max_num_bytes_to_read = *num_bytes; |
| - if (max_num_bytes_to_read % options_.element_num_bytes != 0) |
| + if (max_num_bytes_to_read % data_pipe_->GetOptions().element_num_bytes != 0) |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; |
| - uint32_t min_num_bytes_to_read = |
| - all_or_none ? max_num_bytes_to_read : 0; |
| + uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0; |
| - if (min_num_bytes_to_read > data_.size()) |
| - return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_OUT_OF_RANGE; |
| + uint32_t readable_bytes = uint32_t(data_pipe_->GetReadableBytes()); |
| + if (min_num_bytes_to_read > readable_bytes) |
| + return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
| + : MOJO_RESULT_OUT_OF_RANGE; |
| - uint32_t bytes_to_read = std::min(max_num_bytes_to_read, |
| - static_cast<uint32_t>(data_.size())); |
| + uint32_t bytes_to_read = std::min(max_num_bytes_to_read, readable_bytes); |
| if (bytes_to_read == 0) |
| - return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT; |
| + return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
| + : MOJO_RESULT_SHOULD_WAIT; |
| + |
| + // |ReadDataFromSharedBuffer| failing means we haven't got the shared buffer |
| + // yet, so we should wait. |
| + if (!discard && |
| + !data_pipe_->ReadDataFromSharedBuffer(elements, bytes_to_read)) { |
| + return MOJO_RESULT_SHOULD_WAIT; |
| + } |
| - if (!discard) |
| - memcpy(elements, &data_[0], bytes_to_read); |
| *num_bytes = bytes_to_read; |
| - bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK); |
| - if (discard || !peek) |
| - data_.erase(data_.begin(), data_.begin() + bytes_to_read); |
| + HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); |
| + bool should_update = !(flags & MOJO_READ_DATA_FLAG_PEEK) || discard; |
| + if (should_update) |
| + data_pipe_->UpdateFromRead(bytes_to_read); |
| + HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); |
| + if (!new_state.equals(old_state)) |
| + awakable_list_.AwakeForStateChange(new_state); |
| + |
| + // Deal with state changes due to peer being closed in OnError. |
| + if (should_update && !data_pipe_->NotifyRead(bytes_to_read)) |
| + peer_closed_ = true; |
| return MOJO_RESULT_OK; |
| } |
| @@ -216,8 +211,6 @@ MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock( |
| uint32_t* buffer_num_bytes, |
| MojoReadDataFlags flags) { |
| lock().AssertAcquired(); |
| - if (channel_) |
| - channel_->EnsureLazyInitialized(); |
| if (in_two_phase_read_) |
| return MOJO_RESULT_BUSY; |
| @@ -227,12 +220,16 @@ MojoResult DataPipeConsumerDispatcher::BeginReadDataImplNoLock( |
| (flags & MOJO_READ_DATA_FLAG_PEEK)) |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| - uint32_t max_num_bytes_to_read = static_cast<uint32_t>(data_.size()); |
| + size_t readable_bytes; |
| + const void* temp_buf = data_pipe_->GetReadBuffer(&readable_bytes); |
| + |
| + uint32_t max_num_bytes_to_read = static_cast<uint32_t>(readable_bytes); |
| if (max_num_bytes_to_read == 0) |
| - return error_ ? MOJO_RESULT_FAILED_PRECONDITION : MOJO_RESULT_SHOULD_WAIT; |
| + return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
| + : MOJO_RESULT_SHOULD_WAIT; |
| in_two_phase_read_ = true; |
| - *buffer = &data_[0]; |
| + *buffer = temp_buf; |
| *buffer_num_bytes = max_num_bytes_to_read; |
| two_phase_max_bytes_read_ = max_num_bytes_to_read; |
| @@ -246,32 +243,24 @@ MojoResult DataPipeConsumerDispatcher::EndReadDataImplNoLock( |
| return MOJO_RESULT_FAILED_PRECONDITION; |
| HandleSignalsState old_state = GetHandleSignalsStateImplNoLock(); |
| - MojoResult rv; |
| + in_two_phase_read_ = false; |
| + |
| if (num_bytes_read > two_phase_max_bytes_read_ || |
| - num_bytes_read % options_.element_num_bytes != 0) { |
| - rv = MOJO_RESULT_INVALID_ARGUMENT; |
| - } else { |
| - rv = MOJO_RESULT_OK; |
| - data_.erase(data_.begin(), data_.begin() + num_bytes_read); |
| + num_bytes_read % data_pipe_->GetOptions().element_num_bytes != 0) { |
| + return MOJO_RESULT_INVALID_ARGUMENT; |
| } |
| - in_two_phase_read_ = false; |
| - two_phase_max_bytes_read_ = 0; |
| - if (!data_received_during_two_phase_read_.empty()) { |
| - if (data_.empty()) { |
| - data_received_during_two_phase_read_.swap(data_); |
| - } else { |
| - data_.insert(data_.end(), data_received_during_two_phase_read_.begin(), |
| - data_received_during_two_phase_read_.end()); |
| - data_received_during_two_phase_read_.clear(); |
| - } |
| - } |
| + data_pipe_->UpdateFromRead(num_bytes_read); |
| HandleSignalsState new_state = GetHandleSignalsStateImplNoLock(); |
| if (!new_state.equals(old_state)) |
| awakable_list_.AwakeForStateChange(new_state); |
| - return rv; |
| + // Deal with state changes due to peer being closed in OnError. |
| + if (!data_pipe_->NotifyRead(num_bytes_read)) |
| + peer_closed_ = true; |
| + |
| + return MOJO_RESULT_OK; |
| } |
| HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateImplNoLock() |
| @@ -279,16 +268,19 @@ HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsStateImplNoLock() |
| lock().AssertAcquired(); |
| HandleSignalsState rv; |
| - if (!data_.empty()) { |
| + if (data_pipe_->GetReadableBytes()) { |
| if (!in_two_phase_read_) |
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| - } else if (!error_) { |
| - rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| } |
| - if (error_) |
| + if (peer_closed_) { |
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| + } else { |
| + // We could become readable in the future. |
| + rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| + } |
| + |
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| return rv; |
| } |
| @@ -299,8 +291,6 @@ MojoResult DataPipeConsumerDispatcher::AddAwakableImplNoLock( |
| uintptr_t context, |
| HandleSignalsState* signals_state) { |
| lock().AssertAcquired(); |
| - if (channel_) |
| - channel_->EnsureLazyInitialized(); |
| HandleSignalsState state = GetHandleSignalsStateImplNoLock(); |
| if (state.satisfies(signals)) { |
| if (signals_state) |
| @@ -329,57 +319,14 @@ void DataPipeConsumerDispatcher::RemoveAwakableImplNoLock( |
| void DataPipeConsumerDispatcher::StartSerializeImplNoLock( |
| size_t* max_size, |
| size_t* max_platform_handles) { |
| - if (!serialized_) { |
| - // Handles the case where we have messages read off RawChannel but not ready |
| - // by MojoReadMessage. |
| - SerializeInternal(); |
| - } |
| - |
| - DataPipe::StartSerialize(serialized_platform_handle_.is_valid(), |
| - !data_.empty() || !serialized_read_buffer_.empty(), |
| - max_size, max_platform_handles); |
| + data_pipe_->StartSerialize(max_size, max_platform_handles); |
| } |
| bool DataPipeConsumerDispatcher::EndSerializeAndCloseImplNoLock( |
| void* destination, |
| size_t* actual_size, |
| PlatformHandleVector* platform_handles) { |
| - ScopedPlatformHandle shared_memory_handle; |
| - size_t shared_memory_size = data_.size() + serialized_read_buffer_.size(); |
| - if (shared_memory_size) { |
| - shared_memory_size += sizeof(SharedMemoryHeader); |
| - SharedMemoryHeader header; |
| - header.data_size = static_cast<uint32_t>(data_.size()); |
| - header.read_buffer_size = |
| - static_cast<uint32_t>(serialized_read_buffer_.size()); |
| - |
| - scoped_refptr<PlatformSharedBuffer> shared_buffer( |
| - internal::g_platform_support->CreateSharedBuffer( |
| - shared_memory_size)); |
| - scoped_ptr<PlatformSharedBufferMapping> mapping( |
| - shared_buffer->Map(0, shared_memory_size)); |
| - |
| - char* start = static_cast<char*>(mapping->GetBase()); |
| - memcpy(start, &header, sizeof(SharedMemoryHeader)); |
| - start += sizeof(SharedMemoryHeader); |
| - |
| - if (!data_.empty()) { |
| - memcpy(start, &data_[0], data_.size()); |
| - start += data_.size(); |
| - } |
| - |
| - if (!serialized_read_buffer_.empty()) { |
| - memcpy(start, &serialized_read_buffer_[0], |
| - serialized_read_buffer_.size()); |
| - start += serialized_read_buffer_.size(); |
| - } |
| - |
| - shared_memory_handle.reset(shared_buffer->PassPlatformHandle().release()); |
| - } |
| - |
| - DataPipe::EndSerialize(options_, std::move(serialized_platform_handle_), |
| - std::move(shared_memory_handle), shared_memory_size, |
| - destination, actual_size, platform_handles); |
| + data_pipe_->EndSerialize(destination, actual_size, platform_handles); |
| CloseImplNoLock(); |
| return true; |
| } |
| @@ -397,7 +344,7 @@ void DataPipeConsumerDispatcher::TransportEnded() { |
| // for. |
| // TODO(jam): should we care about only alerting if it was empty before |
| // TransportStarted? |
| - if (!data_.empty()) |
| + if (data_pipe_->GetReadableBytes()) |
| awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| } |
| @@ -406,34 +353,43 @@ bool DataPipeConsumerDispatcher::IsBusyNoLock() const { |
| return in_two_phase_read_; |
| } |
| +bool DataPipeConsumerDispatcher::ProcessCommand( |
| + const DataPipeCommandHeader& command, |
| + ScopedPlatformHandleVectorPtr platform_handles) { |
| + // Handles write/read case and shared buffer becoming available case. |
| + return data_pipe_->ProcessCommand(command, std::move(platform_handles)); |
| +} |
| + |
| void DataPipeConsumerDispatcher::OnReadMessage( |
| const MessageInTransit::View& message_view, |
| ScopedPlatformHandleVectorPtr platform_handles) { |
| - const char* bytes_start = static_cast<const char*>(message_view.bytes()); |
| - const char* bytes_end = bytes_start + message_view.num_bytes(); |
| + const DataPipeCommandHeader* command = |
| + static_cast<const DataPipeCommandHeader*>(message_view.bytes()); |
| + DCHECK(message_view.num_bytes() == sizeof(DataPipeCommandHeader)); |
| + |
| if (started_transport_.Try()) { |
| // We're not in the middle of being sent. |
| - // Can get synchronously called back in Init if there was initial data. |
| + // Can get synchronously called back from RawChannel::Init in InitOnIO if |
| + // there was initial data. InitOnIO locks, so don't lock twice. |
| scoped_ptr<base::AutoLock> locker; |
| if (!calling_init_) { |
| locker.reset(new base::AutoLock(lock())); |
| } |
| - if (in_two_phase_read_) { |
| - data_received_during_two_phase_read_.insert( |
| - data_received_during_two_phase_read_.end(), bytes_start, bytes_end); |
| - } else { |
| - bool was_empty = data_.empty(); |
| - data_.insert(data_.end(), bytes_start, bytes_end); |
| - if (was_empty) |
| - awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| + if (ProcessCommand(*command, std::move(platform_handles))) { |
| + awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| } |
| started_transport_.Release(); |
| } else { |
| - // See comment in MessagePipeDispatcher about why we can't and don't need |
| - // to lock here. |
| - data_.insert(data_.end(), bytes_start, bytes_end); |
| + // DataPipe::Serialize calls ReleaseHandle on the channel, which |
| + // acquires RawChannel's read_lock_. The function OnReadMessage is only |
| + // called while read_lock_ is acquired, and not after ReleaseHandle has been |
| + // called. This means this function will only be called before Serialize |
| + // calls ReleaseHandle, meaning the serialisation will not have started yet. |
| + // We only notify awakables if we're not in the process of being |
| + // transported. |
| + ProcessCommand(*command, std::move(platform_handles)); |
| } |
| } |
| @@ -444,7 +400,10 @@ void DataPipeConsumerDispatcher::OnError(Error error) { |
| DVLOG(1) << "DataPipeConsumerDispatcher read error (shutdown)"; |
| break; |
| case ERROR_READ_BROKEN: |
| - LOG(ERROR) << "DataPipeConsumerDispatcher read error (connection broken)"; |
| + // It's okay for the other side to close the connection without reading |
| + // our updates about how much we've read. |
| + DLOG(ERROR) |
| + << "DataPipeConsumerDispatcher read error (connection broken)"; |
| break; |
| case ERROR_READ_BAD_MESSAGE: |
| // Receiving a bad message means either a bug, data corruption, or |
| @@ -456,21 +415,20 @@ void DataPipeConsumerDispatcher::OnError(Error error) { |
| LOG(ERROR) << "DataPipeConsumerDispatcher read error (unknown)"; |
| break; |
| case ERROR_WRITE: |
| - LOG(ERROR) << "DataPipeConsumerDispatcher shouldn't write messages"; |
| + LOG(ERROR) << "DataPipeConsumerDispatcher write error"; |
| break; |
| } |
| - error_ = true; |
| + peer_closed_ = true; |
| if (started_transport_.Try()) { |
| base::AutoLock locker(lock()); |
| // We can get two OnError callbacks before the post task below completes. |
| // Although RawChannel still has a pointer to this object until Shutdown is |
| // called, that is safe since this class always does a PostTask to the IO |
| // thread to self destruct. |
| - if (channel_) { |
| + if (data_pipe_->GetChannel()) { |
| awakable_list_.AwakeForStateChange(GetHandleSignalsStateImplNoLock()); |
| - channel_->Shutdown(); |
| - channel_ = nullptr; |
| + data_pipe_->Shutdown(); |
| } |
| started_transport_.Release(); |
| } else { |
| @@ -478,26 +436,5 @@ void DataPipeConsumerDispatcher::OnError(Error error) { |
| } |
| } |
| -void DataPipeConsumerDispatcher::SerializeInternal() { |
| - DCHECK(!in_two_phase_read_); |
| - // We need to stop watching handle immediately, even though not on IO thread, |
| - // so that other messages aren't read after this. |
| - if (channel_) { |
| - std::vector<char> serialized_write_buffer; |
| - std::vector<int> fds; |
| - bool write_error = false; |
| - serialized_platform_handle_ = channel_->ReleaseHandle( |
| - &serialized_read_buffer_, &serialized_write_buffer, &fds, &fds, |
| - &write_error); |
| - CHECK(serialized_write_buffer.empty()); |
| - CHECK(fds.empty()); |
| - CHECK(!write_error) << "DataPipeConsumerDispatcher doesn't write."; |
| - |
| - channel_ = nullptr; |
| - } |
| - |
| - serialized_ = true; |
| -} |
| - |
| } // namespace edk |
| } // namespace mojo |