| Index: mojo/edk/system/remote_producer_data_pipe_impl.cc | 
| diff --git a/mojo/edk/system/remote_producer_data_pipe_impl.cc b/mojo/edk/system/remote_producer_data_pipe_impl.cc | 
| index 4946e114eeb41eb549fd527aed1f71273eabb342..26ef410d7aad6a5f9c65e1fb736fade01d492c86 100644 | 
| --- a/mojo/edk/system/remote_producer_data_pipe_impl.cc | 
| +++ b/mojo/edk/system/remote_producer_data_pipe_impl.cc | 
| @@ -10,6 +10,7 @@ | 
|  | 
| #include "base/logging.h" | 
| #include "base/memory/scoped_ptr.h" | 
| +#include "mojo/edk/system/channel.h" | 
| #include "mojo/edk/system/channel_endpoint.h" | 
| #include "mojo/edk/system/configuration.h" | 
| #include "mojo/edk/system/data_pipe.h" | 
| @@ -302,9 +303,8 @@ void RemoteProducerDataPipeImpl::ConsumerStartSerialize( | 
| Channel* channel, | 
| size_t* max_size, | 
| size_t* max_platform_handles) { | 
| -  // TODO(vtl): Support serializing consumer data pipe handles. | 
| -  NOTIMPLEMENTED();  // FIXME | 
| -  *max_size = 0; | 
| +  *max_size = sizeof(SerializedDataPipeConsumerDispatcher) + | 
| +              channel->GetSerializedEndpointSize(); | 
| *max_platform_handles = 0; | 
| } | 
|  | 
| @@ -313,10 +313,38 @@ bool RemoteProducerDataPipeImpl::ConsumerEndSerialize( | 
| void* destination, | 
| size_t* actual_size, | 
| embedder::PlatformHandleVector* platform_handles) { | 
| -  // TODO(vtl): Support serializing consumer data pipe handles. | 
| -  NOTIMPLEMENTED();  // FIXME | 
| -  owner()->ConsumerCloseNoLock(); | 
| -  return false; | 
| +  SerializedDataPipeConsumerDispatcher* s = | 
| +      static_cast<SerializedDataPipeConsumerDispatcher*>(destination); | 
| +  s->validated_options = validated_options(); | 
| +  void* destination_for_endpoint = static_cast<char*>(destination) + | 
| +                                   sizeof(SerializedDataPipeConsumerDispatcher); | 
| + | 
| +  MessageInTransitQueue message_queue; | 
| +  ConvertDataToMessages(buffer_.get(), &start_index_, ¤t_num_bytes_, | 
| +                        &message_queue); | 
| + | 
| +  if (!producer_open()) { | 
| +    // Case 1: The producer is closed. | 
| +    channel->SerializeEndpointWithClosedPeer(destination_for_endpoint, | 
| +                                             &message_queue); | 
| +    *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) + | 
| +                   channel->GetSerializedEndpointSize(); | 
| +    return true; | 
| +  } | 
| + | 
| +  // Case 2: The producer isn't closed. We pass |channel_endpoint| back to the | 
| +  // |Channel|. There's no reason for us to continue to exist afterwards. | 
| + | 
| +  // Note: We don't use |port|. | 
| +  scoped_refptr<ChannelEndpoint> channel_endpoint; | 
| +  channel_endpoint.swap(channel_endpoint_); | 
| +  channel->SerializeEndpointWithRemotePeer(destination_for_endpoint, | 
| +                                           &message_queue, channel_endpoint); | 
| +  owner()->SetProducerClosedNoLock(); | 
| + | 
| +  *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) + | 
| +                 channel->GetSerializedEndpointSize(); | 
| +  return true; | 
| } | 
|  | 
| bool RemoteProducerDataPipeImpl::OnReadMessage(unsigned /*port*/, | 
|  |