| Index: mojo/edk/system/local_data_pipe_impl.cc | 
| diff --git a/mojo/edk/system/local_data_pipe_impl.cc b/mojo/edk/system/local_data_pipe_impl.cc | 
| index 933e0d76438de161344f089b28cb7ee1b8368ece..938ce68c651637daa55966d3203c57ed753dea54 100644 | 
| --- a/mojo/edk/system/local_data_pipe_impl.cc | 
| +++ b/mojo/edk/system/local_data_pipe_impl.cc | 
| @@ -179,7 +179,6 @@ bool LocalDataPipeImpl::ProducerEndSerialize( | 
| if (!consumer_open()) { | 
| // Case 1: The consumer is closed. | 
| s->consumer_num_bytes = static_cast<size_t>(-1); | 
| -    owner()->ProducerCloseNoLock(); | 
| *actual_size = sizeof(SerializedDataPipeProducerDispatcher); | 
| return true; | 
| } | 
| @@ -187,6 +186,7 @@ bool LocalDataPipeImpl::ProducerEndSerialize( | 
| // Case 2: The consumer isn't closed. We'll replace ourselves with a | 
| // |RemoteProducerDataPipeImpl|. | 
|  | 
| +  s->consumer_num_bytes = current_num_bytes_; | 
| // Note: We don't use |port|. | 
| scoped_refptr<ChannelEndpoint> channel_endpoint = | 
| channel->SerializeEndpointWithLocalPeer(destination_for_endpoint, nullptr, | 
| @@ -356,13 +356,15 @@ bool LocalDataPipeImpl::ConsumerEndSerialize( | 
|  | 
| size_t old_num_bytes = current_num_bytes_; | 
| MessageInTransitQueue message_queue; | 
| -  ConvertDataToMessages(&message_queue); | 
| +  ConvertDataToMessages(buffer_.get(), &start_index_, ¤t_num_bytes_, | 
| +                        &message_queue); | 
| +  start_index_ = 0; | 
| +  current_num_bytes_ = 0; | 
|  | 
| if (!producer_open()) { | 
| // Case 1: The producer is closed. | 
| channel->SerializeEndpointWithClosedPeer(destination_for_endpoint, | 
| &message_queue); | 
| -    owner()->ConsumerCloseNoLock(); | 
| *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) + | 
| channel->GetSerializedEndpointSize(); | 
| return true; | 
| @@ -439,28 +441,5 @@ void LocalDataPipeImpl::MarkDataAsConsumed(size_t num_bytes) { | 
| current_num_bytes_ -= num_bytes; | 
| } | 
|  | 
| -void LocalDataPipeImpl::ConvertDataToMessages( | 
| -    MessageInTransitQueue* message_queue) { | 
| -  // The maximum amount of data to send per message (make it a multiple of the | 
| -  // element size. | 
| -  size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; | 
| -  max_message_num_bytes -= max_message_num_bytes % element_num_bytes(); | 
| -  DCHECK_GT(max_message_num_bytes, 0u); | 
| - | 
| -  while (current_num_bytes_ > 0) { | 
| -    size_t message_num_bytes = | 
| -        std::min(max_message_num_bytes, GetMaxNumBytesToRead()); | 
| - | 
| -    // Note: |message_num_bytes| fits in a |uint32_t| since the capacity does. | 
| -    scoped_ptr<MessageInTransit> message(new MessageInTransit( | 
| -        MessageInTransit::kTypeEndpoint, MessageInTransit::kSubtypeEndpointData, | 
| -        static_cast<uint32_t>(message_num_bytes), | 
| -        buffer_.get() + start_index_)); | 
| -    message_queue->AddMessage(message.Pass()); | 
| - | 
| -    MarkDataAsConsumed(message_num_bytes); | 
| -  } | 
| -} | 
| - | 
| }  // namespace system | 
| }  // namespace mojo | 
|  |