Index: third_party/mojo/src/mojo/edk/system/remote_consumer_data_pipe_impl.cc |
diff --git a/third_party/mojo/src/mojo/edk/system/remote_consumer_data_pipe_impl.cc b/third_party/mojo/src/mojo/edk/system/remote_consumer_data_pipe_impl.cc |
index fa59df06e6ecc830582a7698ff7e9b4ff009c904..f61554ac0c19e3b747aafdcd506b8a9cb842f4a4 100644 |
--- a/third_party/mojo/src/mojo/edk/system/remote_consumer_data_pipe_impl.cc |
+++ b/third_party/mojo/src/mojo/edk/system/remote_consumer_data_pipe_impl.cc |
@@ -26,11 +26,12 @@ bool ValidateIncomingMessage(size_t element_num_bytes, |
size_t capacity_num_bytes, |
size_t consumer_num_bytes, |
const MessageInTransit* message) { |
- // We should only receive endpoint messages. |
- DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint); |
+ // We should only receive endpoint client messages. |
+ DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpointClient); |
// But we should check the subtype; only take data pipe acks. |
- if (message->subtype() != MessageInTransit::kSubtypeEndpointDataPipeAck) { |
+ if (message->subtype() != |
+ MessageInTransit::kSubtypeEndpointClientDataPipeAck) { |
LOG(WARNING) << "Received message of unexpected subtype: " |
<< message->subtype(); |
return false; |
@@ -148,7 +149,8 @@ MojoResult RemoteConsumerDataPipeImpl::ProducerWriteData( |
size_t message_num_bytes = |
std::min(max_message_num_bytes, num_bytes_to_write - offset); |
scoped_ptr<MessageInTransit> message(new MessageInTransit( |
- MessageInTransit::kTypeEndpoint, MessageInTransit::kSubtypeEndpointData, |
+ MessageInTransit::kTypeEndpointClient, |
+ MessageInTransit::kSubtypeEndpointClientData, |
static_cast<uint32_t>(message_num_bytes), elements.At(offset))); |
if (!channel_endpoint_->EnqueueMessage(message.Pass())) { |
Disconnect(); |
@@ -203,13 +205,20 @@ MojoResult RemoteConsumerDataPipeImpl::ProducerEndWriteData( |
DCHECK_EQ(num_bytes_written % element_num_bytes(), 0u); |
DCHECK_LE(num_bytes_written, capacity_num_bytes() - consumer_num_bytes_); |
+ if (!consumer_open()) { |
+ DCHECK(buffer_); |
+ set_producer_two_phase_max_num_bytes_written(0); |
+ DestroyBuffer(); |
+ return MOJO_RESULT_OK; |
+ } |
+ |
// TODO(vtl): The following code is copied almost verbatim from |
// |ProducerWriteData()| (it's touchy to factor it out since it uses a |
// |UserPointer| while we have a plain pointer. |
// The maximum amount of data to send per message (make it a multiple of the |
// element size. |
- // TODO(vtl): Copied from |LocalDataPipeImpl::ConvertDataToMessages()|. |
+ // TODO(vtl): Mostly copied from |LocalDataPipeImpl::ConvertDataToMessages()|. |
size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; |
max_message_num_bytes -= max_message_num_bytes % element_num_bytes(); |
DCHECK_GT(max_message_num_bytes, 0u); |
@@ -219,11 +228,13 @@ MojoResult RemoteConsumerDataPipeImpl::ProducerEndWriteData( |
size_t message_num_bytes = |
std::min(max_message_num_bytes, num_bytes_written - offset); |
scoped_ptr<MessageInTransit> message(new MessageInTransit( |
- MessageInTransit::kTypeEndpoint, MessageInTransit::kSubtypeEndpointData, |
+ MessageInTransit::kTypeEndpointClient, |
+ MessageInTransit::kSubtypeEndpointClientData, |
static_cast<uint32_t>(message_num_bytes), buffer_.get() + offset)); |
if (!channel_endpoint_->EnqueueMessage(message.Pass())) { |
+ set_producer_two_phase_max_num_bytes_written(0); |
Disconnect(); |
- break; |
+ return MOJO_RESULT_OK; |
} |
offset += message_num_bytes; |
@@ -231,7 +242,7 @@ MojoResult RemoteConsumerDataPipeImpl::ProducerEndWriteData( |
} |
DCHECK_LE(consumer_num_bytes_, capacity_num_bytes()); |
- // TODO(vtl): (End of copied code.) |
+ // TODO(vtl): (End of mostly copied code.) |
set_producer_two_phase_max_num_bytes_written(0); |
return MOJO_RESULT_OK; |
@@ -411,7 +422,8 @@ void RemoteConsumerDataPipeImpl::Disconnect() { |
owner()->SetConsumerClosedNoLock(); |
channel_endpoint_->DetachFromClient(); |
channel_endpoint_ = nullptr; |
- DestroyBuffer(); |
+ if (!producer_in_two_phase_write()) |
+ DestroyBuffer(); |
} |
} // namespace system |