| Index: mojo/edk/system/message_pipe.cc
|
| diff --git a/mojo/edk/system/message_pipe.cc b/mojo/edk/system/message_pipe.cc
|
| index 1b0427b6ec5b2454fd49464c9aa057ee97892202..554401e875bd2ab8103ae07931406abe6b823928 100644
|
| --- a/mojo/edk/system/message_pipe.cc
|
| +++ b/mojo/edk/system/message_pipe.cc
|
| @@ -8,7 +8,6 @@
|
| #include "mojo/edk/system/channel.h"
|
| #include "mojo/edk/system/channel_endpoint.h"
|
| #include "mojo/edk/system/channel_endpoint_id.h"
|
| -#include "mojo/edk/system/endpoint_relayer.h"
|
| #include "mojo/edk/system/incoming_endpoint.h"
|
| #include "mojo/edk/system/local_message_pipe_endpoint.h"
|
| #include "mojo/edk/system/message_in_transit.h"
|
| @@ -223,101 +222,53 @@ bool MessagePipe::EndSerialize(
|
| embedder::PlatformHandleVector* /*platform_handles*/) {
|
| DCHECK(port == 0 || port == 1);
|
|
|
| - scoped_refptr<ChannelEndpoint> channel_endpoint;
|
| - {
|
| - base::AutoLock locker(lock_);
|
| - DCHECK(endpoints_[port]);
|
| -
|
| - // The port being serialized must be local.
|
| - DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
|
| -
|
| - // There are three possibilities for the peer port (below). In all cases, we
|
| - // pass the contents of |port|'s message queue to the channel, and it'll
|
| - // (presumably) make a |ChannelEndpoint| from it.
|
| - //
|
| - // 1. The peer port is (known to be) closed.
|
| - //
|
| - // There's no reason for us to continue to exist and no need for the
|
| - // channel to give us the |ChannelEndpoint|. It only remains for us to
|
| - // "close" |port|'s |LocalMessagePipeEndpoint| and prepare for
|
| - // destruction.
|
| - //
|
| - // 2. The peer port is local (the typical case).
|
| - //
|
| - // The channel gives us back a |ChannelEndpoint|, which we hook up to a
|
| - // |ProxyMessagePipeEndpoint| to replace |port|'s
|
| - // |LocalMessagePipeEndpoint|. We continue to exist, since the peer
|
| - // port's message pipe dispatcher will continue to hold a reference to
|
| - // us.
|
| - //
|
| - // 3. The peer port is remote.
|
| - //
|
| - // We also pass its |ChannelEndpoint| to the channel, which then decides
|
| - // what to do. We have no reason to continue to exist.
|
| - //
|
| - // TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|).
|
| -
|
| - // The replacement for |endpoints_[port]|, if any.
|
| - MessagePipeEndpoint* replacement_endpoint = nullptr;
|
| -
|
| - unsigned peer_port = GetPeerPort(port);
|
| - if (!endpoints_[peer_port]) { // Case 1.
|
| - channel_endpoint = new ChannelEndpoint(
|
| - nullptr, 0, static_cast<LocalMessagePipeEndpoint*>(
|
| - endpoints_[port].get())->message_queue());
|
| - } else if (endpoints_[peer_port]->GetType() ==
|
| - MessagePipeEndpoint::kTypeLocal) { // Case 2.
|
| - channel_endpoint = new ChannelEndpoint(
|
| - this, port, static_cast<LocalMessagePipeEndpoint*>(
|
| - endpoints_[port].get())->message_queue());
|
| - replacement_endpoint =
|
| - new ProxyMessagePipeEndpoint(channel_endpoint.get());
|
| - } else { // Case 3.
|
| - DLOG(WARNING) << "Direct message pipe passing across multiple channels "
|
| - "not yet implemented; will proxy";
|
| -
|
| - // Create an |EndpointRelayer| to replace ourselves (rather than having a
|
| - // |MessagePipe| object that exists solely to relay messages between two
|
| - // |ChannelEndpoint|s, owned by the |Channel| through them.
|
| - //
|
| - // This reduces overhead somewhat, and more importantly restores some
|
| - // invariants, e.g., that |MessagePipe|s are owned by dispatchers.
|
| - //
|
| - // TODO(vtl): If we get the |Channel| to own/track the relayer directly,
|
| - // then possibly we could make |ChannelEndpoint|'s |client_| pointer a raw
|
| - // pointer (and not have the |Channel| owning the relayer via its
|
| - // |ChannelEndpoint|s.
|
| - //
|
| - // TODO(vtl): This is not obviously the right place for (all of) this
|
| - // logic, nor is it obviously factored correctly.
|
| -
|
| - DCHECK_EQ(endpoints_[peer_port]->GetType(),
|
| - MessagePipeEndpoint::kTypeProxy);
|
| - ProxyMessagePipeEndpoint* peer_endpoint =
|
| - static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get());
|
| - scoped_refptr<ChannelEndpoint> peer_channel_endpoint =
|
| - peer_endpoint->ReleaseChannelEndpoint();
|
| -
|
| - scoped_refptr<EndpointRelayer> relayer(new EndpointRelayer());
|
| - // We'll assign our peer port's endpoint to the relayer's port 1, and this
|
| - // port's endpoint to the relayer's port 0.
|
| - channel_endpoint = new ChannelEndpoint(
|
| - relayer.get(), 0, static_cast<LocalMessagePipeEndpoint*>(
|
| - endpoints_[port].get())->message_queue());
|
| - relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get());
|
| - peer_channel_endpoint->ReplaceClient(relayer.get(), 1);
|
| -
|
| - // No need to call |Close()| after |ReleaseChannelEndpoint()|.
|
| - endpoints_[peer_port].reset();
|
| - }
|
| + base::AutoLock locker(lock_);
|
| + DCHECK(endpoints_[port]);
|
| +
|
| + // The port being serialized must be local.
|
| + DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
|
|
|
| - endpoints_[port]->Close();
|
| - endpoints_[port].reset(replacement_endpoint);
|
| + unsigned peer_port = GetPeerPort(port);
|
| + MessageInTransitQueue* message_queue =
|
| + static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get())
|
| + ->message_queue();
|
| + // The replacement for |endpoints_[port]|, if any.
|
| + MessagePipeEndpoint* replacement_endpoint = nullptr;
|
| +
|
| + // The three cases below correspond to the ones described above
|
| + // |Channel::SerializeEndpoint...()| (in channel.h).
|
| + if (!endpoints_[peer_port]) {
|
| + // Case 1: (known-)closed peer port. There's no reason for us to continue to
|
| + // exist afterwards.
|
| + channel->SerializeEndpointWithClosedPeer(destination, message_queue);
|
| + } else if (endpoints_[peer_port]->GetType() ==
|
| + MessagePipeEndpoint::kTypeLocal) {
|
| + // Case 2: local peer port. We replace |port|'s |LocalMessagePipeEndpoint|
|
| + // with a |ProxyMessagePipeEndpoint| hooked up to the |ChannelEndpoint| that
|
| + // the |Channel| returns to us.
|
| + scoped_refptr<ChannelEndpoint> channel_endpoint =
|
| + channel->SerializeEndpointWithLocalPeer(destination, message_queue,
|
| + this, port);
|
| + replacement_endpoint = new ProxyMessagePipeEndpoint(channel_endpoint.get());
|
| + } else {
|
| + // Case 3: remote peer port. We get the |peer_port|'s |ChannelEndpoint| and
|
| + // pass it to the |Channel|. There's no reason for us to continue to exist
|
| + // afterwards.
|
| + DCHECK_EQ(endpoints_[peer_port]->GetType(),
|
| + MessagePipeEndpoint::kTypeProxy);
|
| + ProxyMessagePipeEndpoint* peer_endpoint =
|
| + static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get());
|
| + scoped_refptr<ChannelEndpoint> peer_channel_endpoint =
|
| + peer_endpoint->ReleaseChannelEndpoint();
|
| + channel->SerializeEndpointWithRemotePeer(destination, message_queue,
|
| + peer_channel_endpoint);
|
| + // No need to call |Close()| after |ReleaseChannelEndpoint()|.
|
| + endpoints_[peer_port].reset();
|
| }
|
|
|
| - // TODO(vtl): More/most of the above should be moved into (some variant of)
|
| - // |Channel::SerializeEndpoint()|.
|
| - channel->SerializeEndpoint(channel_endpoint, destination);
|
| + endpoints_[port]->Close();
|
| + endpoints_[port].reset(replacement_endpoint);
|
| +
|
| *actual_size = channel->GetSerializedEndpointSize();
|
| return true;
|
| }
|
|
|