Index: mojo/edk/system/message_pipe.cc |
diff --git a/mojo/edk/system/message_pipe.cc b/mojo/edk/system/message_pipe.cc |
index ee5036fba2320e299d1e46b29c5016bc82a806ac..3e243b648419e5c25a7e74ecf69398e4db5aac82 100644 |
--- a/mojo/edk/system/message_pipe.cc |
+++ b/mojo/edk/system/message_pipe.cc |
@@ -8,6 +8,7 @@ |
#include "mojo/edk/system/channel.h" |
#include "mojo/edk/system/channel_endpoint.h" |
#include "mojo/edk/system/channel_endpoint_id.h" |
+#include "mojo/edk/system/endpoint_relayer.h" |
#include "mojo/edk/system/local_message_pipe_endpoint.h" |
#include "mojo/edk/system/message_in_transit.h" |
#include "mojo/edk/system/message_pipe_dispatcher.h" |
@@ -41,7 +42,7 @@ MessagePipe* MessagePipe::CreateLocalLocal() { |
// static |
MessagePipe* MessagePipe::CreateLocalProxy( |
scoped_refptr<ChannelEndpoint>* channel_endpoint) { |
- DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely. |
+ DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. |
MessagePipe* message_pipe = new MessagePipe(); |
message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint()); |
*channel_endpoint = new ChannelEndpoint(message_pipe, 1); |
@@ -53,7 +54,7 @@ MessagePipe* MessagePipe::CreateLocalProxy( |
// static |
MessagePipe* MessagePipe::CreateProxyLocal( |
scoped_refptr<ChannelEndpoint>* channel_endpoint) { |
- DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely. |
+ DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. |
MessagePipe* message_pipe = new MessagePipe(); |
*channel_endpoint = new ChannelEndpoint(message_pipe, 0); |
message_pipe->endpoints_[0].reset( |
@@ -74,7 +75,7 @@ bool MessagePipe::Deserialize(Channel* channel, |
size_t size, |
scoped_refptr<MessagePipe>* message_pipe, |
unsigned* port) { |
- DCHECK(!message_pipe->get()); // Not technically wrong, but unlikely. |
+ DCHECK(!*message_pipe); // Not technically wrong, but unlikely. |
if (size != sizeof(SerializedMessagePipe)) { |
LOG(ERROR) << "Invalid serialized message pipe"; |
@@ -84,7 +85,7 @@ bool MessagePipe::Deserialize(Channel* channel, |
const SerializedMessagePipe* s = |
static_cast<const SerializedMessagePipe*>(source); |
*message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id); |
- if (!message_pipe->get()) { |
+ if (!*message_pipe) { |
LOG(ERROR) << "Failed to deserialize message pipe (ID = " |
<< s->receiver_endpoint_id << ")"; |
return false; |
@@ -104,18 +105,18 @@ MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { |
return endpoints_[port]->GetType(); |
} |
-void MessagePipe::CancelAllWaiters(unsigned port) { |
+void MessagePipe::CancelAllAwakables(unsigned port) { |
DCHECK(port == 0 || port == 1); |
base::AutoLock locker(lock_); |
DCHECK(endpoints_[port]); |
- endpoints_[port]->CancelAllWaiters(); |
+ endpoints_[port]->CancelAllAwakables(); |
} |
void MessagePipe::Close(unsigned port) { |
DCHECK(port == 0 || port == 1); |
- unsigned destination_port = GetPeerPort(port); |
+ unsigned peer_port = GetPeerPort(port); |
base::AutoLock locker(lock_); |
// The endpoint's |OnPeerClose()| may have been called first and returned |
@@ -124,9 +125,9 @@ void MessagePipe::Close(unsigned port) { |
return; |
endpoints_[port]->Close(); |
- if (endpoints_[destination_port]) { |
- if (!endpoints_[destination_port]->OnPeerClose()) |
- endpoints_[destination_port].reset(); |
+ if (endpoints_[peer_port]) { |
+ if (!endpoints_[peer_port]->OnPeerClose()) |
+ endpoints_[peer_port].reset(); |
} |
endpoints_[port].reset(); |
} |
@@ -139,7 +140,9 @@ MojoResult MessagePipe::WriteMessage( |
std::vector<DispatcherTransport>* transports, |
MojoWriteMessageFlags flags) { |
DCHECK(port == 0 || port == 1); |
- return EnqueueMessage( |
+ |
+ base::AutoLock locker(lock_); |
+ return EnqueueMessageNoLock( |
GetPeerPort(port), |
make_scoped_ptr(new MessageInTransit( |
MessageInTransit::kTypeEndpoint, |
@@ -171,28 +174,29 @@ HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const { |
return endpoints_[port]->GetHandleSignalsState(); |
} |
-MojoResult MessagePipe::AddWaiter(unsigned port, |
- Waiter* waiter, |
- MojoHandleSignals signals, |
- uint32_t context, |
- HandleSignalsState* signals_state) { |
+MojoResult MessagePipe::AddAwakable(unsigned port, |
+ Awakable* awakable, |
+ MojoHandleSignals signals, |
+ uint32_t context, |
+ HandleSignalsState* signals_state) { |
DCHECK(port == 0 || port == 1); |
base::AutoLock locker(lock_); |
DCHECK(endpoints_[port]); |
- return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state); |
+ return endpoints_[port]->AddAwakable(awakable, signals, context, |
+ signals_state); |
} |
-void MessagePipe::RemoveWaiter(unsigned port, |
- Waiter* waiter, |
- HandleSignalsState* signals_state) { |
+void MessagePipe::RemoveAwakable(unsigned port, |
+ Awakable* awakable, |
+ HandleSignalsState* signals_state) { |
DCHECK(port == 0 || port == 1); |
base::AutoLock locker(lock_); |
DCHECK(endpoints_[port]); |
- endpoints_[port]->RemoveWaiter(waiter, signals_state); |
+ endpoints_[port]->RemoveAwakable(awakable, signals_state); |
} |
void MessagePipe::StartSerialize(unsigned /*port*/, |
@@ -243,16 +247,17 @@ bool MessagePipe::EndSerialize( |
// We also pass its |ChannelEndpoint| to the channel, which then decides |
// what to do. We have no reason to continue to exist. |
// |
- // TODO(vtl): Factor some of this out to |ChannelEndpoint|. |
+ // TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|). |
- if (!endpoints_[GetPeerPort(port)]) { |
+ unsigned peer_port = GetPeerPort(port); |
+ if (!endpoints_[peer_port]) { |
// Case 1. |
channel_endpoint = new ChannelEndpoint( |
nullptr, 0, static_cast<LocalMessagePipeEndpoint*>( |
endpoints_[port].get())->message_queue()); |
endpoints_[port]->Close(); |
endpoints_[port].reset(); |
- } else if (endpoints_[GetPeerPort(port)]->GetType() == |
+ } else if (endpoints_[peer_port]->GetType() == |
MessagePipeEndpoint::kTypeLocal) { |
// Case 2. |
channel_endpoint = new ChannelEndpoint( |
@@ -263,15 +268,44 @@ bool MessagePipe::EndSerialize( |
new ProxyMessagePipeEndpoint(channel_endpoint.get())); |
} else { |
// Case 3. |
- // TODO(vtl): Temporarily the same as case 2. |
DLOG(WARNING) << "Direct message pipe passing across multiple channels " |
"not yet implemented; will proxy"; |
+ |
+ // Create an |EndpointRelayer| to replace ourselves (rather than having a |
+ // |MessagePipe| object that exists solely to relay messages between two |
+ // |ChannelEndpoint|s, owned by the |Channel| through them. |
+ // |
+ // This reduces overhead somewhat, and more importantly restores some |
+ // invariants, e.g., that |MessagePipe|s are owned by dispatchers. |
+ // |
+ // TODO(vtl): If we get the |Channel| to own/track the relayer directly, |
+ // then possibly we could make |ChannelEndpoint|'s |client_| pointer a raw |
+ // pointer (and not have the |Channel| owning the relayer via its |
+ // |ChannelEndpoint|s. |
+ // |
+ // TODO(vtl): This is not obviously the right place for (all of) this |
+ // logic, nor is it obviously factored correctly. |
+ |
+ DCHECK_EQ(endpoints_[peer_port]->GetType(), |
+ MessagePipeEndpoint::kTypeProxy); |
+ ProxyMessagePipeEndpoint* peer_endpoint = |
+ static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get()); |
+ scoped_refptr<ChannelEndpoint> peer_channel_endpoint = |
+ peer_endpoint->ReleaseChannelEndpoint(); |
+ |
+ scoped_refptr<EndpointRelayer> relayer(new EndpointRelayer()); |
+ // We'll assign our peer port's endpoint to the relayer's port 1, and this |
+ // port's endpoint to the relayer's port 0. |
channel_endpoint = new ChannelEndpoint( |
- this, port, static_cast<LocalMessagePipeEndpoint*>( |
- endpoints_[port].get())->message_queue()); |
+ relayer.get(), 0, static_cast<LocalMessagePipeEndpoint*>( |
+ endpoints_[port].get())->message_queue()); |
+ relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get()); |
+ peer_channel_endpoint->ReplaceClient(relayer.get(), 1); |
+ |
endpoints_[port]->Close(); |
- endpoints_[port].reset( |
- new ProxyMessagePipeEndpoint(channel_endpoint.get())); |
+ endpoints_[port].reset(); |
+ // No need to call |Close()| after |ReleaseChannelEndpoint()|. |
+ endpoints_[peer_port].reset(); |
} |
} |
@@ -287,16 +321,27 @@ bool MessagePipe::EndSerialize( |
return true; |
} |
-void MessagePipe::OnReadMessage(unsigned port, |
- scoped_ptr<MessageInTransit> message) { |
+bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) { |
+ base::AutoLock locker(lock_); |
+ |
+ if (!endpoints_[port]) { |
+ // This will happen only on the rare occasion that the call to |
+ // |OnReadMessage()| is racing with us calling |
+ // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message, |
+ // and the |ChannelEndpoint| can retry (calling the new client's |
+ // |OnReadMessage()|). |
+ return false; |
+ } |
+ |
// This is called when the |ChannelEndpoint| for the |
// |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|). |
// We need to pass this message on to its peer port (typically a |
// |LocalMessagePipeEndpoint|). |
- MojoResult result = |
- EnqueueMessage(GetPeerPort(port), message.Pass(), nullptr); |
+ MojoResult result = EnqueueMessageNoLock(GetPeerPort(port), |
+ make_scoped_ptr(message), nullptr); |
DLOG_IF(WARNING, result != MOJO_RESULT_OK) |
- << "EnqueueMessage() failed (result = " << result << ")"; |
+ << "EnqueueMessageNoLock() failed (result = " << result << ")"; |
+ return true; |
} |
void MessagePipe::OnDetachFromChannel(unsigned port) { |
@@ -314,7 +359,7 @@ MessagePipe::~MessagePipe() { |
DCHECK(!endpoints_[1]); |
} |
-MojoResult MessagePipe::EnqueueMessage( |
+MojoResult MessagePipe::EnqueueMessageNoLock( |
unsigned port, |
scoped_ptr<MessageInTransit> message, |
std::vector<DispatcherTransport>* transports) { |
@@ -322,8 +367,6 @@ MojoResult MessagePipe::EnqueueMessage( |
DCHECK(message); |
DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint); |
- |
- base::AutoLock locker(lock_); |
DCHECK(endpoints_[GetPeerPort(port)]); |
// The destination port need not be open, unlike the source port. |