| Index: mojo/edk/system/message_pipe.cc
 | 
| diff --git a/mojo/edk/system/message_pipe.cc b/mojo/edk/system/message_pipe.cc
 | 
| index ee5036fba2320e299d1e46b29c5016bc82a806ac..3e243b648419e5c25a7e74ecf69398e4db5aac82 100644
 | 
| --- a/mojo/edk/system/message_pipe.cc
 | 
| +++ b/mojo/edk/system/message_pipe.cc
 | 
| @@ -8,6 +8,7 @@
 | 
|  #include "mojo/edk/system/channel.h"
 | 
|  #include "mojo/edk/system/channel_endpoint.h"
 | 
|  #include "mojo/edk/system/channel_endpoint_id.h"
 | 
| +#include "mojo/edk/system/endpoint_relayer.h"
 | 
|  #include "mojo/edk/system/local_message_pipe_endpoint.h"
 | 
|  #include "mojo/edk/system/message_in_transit.h"
 | 
|  #include "mojo/edk/system/message_pipe_dispatcher.h"
 | 
| @@ -41,7 +42,7 @@ MessagePipe* MessagePipe::CreateLocalLocal() {
 | 
|  // static
 | 
|  MessagePipe* MessagePipe::CreateLocalProxy(
 | 
|      scoped_refptr<ChannelEndpoint>* channel_endpoint) {
 | 
| -  DCHECK(!channel_endpoint->get());  // Not technically wrong, but unlikely.
 | 
| +  DCHECK(!*channel_endpoint);  // Not technically wrong, but unlikely.
 | 
|    MessagePipe* message_pipe = new MessagePipe();
 | 
|    message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
 | 
|    *channel_endpoint = new ChannelEndpoint(message_pipe, 1);
 | 
| @@ -53,7 +54,7 @@ MessagePipe* MessagePipe::CreateLocalProxy(
 | 
|  // static
 | 
|  MessagePipe* MessagePipe::CreateProxyLocal(
 | 
|      scoped_refptr<ChannelEndpoint>* channel_endpoint) {
 | 
| -  DCHECK(!channel_endpoint->get());  // Not technically wrong, but unlikely.
 | 
| +  DCHECK(!*channel_endpoint);  // Not technically wrong, but unlikely.
 | 
|    MessagePipe* message_pipe = new MessagePipe();
 | 
|    *channel_endpoint = new ChannelEndpoint(message_pipe, 0);
 | 
|    message_pipe->endpoints_[0].reset(
 | 
| @@ -74,7 +75,7 @@ bool MessagePipe::Deserialize(Channel* channel,
 | 
|                                size_t size,
 | 
|                                scoped_refptr<MessagePipe>* message_pipe,
 | 
|                                unsigned* port) {
 | 
| -  DCHECK(!message_pipe->get());  // Not technically wrong, but unlikely.
 | 
| +  DCHECK(!*message_pipe);  // Not technically wrong, but unlikely.
 | 
|  
 | 
|    if (size != sizeof(SerializedMessagePipe)) {
 | 
|      LOG(ERROR) << "Invalid serialized message pipe";
 | 
| @@ -84,7 +85,7 @@ bool MessagePipe::Deserialize(Channel* channel,
 | 
|    const SerializedMessagePipe* s =
 | 
|        static_cast<const SerializedMessagePipe*>(source);
 | 
|    *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id);
 | 
| -  if (!message_pipe->get()) {
 | 
| +  if (!*message_pipe) {
 | 
|      LOG(ERROR) << "Failed to deserialize message pipe (ID = "
 | 
|                 << s->receiver_endpoint_id << ")";
 | 
|      return false;
 | 
| @@ -104,18 +105,18 @@ MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
 | 
|    return endpoints_[port]->GetType();
 | 
|  }
 | 
|  
 | 
| -void MessagePipe::CancelAllWaiters(unsigned port) {
 | 
| +void MessagePipe::CancelAllAwakables(unsigned port) {
 | 
|    DCHECK(port == 0 || port == 1);
 | 
|  
 | 
|    base::AutoLock locker(lock_);
 | 
|    DCHECK(endpoints_[port]);
 | 
| -  endpoints_[port]->CancelAllWaiters();
 | 
| +  endpoints_[port]->CancelAllAwakables();
 | 
|  }
 | 
|  
 | 
|  void MessagePipe::Close(unsigned port) {
 | 
|    DCHECK(port == 0 || port == 1);
 | 
|  
 | 
| -  unsigned destination_port = GetPeerPort(port);
 | 
| +  unsigned peer_port = GetPeerPort(port);
 | 
|  
 | 
|    base::AutoLock locker(lock_);
 | 
|    // The endpoint's |OnPeerClose()| may have been called first and returned
 | 
| @@ -124,9 +125,9 @@ void MessagePipe::Close(unsigned port) {
 | 
|      return;
 | 
|  
 | 
|    endpoints_[port]->Close();
 | 
| -  if (endpoints_[destination_port]) {
 | 
| -    if (!endpoints_[destination_port]->OnPeerClose())
 | 
| -      endpoints_[destination_port].reset();
 | 
| +  if (endpoints_[peer_port]) {
 | 
| +    if (!endpoints_[peer_port]->OnPeerClose())
 | 
| +      endpoints_[peer_port].reset();
 | 
|    }
 | 
|    endpoints_[port].reset();
 | 
|  }
 | 
| @@ -139,7 +140,9 @@ MojoResult MessagePipe::WriteMessage(
 | 
|      std::vector<DispatcherTransport>* transports,
 | 
|      MojoWriteMessageFlags flags) {
 | 
|    DCHECK(port == 0 || port == 1);
 | 
| -  return EnqueueMessage(
 | 
| +
 | 
| +  base::AutoLock locker(lock_);
 | 
| +  return EnqueueMessageNoLock(
 | 
|        GetPeerPort(port),
 | 
|        make_scoped_ptr(new MessageInTransit(
 | 
|            MessageInTransit::kTypeEndpoint,
 | 
| @@ -171,28 +174,29 @@ HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
 | 
|    return endpoints_[port]->GetHandleSignalsState();
 | 
|  }
 | 
|  
 | 
| -MojoResult MessagePipe::AddWaiter(unsigned port,
 | 
| -                                  Waiter* waiter,
 | 
| -                                  MojoHandleSignals signals,
 | 
| -                                  uint32_t context,
 | 
| -                                  HandleSignalsState* signals_state) {
 | 
| +MojoResult MessagePipe::AddAwakable(unsigned port,
 | 
| +                                    Awakable* awakable,
 | 
| +                                    MojoHandleSignals signals,
 | 
| +                                    uint32_t context,
 | 
| +                                    HandleSignalsState* signals_state) {
 | 
|    DCHECK(port == 0 || port == 1);
 | 
|  
 | 
|    base::AutoLock locker(lock_);
 | 
|    DCHECK(endpoints_[port]);
 | 
|  
 | 
| -  return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state);
 | 
| +  return endpoints_[port]->AddAwakable(awakable, signals, context,
 | 
| +                                       signals_state);
 | 
|  }
 | 
|  
 | 
| -void MessagePipe::RemoveWaiter(unsigned port,
 | 
| -                               Waiter* waiter,
 | 
| -                               HandleSignalsState* signals_state) {
 | 
| +void MessagePipe::RemoveAwakable(unsigned port,
 | 
| +                                 Awakable* awakable,
 | 
| +                                 HandleSignalsState* signals_state) {
 | 
|    DCHECK(port == 0 || port == 1);
 | 
|  
 | 
|    base::AutoLock locker(lock_);
 | 
|    DCHECK(endpoints_[port]);
 | 
|  
 | 
| -  endpoints_[port]->RemoveWaiter(waiter, signals_state);
 | 
| +  endpoints_[port]->RemoveAwakable(awakable, signals_state);
 | 
|  }
 | 
|  
 | 
|  void MessagePipe::StartSerialize(unsigned /*port*/,
 | 
| @@ -243,16 +247,17 @@ bool MessagePipe::EndSerialize(
 | 
|      //     We also pass its |ChannelEndpoint| to the channel, which then decides
 | 
|      //     what to do. We have no reason to continue to exist.
 | 
|      //
 | 
| -    // TODO(vtl): Factor some of this out to |ChannelEndpoint|.
 | 
| +    // TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|).
 | 
|  
 | 
| -    if (!endpoints_[GetPeerPort(port)]) {
 | 
| +    unsigned peer_port = GetPeerPort(port);
 | 
| +    if (!endpoints_[peer_port]) {
 | 
|        // Case 1.
 | 
|        channel_endpoint = new ChannelEndpoint(
 | 
|            nullptr, 0, static_cast<LocalMessagePipeEndpoint*>(
 | 
|                            endpoints_[port].get())->message_queue());
 | 
|        endpoints_[port]->Close();
 | 
|        endpoints_[port].reset();
 | 
| -    } else if (endpoints_[GetPeerPort(port)]->GetType() ==
 | 
| +    } else if (endpoints_[peer_port]->GetType() ==
 | 
|                 MessagePipeEndpoint::kTypeLocal) {
 | 
|        // Case 2.
 | 
|        channel_endpoint = new ChannelEndpoint(
 | 
| @@ -263,15 +268,44 @@ bool MessagePipe::EndSerialize(
 | 
|            new ProxyMessagePipeEndpoint(channel_endpoint.get()));
 | 
|      } else {
 | 
|        // Case 3.
 | 
| -      // TODO(vtl): Temporarily the same as case 2.
 | 
|        DLOG(WARNING) << "Direct message pipe passing across multiple channels "
 | 
|                         "not yet implemented; will proxy";
 | 
| +
 | 
| +      // Create an |EndpointRelayer| to replace ourselves (rather than having a
 | 
| +      // |MessagePipe| object that exists solely to relay messages between two
 | 
| +      // |ChannelEndpoint|s, owned by the |Channel| through them.
 | 
| +      //
 | 
| +      // This reduces overhead somewhat, and more importantly restores some
 | 
| +      // invariants, e.g., that |MessagePipe|s are owned by dispatchers.
 | 
| +      //
 | 
| +      // TODO(vtl): If we get the |Channel| to own/track the relayer directly,
 | 
| +      // then possibly we could make |ChannelEndpoint|'s |client_| pointer a raw
 | 
| +      // pointer (and not have the |Channel| owning the relayer via its
 | 
| +      // |ChannelEndpoint|s.
 | 
| +      //
 | 
| +      // TODO(vtl): This is not obviously the right place for (all of) this
 | 
| +      // logic, nor is it obviously factored correctly.
 | 
| +
 | 
| +      DCHECK_EQ(endpoints_[peer_port]->GetType(),
 | 
| +                MessagePipeEndpoint::kTypeProxy);
 | 
| +      ProxyMessagePipeEndpoint* peer_endpoint =
 | 
| +          static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get());
 | 
| +      scoped_refptr<ChannelEndpoint> peer_channel_endpoint =
 | 
| +          peer_endpoint->ReleaseChannelEndpoint();
 | 
| +
 | 
| +      scoped_refptr<EndpointRelayer> relayer(new EndpointRelayer());
 | 
| +      // We'll assign our peer port's endpoint to the relayer's port 1, and this
 | 
| +      // port's endpoint to the relayer's port 0.
 | 
|        channel_endpoint = new ChannelEndpoint(
 | 
| -          this, port, static_cast<LocalMessagePipeEndpoint*>(
 | 
| -                          endpoints_[port].get())->message_queue());
 | 
| +          relayer.get(), 0, static_cast<LocalMessagePipeEndpoint*>(
 | 
| +                                endpoints_[port].get())->message_queue());
 | 
| +      relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get());
 | 
| +      peer_channel_endpoint->ReplaceClient(relayer.get(), 1);
 | 
| +
 | 
|        endpoints_[port]->Close();
 | 
| -      endpoints_[port].reset(
 | 
| -          new ProxyMessagePipeEndpoint(channel_endpoint.get()));
 | 
| +      endpoints_[port].reset();
 | 
| +      // No need to call |Close()| after |ReleaseChannelEndpoint()|.
 | 
| +      endpoints_[peer_port].reset();
 | 
|      }
 | 
|    }
 | 
|  
 | 
| @@ -287,16 +321,27 @@ bool MessagePipe::EndSerialize(
 | 
|    return true;
 | 
|  }
 | 
|  
 | 
| -void MessagePipe::OnReadMessage(unsigned port,
 | 
| -                                scoped_ptr<MessageInTransit> message) {
 | 
| +bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) {
 | 
| +  base::AutoLock locker(lock_);
 | 
| +
 | 
| +  if (!endpoints_[port]) {
 | 
| +    // This will happen only on the rare occasion that the call to
 | 
| +    // |OnReadMessage()| is racing with us calling
 | 
| +    // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message,
 | 
| +    // and the |ChannelEndpoint| can retry (calling the new client's
 | 
| +    // |OnReadMessage()|).
 | 
| +    return false;
 | 
| +  }
 | 
| +
 | 
|    // This is called when the |ChannelEndpoint| for the
 | 
|    // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|).
 | 
|    // We need to pass this message on to its peer port (typically a
 | 
|    // |LocalMessagePipeEndpoint|).
 | 
| -  MojoResult result =
 | 
| -      EnqueueMessage(GetPeerPort(port), message.Pass(), nullptr);
 | 
| +  MojoResult result = EnqueueMessageNoLock(GetPeerPort(port),
 | 
| +                                           make_scoped_ptr(message), nullptr);
 | 
|    DLOG_IF(WARNING, result != MOJO_RESULT_OK)
 | 
| -      << "EnqueueMessage() failed (result  = " << result << ")";
 | 
| +      << "EnqueueMessageNoLock() failed (result  = " << result << ")";
 | 
| +  return true;
 | 
|  }
 | 
|  
 | 
|  void MessagePipe::OnDetachFromChannel(unsigned port) {
 | 
| @@ -314,7 +359,7 @@ MessagePipe::~MessagePipe() {
 | 
|    DCHECK(!endpoints_[1]);
 | 
|  }
 | 
|  
 | 
| -MojoResult MessagePipe::EnqueueMessage(
 | 
| +MojoResult MessagePipe::EnqueueMessageNoLock(
 | 
|      unsigned port,
 | 
|      scoped_ptr<MessageInTransit> message,
 | 
|      std::vector<DispatcherTransport>* transports) {
 | 
| @@ -322,8 +367,6 @@ MojoResult MessagePipe::EnqueueMessage(
 | 
|    DCHECK(message);
 | 
|  
 | 
|    DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint);
 | 
| -
 | 
| -  base::AutoLock locker(lock_);
 | 
|    DCHECK(endpoints_[GetPeerPort(port)]);
 | 
|  
 | 
|    // The destination port need not be open, unlike the source port.
 | 
| 
 |