Index: mojo/edk/system/message_pipe.cc |
diff --git a/mojo/edk/system/message_pipe.cc b/mojo/edk/system/message_pipe.cc |
index 3e243b648419e5c25a7e74ecf69398e4db5aac82..1b0427b6ec5b2454fd49464c9aa057ee97892202 100644 |
--- a/mojo/edk/system/message_pipe.cc |
+++ b/mojo/edk/system/message_pipe.cc |
@@ -9,6 +9,7 @@ |
#include "mojo/edk/system/channel_endpoint.h" |
#include "mojo/edk/system/channel_endpoint_id.h" |
#include "mojo/edk/system/endpoint_relayer.h" |
+#include "mojo/edk/system/incoming_endpoint.h" |
#include "mojo/edk/system/local_message_pipe_endpoint.h" |
#include "mojo/edk/system/message_in_transit.h" |
#include "mojo/edk/system/message_pipe_dispatcher.h" |
@@ -18,19 +19,6 @@ |
namespace mojo { |
namespace system { |
-namespace { |
- |
-// TODO(vtl): Move this into |Channel| (and possible further). |
-struct SerializedMessagePipe { |
- // This is the endpoint ID on the receiving side, and should be a "remote ID". |
- // (The receiving side should already have had an endpoint attached and been |
- // run via the |Channel|s. This endpoint will have both IDs assigned, so this |
- // ID is only needed to associate that endpoint with a particular dispatcher.) |
- ChannelEndpointId receiver_endpoint_id; |
-}; |
- |
-} // namespace |
- |
// static |
MessagePipe* MessagePipe::CreateLocalLocal() { |
MessagePipe* message_pipe = new MessagePipe(); |
@@ -52,6 +40,30 @@ MessagePipe* MessagePipe::CreateLocalProxy( |
} |
// static |
+MessagePipe* MessagePipe::CreateLocalProxyFromExisting( |
+ MessageInTransitQueue* message_queue, |
+ ChannelEndpoint* channel_endpoint) { |
+ DCHECK(message_queue); |
+ MessagePipe* message_pipe = new MessagePipe(); |
+ message_pipe->endpoints_[0].reset( |
+ new LocalMessagePipeEndpoint(message_queue)); |
+ if (channel_endpoint) { |
+ bool attached_to_channel = channel_endpoint->ReplaceClient(message_pipe, 1); |
+ message_pipe->endpoints_[1].reset( |
+ new ProxyMessagePipeEndpoint(channel_endpoint)); |
+ if (!attached_to_channel) |
+ message_pipe->OnDetachFromChannel(1); |
+ } else { |
+ // This means that the proxy side was already closed; we only need to inform |
+ // the local side of this. |
+ // TODO(vtl): This is safe to do without locking (but perhaps slightly |
+ // dubious), since no other thread has access to |message_pipe| yet. |
+ message_pipe->endpoints_[0]->OnPeerClose(); |
+ } |
+ return message_pipe; |
+} |
+ |
+// static |
MessagePipe* MessagePipe::CreateProxyLocal( |
scoped_refptr<ChannelEndpoint>* channel_endpoint) { |
DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely. |
@@ -77,22 +89,18 @@ bool MessagePipe::Deserialize(Channel* channel, |
unsigned* port) { |
DCHECK(!*message_pipe); // Not technically wrong, but unlikely. |
- if (size != sizeof(SerializedMessagePipe)) { |
+ if (size != channel->GetSerializedEndpointSize()) { |
LOG(ERROR) << "Invalid serialized message pipe"; |
return false; |
} |
- const SerializedMessagePipe* s = |
- static_cast<const SerializedMessagePipe*>(source); |
- *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id); |
- if (!*message_pipe) { |
- LOG(ERROR) << "Failed to deserialize message pipe (ID = " |
- << s->receiver_endpoint_id << ")"; |
+ scoped_refptr<IncomingEndpoint> incoming_endpoint = |
+ channel->DeserializeEndpoint(source); |
+ if (!incoming_endpoint) |
return false; |
- } |
- DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = " |
- << s->receiver_endpoint_id << ")"; |
+ *message_pipe = incoming_endpoint->ConvertToMessagePipe(); |
+ DCHECK(*message_pipe); |
*port = 0; |
return true; |
} |
@@ -200,10 +208,10 @@ void MessagePipe::RemoveAwakable(unsigned port, |
} |
void MessagePipe::StartSerialize(unsigned /*port*/, |
- Channel* /*channel*/, |
+ Channel* channel, |
size_t* max_size, |
size_t* max_platform_handles) { |
- *max_size = sizeof(SerializedMessagePipe); |
+ *max_size = channel->GetSerializedEndpointSize(); |
*max_platform_handles = 0; |
} |
@@ -249,25 +257,22 @@ bool MessagePipe::EndSerialize( |
// |
// TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|). |
+ // The replacement for |endpoints_[port]|, if any. |
+ MessagePipeEndpoint* replacement_endpoint = nullptr; |
+ |
unsigned peer_port = GetPeerPort(port); |
- if (!endpoints_[peer_port]) { |
- // Case 1. |
+ if (!endpoints_[peer_port]) { // Case 1. |
channel_endpoint = new ChannelEndpoint( |
nullptr, 0, static_cast<LocalMessagePipeEndpoint*>( |
endpoints_[port].get())->message_queue()); |
- endpoints_[port]->Close(); |
- endpoints_[port].reset(); |
} else if (endpoints_[peer_port]->GetType() == |
- MessagePipeEndpoint::kTypeLocal) { |
- // Case 2. |
+ MessagePipeEndpoint::kTypeLocal) { // Case 2. |
channel_endpoint = new ChannelEndpoint( |
this, port, static_cast<LocalMessagePipeEndpoint*>( |
endpoints_[port].get())->message_queue()); |
- endpoints_[port]->Close(); |
- endpoints_[port].reset( |
- new ProxyMessagePipeEndpoint(channel_endpoint.get())); |
- } else { |
- // Case 3. |
+ replacement_endpoint = |
+ new ProxyMessagePipeEndpoint(channel_endpoint.get()); |
+ } else { // Case 3. |
DLOG(WARNING) << "Direct message pipe passing across multiple channels " |
"not yet implemented; will proxy"; |
@@ -302,22 +307,18 @@ bool MessagePipe::EndSerialize( |
relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get()); |
peer_channel_endpoint->ReplaceClient(relayer.get(), 1); |
- endpoints_[port]->Close(); |
- endpoints_[port].reset(); |
// No need to call |Close()| after |ReleaseChannelEndpoint()|. |
endpoints_[peer_port].reset(); |
} |
- } |
- SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination); |
+ endpoints_[port]->Close(); |
+ endpoints_[port].reset(replacement_endpoint); |
+ } |
- // Convert the local endpoint to a proxy endpoint (moving the message queue) |
- // and attach it to the channel. |
- s->receiver_endpoint_id = |
- channel->AttachAndRunEndpoint(channel_endpoint, false); |
- DVLOG(2) << "Serializing message pipe (remote ID = " |
- << s->receiver_endpoint_id << ")"; |
- *actual_size = sizeof(SerializedMessagePipe); |
+ // TODO(vtl): More/most of the above should be moved into (some variant of) |
+ // |Channel::SerializeEndpoint()|. |
+ channel->SerializeEndpoint(channel_endpoint, destination); |
+ *actual_size = channel->GetSerializedEndpointSize(); |
return true; |
} |