Index: mojo/edk/system/remote_producer_data_pipe_impl.cc |
diff --git a/mojo/edk/system/remote_producer_data_pipe_impl.cc b/mojo/edk/system/remote_producer_data_pipe_impl.cc |
index 4946e114eeb41eb549fd527aed1f71273eabb342..26ef410d7aad6a5f9c65e1fb736fade01d492c86 100644 |
--- a/mojo/edk/system/remote_producer_data_pipe_impl.cc |
+++ b/mojo/edk/system/remote_producer_data_pipe_impl.cc |
@@ -10,6 +10,7 @@ |
#include "base/logging.h" |
#include "base/memory/scoped_ptr.h" |
+#include "mojo/edk/system/channel.h" |
#include "mojo/edk/system/channel_endpoint.h" |
#include "mojo/edk/system/configuration.h" |
#include "mojo/edk/system/data_pipe.h" |
@@ -302,9 +303,8 @@ void RemoteProducerDataPipeImpl::ConsumerStartSerialize( |
Channel* channel, |
size_t* max_size, |
size_t* max_platform_handles) { |
- // TODO(vtl): Support serializing consumer data pipe handles. |
- NOTIMPLEMENTED(); // FIXME |
- *max_size = 0; |
+ *max_size = sizeof(SerializedDataPipeConsumerDispatcher) + |
+ channel->GetSerializedEndpointSize(); |
*max_platform_handles = 0; |
} |
@@ -313,10 +313,38 @@ bool RemoteProducerDataPipeImpl::ConsumerEndSerialize( |
void* destination, |
size_t* actual_size, |
embedder::PlatformHandleVector* platform_handles) { |
- // TODO(vtl): Support serializing consumer data pipe handles. |
- NOTIMPLEMENTED(); // FIXME |
- owner()->ConsumerCloseNoLock(); |
- return false; |
+ SerializedDataPipeConsumerDispatcher* s = |
+ static_cast<SerializedDataPipeConsumerDispatcher*>(destination); |
+ s->validated_options = validated_options(); |
+ void* destination_for_endpoint = static_cast<char*>(destination) + |
+ sizeof(SerializedDataPipeConsumerDispatcher); |
+ |
+ MessageInTransitQueue message_queue; |
+ ConvertDataToMessages(buffer_.get(), &start_index_, ¤t_num_bytes_, |
+ &message_queue); |
+ |
+ if (!producer_open()) { |
+ // Case 1: The producer is closed. |
+ channel->SerializeEndpointWithClosedPeer(destination_for_endpoint, |
+ &message_queue); |
+ *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) + |
+ channel->GetSerializedEndpointSize(); |
+ return true; |
+ } |
+ |
+ // Case 2: The producer isn't closed. We pass |channel_endpoint| back to the |
+ // |Channel|. There's no reason for us to continue to exist afterwards. |
+ |
+ // Note: We don't use |port|. |
+ scoped_refptr<ChannelEndpoint> channel_endpoint; |
+ channel_endpoint.swap(channel_endpoint_); |
+ channel->SerializeEndpointWithRemotePeer(destination_for_endpoint, |
+ &message_queue, channel_endpoint); |
+ owner()->SetProducerClosedNoLock(); |
+ |
+ *actual_size = sizeof(SerializedDataPipeConsumerDispatcher) + |
+ channel->GetSerializedEndpointSize(); |
+ return true; |
} |
bool RemoteProducerDataPipeImpl::OnReadMessage(unsigned /*port*/, |