| Index: mojo/edk/system/local_data_pipe_impl.cc
|
| diff --git a/mojo/edk/system/local_data_pipe_impl.cc b/mojo/edk/system/local_data_pipe_impl.cc
|
| index 933e0d76438de161344f089b28cb7ee1b8368ece..938ce68c651637daa55966d3203c57ed753dea54 100644
|
| --- a/mojo/edk/system/local_data_pipe_impl.cc
|
| +++ b/mojo/edk/system/local_data_pipe_impl.cc
|
| @@ -179,7 +179,6 @@ bool LocalDataPipeImpl::ProducerEndSerialize(
|
| if (!consumer_open()) {
|
| // Case 1: The consumer is closed.
|
| s->consumer_num_bytes = static_cast<size_t>(-1);
|
| - owner()->ProducerCloseNoLock();
|
| *actual_size = sizeof(SerializedDataPipeProducerDispatcher);
|
| return true;
|
| }
|
| @@ -187,6 +186,7 @@ bool LocalDataPipeImpl::ProducerEndSerialize(
|
| // Case 2: The consumer isn't closed. We'll replace ourselves with a
|
| // |RemoteProducerDataPipeImpl|.
|
|
|
| + s->consumer_num_bytes = current_num_bytes_;
|
| // Note: We don't use |port|.
|
| scoped_refptr<ChannelEndpoint> channel_endpoint =
|
| channel->SerializeEndpointWithLocalPeer(destination_for_endpoint, nullptr,
|
| @@ -356,13 +356,15 @@ bool LocalDataPipeImpl::ConsumerEndSerialize(
|
|
|
| size_t old_num_bytes = current_num_bytes_;
|
| MessageInTransitQueue message_queue;
|
| - ConvertDataToMessages(&message_queue);
|
| + ConvertDataToMessages(buffer_.get(), &start_index_, ¤t_num_bytes_,
|
| + &message_queue);
|
| + start_index_ = 0;
|
| + current_num_bytes_ = 0;
|
|
|
| if (!producer_open()) {
|
| // Case 1: The producer is closed.
|
| channel->SerializeEndpointWithClosedPeer(destination_for_endpoint,
|
| &message_queue);
|
| - owner()->ConsumerCloseNoLock();
|
| *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) +
|
| channel->GetSerializedEndpointSize();
|
| return true;
|
| @@ -439,28 +441,5 @@ void LocalDataPipeImpl::MarkDataAsConsumed(size_t num_bytes) {
|
| current_num_bytes_ -= num_bytes;
|
| }
|
|
|
| -void LocalDataPipeImpl::ConvertDataToMessages(
|
| - MessageInTransitQueue* message_queue) {
|
| - // The maximum amount of data to send per message (make it a multiple of the
|
| - // element size.
|
| - size_t max_message_num_bytes = GetConfiguration().max_message_num_bytes;
|
| - max_message_num_bytes -= max_message_num_bytes % element_num_bytes();
|
| - DCHECK_GT(max_message_num_bytes, 0u);
|
| -
|
| - while (current_num_bytes_ > 0) {
|
| - size_t message_num_bytes =
|
| - std::min(max_message_num_bytes, GetMaxNumBytesToRead());
|
| -
|
| - // Note: |message_num_bytes| fits in a |uint32_t| since the capacity does.
|
| - scoped_ptr<MessageInTransit> message(new MessageInTransit(
|
| - MessageInTransit::kTypeEndpoint, MessageInTransit::kSubtypeEndpointData,
|
| - static_cast<uint32_t>(message_num_bytes),
|
| - buffer_.get() + start_index_));
|
| - message_queue->AddMessage(message.Pass());
|
| -
|
| - MarkDataAsConsumed(message_num_bytes);
|
| - }
|
| -}
|
| -
|
| } // namespace system
|
| } // namespace mojo
|
|
|