| Index: third_party/mojo/src/mojo/edk/system/remote_consumer_data_pipe_impl.cc
|
| diff --git a/third_party/mojo/src/mojo/edk/system/remote_consumer_data_pipe_impl.cc b/third_party/mojo/src/mojo/edk/system/remote_consumer_data_pipe_impl.cc
|
| index fa59df06e6ecc830582a7698ff7e9b4ff009c904..f61554ac0c19e3b747aafdcd506b8a9cb842f4a4 100644
|
| --- a/third_party/mojo/src/mojo/edk/system/remote_consumer_data_pipe_impl.cc
|
| +++ b/third_party/mojo/src/mojo/edk/system/remote_consumer_data_pipe_impl.cc
|
| @@ -26,11 +26,12 @@ bool ValidateIncomingMessage(size_t element_num_bytes,
|
| size_t capacity_num_bytes,
|
| size_t consumer_num_bytes,
|
| const MessageInTransit* message) {
|
| - // We should only receive endpoint messages.
|
| - DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint);
|
| + // We should only receive endpoint client messages.
|
| + DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpointClient);
|
|
|
| // But we should check the subtype; only take data pipe acks.
|
| - if (message->subtype() != MessageInTransit::kSubtypeEndpointDataPipeAck) {
|
| + if (message->subtype() !=
|
| + MessageInTransit::kSubtypeEndpointClientDataPipeAck) {
|
| LOG(WARNING) << "Received message of unexpected subtype: "
|
| << message->subtype();
|
| return false;
|
| @@ -148,7 +149,8 @@ MojoResult RemoteConsumerDataPipeImpl::ProducerWriteData(
|
| size_t message_num_bytes =
|
| std::min(max_message_num_bytes, num_bytes_to_write - offset);
|
| scoped_ptr<MessageInTransit> message(new MessageInTransit(
|
| - MessageInTransit::kTypeEndpoint, MessageInTransit::kSubtypeEndpointData,
|
| + MessageInTransit::kTypeEndpointClient,
|
| + MessageInTransit::kSubtypeEndpointClientData,
|
| static_cast<uint32_t>(message_num_bytes), elements.At(offset)));
|
| if (!channel_endpoint_->EnqueueMessage(message.Pass())) {
|
| Disconnect();
|
| @@ -203,13 +205,20 @@ MojoResult RemoteConsumerDataPipeImpl::ProducerEndWriteData(
|
| DCHECK_EQ(num_bytes_written % element_num_bytes(), 0u);
|
| DCHECK_LE(num_bytes_written, capacity_num_bytes() - consumer_num_bytes_);
|
|
|
| + if (!consumer_open()) {
|
| + DCHECK(buffer_);
|
| + set_producer_two_phase_max_num_bytes_written(0);
|
| + DestroyBuffer();
|
| + return MOJO_RESULT_OK;
|
| + }
|
| +
|
| // TODO(vtl): The following code is copied almost verbatim from
|
| // |ProducerWriteData()| (it's touchy to factor it out since it uses a
|
| // |UserPointer| while we have a plain pointer.
|
|
|
| // The maximum amount of data to send per message (make it a multiple of the
|
| // element size.
|
| - // TODO(vtl): Copied from |LocalDataPipeImpl::ConvertDataToMessages()|.
|
| + // TODO(vtl): Mostly copied from |LocalDataPipeImpl::ConvertDataToMessages()|.
|
| size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes;
|
| max_message_num_bytes -= max_message_num_bytes % element_num_bytes();
|
| DCHECK_GT(max_message_num_bytes, 0u);
|
| @@ -219,11 +228,13 @@ MojoResult RemoteConsumerDataPipeImpl::ProducerEndWriteData(
|
| size_t message_num_bytes =
|
| std::min(max_message_num_bytes, num_bytes_written - offset);
|
| scoped_ptr<MessageInTransit> message(new MessageInTransit(
|
| - MessageInTransit::kTypeEndpoint, MessageInTransit::kSubtypeEndpointData,
|
| + MessageInTransit::kTypeEndpointClient,
|
| + MessageInTransit::kSubtypeEndpointClientData,
|
| static_cast<uint32_t>(message_num_bytes), buffer_.get() + offset));
|
| if (!channel_endpoint_->EnqueueMessage(message.Pass())) {
|
| + set_producer_two_phase_max_num_bytes_written(0);
|
| Disconnect();
|
| - break;
|
| + return MOJO_RESULT_OK;
|
| }
|
|
|
| offset += message_num_bytes;
|
| @@ -231,7 +242,7 @@ MojoResult RemoteConsumerDataPipeImpl::ProducerEndWriteData(
|
| }
|
|
|
| DCHECK_LE(consumer_num_bytes_, capacity_num_bytes());
|
| - // TODO(vtl): (End of copied code.)
|
| + // TODO(vtl): (End of mostly copied code.)
|
|
|
| set_producer_two_phase_max_num_bytes_written(0);
|
| return MOJO_RESULT_OK;
|
| @@ -411,7 +422,8 @@ void RemoteConsumerDataPipeImpl::Disconnect() {
|
| owner()->SetConsumerClosedNoLock();
|
| channel_endpoint_->DetachFromClient();
|
| channel_endpoint_ = nullptr;
|
| - DestroyBuffer();
|
| + if (!producer_in_two_phase_write())
|
| + DestroyBuffer();
|
| }
|
|
|
| } // namespace system
|
|
|