Index: mojo/edk/system/local_data_pipe_impl.cc |
diff --git a/mojo/edk/system/local_data_pipe_impl.cc b/mojo/edk/system/local_data_pipe_impl.cc |
index 933e0d76438de161344f089b28cb7ee1b8368ece..938ce68c651637daa55966d3203c57ed753dea54 100644 |
--- a/mojo/edk/system/local_data_pipe_impl.cc |
+++ b/mojo/edk/system/local_data_pipe_impl.cc |
@@ -179,7 +179,6 @@ bool LocalDataPipeImpl::ProducerEndSerialize( |
if (!consumer_open()) { |
// Case 1: The consumer is closed. |
s->consumer_num_bytes = static_cast<size_t>(-1); |
- owner()->ProducerCloseNoLock(); |
*actual_size = sizeof(SerializedDataPipeProducerDispatcher); |
return true; |
} |
@@ -187,6 +186,7 @@ bool LocalDataPipeImpl::ProducerEndSerialize( |
// Case 2: The consumer isn't closed. We'll replace ourselves with a |
// |RemoteProducerDataPipeImpl|. |
+ s->consumer_num_bytes = current_num_bytes_; |
// Note: We don't use |port|. |
scoped_refptr<ChannelEndpoint> channel_endpoint = |
channel->SerializeEndpointWithLocalPeer(destination_for_endpoint, nullptr, |
@@ -356,13 +356,15 @@ bool LocalDataPipeImpl::ConsumerEndSerialize( |
size_t old_num_bytes = current_num_bytes_; |
MessageInTransitQueue message_queue; |
- ConvertDataToMessages(&message_queue); |
+ ConvertDataToMessages(buffer_.get(), &start_index_, ¤t_num_bytes_, |
+ &message_queue); |
+ start_index_ = 0; |
+ current_num_bytes_ = 0; |
if (!producer_open()) { |
// Case 1: The producer is closed. |
channel->SerializeEndpointWithClosedPeer(destination_for_endpoint, |
&message_queue); |
- owner()->ConsumerCloseNoLock(); |
*actual_size = sizeof(SerializedDataPipeConsumerDispatcher) + |
channel->GetSerializedEndpointSize(); |
return true; |
@@ -439,28 +441,5 @@ void LocalDataPipeImpl::MarkDataAsConsumed(size_t num_bytes) { |
current_num_bytes_ -= num_bytes; |
} |
-void LocalDataPipeImpl::ConvertDataToMessages( |
- MessageInTransitQueue* message_queue) { |
- // The maximum amount of data to send per message (make it a multiple of the |
- // element size. |
- size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes; |
- max_message_num_bytes -= max_message_num_bytes % element_num_bytes(); |
- DCHECK_GT(max_message_num_bytes, 0u); |
- |
- while (current_num_bytes_ > 0) { |
- size_t message_num_bytes = |
- std::min(max_message_num_bytes, GetMaxNumBytesToRead()); |
- |
- // Note: |message_num_bytes| fits in a |uint32_t| since the capacity does. |
- scoped_ptr<MessageInTransit> message(new MessageInTransit( |
- MessageInTransit::kTypeEndpoint, MessageInTransit::kSubtypeEndpointData, |
- static_cast<uint32_t>(message_num_bytes), |
- buffer_.get() + start_index_)); |
- message_queue->AddMessage(message.Pass()); |
- |
- MarkDataAsConsumed(message_num_bytes); |
- } |
-} |
- |
} // namespace system |
} // namespace mojo |