| Index: mojo/edk/system/message_pipe.cc
|
| diff --git a/mojo/edk/system/message_pipe.cc b/mojo/edk/system/message_pipe.cc
|
| index ee5036fba2320e299d1e46b29c5016bc82a806ac..3e243b648419e5c25a7e74ecf69398e4db5aac82 100644
|
| --- a/mojo/edk/system/message_pipe.cc
|
| +++ b/mojo/edk/system/message_pipe.cc
|
| @@ -8,6 +8,7 @@
|
| #include "mojo/edk/system/channel.h"
|
| #include "mojo/edk/system/channel_endpoint.h"
|
| #include "mojo/edk/system/channel_endpoint_id.h"
|
| +#include "mojo/edk/system/endpoint_relayer.h"
|
| #include "mojo/edk/system/local_message_pipe_endpoint.h"
|
| #include "mojo/edk/system/message_in_transit.h"
|
| #include "mojo/edk/system/message_pipe_dispatcher.h"
|
| @@ -41,7 +42,7 @@ MessagePipe* MessagePipe::CreateLocalLocal() {
|
| // static
|
| MessagePipe* MessagePipe::CreateLocalProxy(
|
| scoped_refptr<ChannelEndpoint>* channel_endpoint) {
|
| - DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely.
|
| + DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
|
| MessagePipe* message_pipe = new MessagePipe();
|
| message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
|
| *channel_endpoint = new ChannelEndpoint(message_pipe, 1);
|
| @@ -53,7 +54,7 @@ MessagePipe* MessagePipe::CreateLocalProxy(
|
| // static
|
| MessagePipe* MessagePipe::CreateProxyLocal(
|
| scoped_refptr<ChannelEndpoint>* channel_endpoint) {
|
| - DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely.
|
| + DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
|
| MessagePipe* message_pipe = new MessagePipe();
|
| *channel_endpoint = new ChannelEndpoint(message_pipe, 0);
|
| message_pipe->endpoints_[0].reset(
|
| @@ -74,7 +75,7 @@ bool MessagePipe::Deserialize(Channel* channel,
|
| size_t size,
|
| scoped_refptr<MessagePipe>* message_pipe,
|
| unsigned* port) {
|
| - DCHECK(!message_pipe->get()); // Not technically wrong, but unlikely.
|
| + DCHECK(!*message_pipe); // Not technically wrong, but unlikely.
|
|
|
| if (size != sizeof(SerializedMessagePipe)) {
|
| LOG(ERROR) << "Invalid serialized message pipe";
|
| @@ -84,7 +85,7 @@ bool MessagePipe::Deserialize(Channel* channel,
|
| const SerializedMessagePipe* s =
|
| static_cast<const SerializedMessagePipe*>(source);
|
| *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id);
|
| - if (!message_pipe->get()) {
|
| + if (!*message_pipe) {
|
| LOG(ERROR) << "Failed to deserialize message pipe (ID = "
|
| << s->receiver_endpoint_id << ")";
|
| return false;
|
| @@ -104,18 +105,18 @@ MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
|
| return endpoints_[port]->GetType();
|
| }
|
|
|
| -void MessagePipe::CancelAllWaiters(unsigned port) {
|
| +void MessagePipe::CancelAllAwakables(unsigned port) {
|
| DCHECK(port == 0 || port == 1);
|
|
|
| base::AutoLock locker(lock_);
|
| DCHECK(endpoints_[port]);
|
| - endpoints_[port]->CancelAllWaiters();
|
| + endpoints_[port]->CancelAllAwakables();
|
| }
|
|
|
| void MessagePipe::Close(unsigned port) {
|
| DCHECK(port == 0 || port == 1);
|
|
|
| - unsigned destination_port = GetPeerPort(port);
|
| + unsigned peer_port = GetPeerPort(port);
|
|
|
| base::AutoLock locker(lock_);
|
| // The endpoint's |OnPeerClose()| may have been called first and returned
|
| @@ -124,9 +125,9 @@ void MessagePipe::Close(unsigned port) {
|
| return;
|
|
|
| endpoints_[port]->Close();
|
| - if (endpoints_[destination_port]) {
|
| - if (!endpoints_[destination_port]->OnPeerClose())
|
| - endpoints_[destination_port].reset();
|
| + if (endpoints_[peer_port]) {
|
| + if (!endpoints_[peer_port]->OnPeerClose())
|
| + endpoints_[peer_port].reset();
|
| }
|
| endpoints_[port].reset();
|
| }
|
| @@ -139,7 +140,9 @@ MojoResult MessagePipe::WriteMessage(
|
| std::vector<DispatcherTransport>* transports,
|
| MojoWriteMessageFlags flags) {
|
| DCHECK(port == 0 || port == 1);
|
| - return EnqueueMessage(
|
| +
|
| + base::AutoLock locker(lock_);
|
| + return EnqueueMessageNoLock(
|
| GetPeerPort(port),
|
| make_scoped_ptr(new MessageInTransit(
|
| MessageInTransit::kTypeEndpoint,
|
| @@ -171,28 +174,29 @@ HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
|
| return endpoints_[port]->GetHandleSignalsState();
|
| }
|
|
|
| -MojoResult MessagePipe::AddWaiter(unsigned port,
|
| - Waiter* waiter,
|
| - MojoHandleSignals signals,
|
| - uint32_t context,
|
| - HandleSignalsState* signals_state) {
|
| +MojoResult MessagePipe::AddAwakable(unsigned port,
|
| + Awakable* awakable,
|
| + MojoHandleSignals signals,
|
| + uint32_t context,
|
| + HandleSignalsState* signals_state) {
|
| DCHECK(port == 0 || port == 1);
|
|
|
| base::AutoLock locker(lock_);
|
| DCHECK(endpoints_[port]);
|
|
|
| - return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state);
|
| + return endpoints_[port]->AddAwakable(awakable, signals, context,
|
| + signals_state);
|
| }
|
|
|
| -void MessagePipe::RemoveWaiter(unsigned port,
|
| - Waiter* waiter,
|
| - HandleSignalsState* signals_state) {
|
| +void MessagePipe::RemoveAwakable(unsigned port,
|
| + Awakable* awakable,
|
| + HandleSignalsState* signals_state) {
|
| DCHECK(port == 0 || port == 1);
|
|
|
| base::AutoLock locker(lock_);
|
| DCHECK(endpoints_[port]);
|
|
|
| - endpoints_[port]->RemoveWaiter(waiter, signals_state);
|
| + endpoints_[port]->RemoveAwakable(awakable, signals_state);
|
| }
|
|
|
| void MessagePipe::StartSerialize(unsigned /*port*/,
|
| @@ -243,16 +247,17 @@ bool MessagePipe::EndSerialize(
|
| // We also pass its |ChannelEndpoint| to the channel, which then decides
|
| // what to do. We have no reason to continue to exist.
|
| //
|
| - // TODO(vtl): Factor some of this out to |ChannelEndpoint|.
|
| + // TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|).
|
|
|
| - if (!endpoints_[GetPeerPort(port)]) {
|
| + unsigned peer_port = GetPeerPort(port);
|
| + if (!endpoints_[peer_port]) {
|
| // Case 1.
|
| channel_endpoint = new ChannelEndpoint(
|
| nullptr, 0, static_cast<LocalMessagePipeEndpoint*>(
|
| endpoints_[port].get())->message_queue());
|
| endpoints_[port]->Close();
|
| endpoints_[port].reset();
|
| - } else if (endpoints_[GetPeerPort(port)]->GetType() ==
|
| + } else if (endpoints_[peer_port]->GetType() ==
|
| MessagePipeEndpoint::kTypeLocal) {
|
| // Case 2.
|
| channel_endpoint = new ChannelEndpoint(
|
| @@ -263,15 +268,44 @@ bool MessagePipe::EndSerialize(
|
| new ProxyMessagePipeEndpoint(channel_endpoint.get()));
|
| } else {
|
| // Case 3.
|
| - // TODO(vtl): Temporarily the same as case 2.
|
| DLOG(WARNING) << "Direct message pipe passing across multiple channels "
|
| "not yet implemented; will proxy";
|
| +
|
| + // Create an |EndpointRelayer| to replace ourselves (rather than having a
|
| + // |MessagePipe| object that exists solely to relay messages between two
|
| + // |ChannelEndpoint|s, owned by the |Channel| through them.
|
| + //
|
| + // This reduces overhead somewhat, and more importantly restores some
|
| + // invariants, e.g., that |MessagePipe|s are owned by dispatchers.
|
| + //
|
| + // TODO(vtl): If we get the |Channel| to own/track the relayer directly,
|
| + // then possibly we could make |ChannelEndpoint|'s |client_| pointer a raw
|
| + // pointer (and not have the |Channel| owning the relayer via its
|
| + // |ChannelEndpoint|s.
|
| + //
|
| + // TODO(vtl): This is not obviously the right place for (all of) this
|
| + // logic, nor is it obviously factored correctly.
|
| +
|
| + DCHECK_EQ(endpoints_[peer_port]->GetType(),
|
| + MessagePipeEndpoint::kTypeProxy);
|
| + ProxyMessagePipeEndpoint* peer_endpoint =
|
| + static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get());
|
| + scoped_refptr<ChannelEndpoint> peer_channel_endpoint =
|
| + peer_endpoint->ReleaseChannelEndpoint();
|
| +
|
| + scoped_refptr<EndpointRelayer> relayer(new EndpointRelayer());
|
| + // We'll assign our peer port's endpoint to the relayer's port 1, and this
|
| + // port's endpoint to the relayer's port 0.
|
| channel_endpoint = new ChannelEndpoint(
|
| - this, port, static_cast<LocalMessagePipeEndpoint*>(
|
| - endpoints_[port].get())->message_queue());
|
| + relayer.get(), 0, static_cast<LocalMessagePipeEndpoint*>(
|
| + endpoints_[port].get())->message_queue());
|
| + relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get());
|
| + peer_channel_endpoint->ReplaceClient(relayer.get(), 1);
|
| +
|
| endpoints_[port]->Close();
|
| - endpoints_[port].reset(
|
| - new ProxyMessagePipeEndpoint(channel_endpoint.get()));
|
| + endpoints_[port].reset();
|
| + // No need to call |Close()| after |ReleaseChannelEndpoint()|.
|
| + endpoints_[peer_port].reset();
|
| }
|
| }
|
|
|
| @@ -287,16 +321,27 @@ bool MessagePipe::EndSerialize(
|
| return true;
|
| }
|
|
|
| -void MessagePipe::OnReadMessage(unsigned port,
|
| - scoped_ptr<MessageInTransit> message) {
|
| +bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) {
|
| + base::AutoLock locker(lock_);
|
| +
|
| + if (!endpoints_[port]) {
|
| + // This will happen only on the rare occasion that the call to
|
| + // |OnReadMessage()| is racing with us calling
|
| + // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message,
|
| + // and the |ChannelEndpoint| can retry (calling the new client's
|
| + // |OnReadMessage()|).
|
| + return false;
|
| + }
|
| +
|
| // This is called when the |ChannelEndpoint| for the
|
| // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|).
|
| // We need to pass this message on to its peer port (typically a
|
| // |LocalMessagePipeEndpoint|).
|
| - MojoResult result =
|
| - EnqueueMessage(GetPeerPort(port), message.Pass(), nullptr);
|
| + MojoResult result = EnqueueMessageNoLock(GetPeerPort(port),
|
| + make_scoped_ptr(message), nullptr);
|
| DLOG_IF(WARNING, result != MOJO_RESULT_OK)
|
| - << "EnqueueMessage() failed (result = " << result << ")";
|
| + << "EnqueueMessageNoLock() failed (result = " << result << ")";
|
| + return true;
|
| }
|
|
|
| void MessagePipe::OnDetachFromChannel(unsigned port) {
|
| @@ -314,7 +359,7 @@ MessagePipe::~MessagePipe() {
|
| DCHECK(!endpoints_[1]);
|
| }
|
|
|
| -MojoResult MessagePipe::EnqueueMessage(
|
| +MojoResult MessagePipe::EnqueueMessageNoLock(
|
| unsigned port,
|
| scoped_ptr<MessageInTransit> message,
|
| std::vector<DispatcherTransport>* transports) {
|
| @@ -322,8 +367,6 @@ MojoResult MessagePipe::EnqueueMessage(
|
| DCHECK(message);
|
|
|
| DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint);
|
| -
|
| - base::AutoLock locker(lock_);
|
| DCHECK(endpoints_[GetPeerPort(port)]);
|
|
|
| // The destination port need not be open, unlike the source port.
|
|
|