| Index: mojo/edk/system/message_pipe.cc
|
| diff --git a/mojo/edk/system/message_pipe.cc b/mojo/edk/system/message_pipe.cc
|
| index 3e243b648419e5c25a7e74ecf69398e4db5aac82..1b0427b6ec5b2454fd49464c9aa057ee97892202 100644
|
| --- a/mojo/edk/system/message_pipe.cc
|
| +++ b/mojo/edk/system/message_pipe.cc
|
| @@ -9,6 +9,7 @@
|
| #include "mojo/edk/system/channel_endpoint.h"
|
| #include "mojo/edk/system/channel_endpoint_id.h"
|
| #include "mojo/edk/system/endpoint_relayer.h"
|
| +#include "mojo/edk/system/incoming_endpoint.h"
|
| #include "mojo/edk/system/local_message_pipe_endpoint.h"
|
| #include "mojo/edk/system/message_in_transit.h"
|
| #include "mojo/edk/system/message_pipe_dispatcher.h"
|
| @@ -18,19 +19,6 @@
|
| namespace mojo {
|
| namespace system {
|
|
|
| -namespace {
|
| -
|
| -// TODO(vtl): Move this into |Channel| (and possible further).
|
| -struct SerializedMessagePipe {
|
| - // This is the endpoint ID on the receiving side, and should be a "remote ID".
|
| - // (The receiving side should already have had an endpoint attached and been
|
| - // run via the |Channel|s. This endpoint will have both IDs assigned, so this
|
| - // ID is only needed to associate that endpoint with a particular dispatcher.)
|
| - ChannelEndpointId receiver_endpoint_id;
|
| -};
|
| -
|
| -} // namespace
|
| -
|
| // static
|
| MessagePipe* MessagePipe::CreateLocalLocal() {
|
| MessagePipe* message_pipe = new MessagePipe();
|
| @@ -52,6 +40,30 @@ MessagePipe* MessagePipe::CreateLocalProxy(
|
| }
|
|
|
| // static
|
| +MessagePipe* MessagePipe::CreateLocalProxyFromExisting(
|
| + MessageInTransitQueue* message_queue,
|
| + ChannelEndpoint* channel_endpoint) {
|
| + DCHECK(message_queue);
|
| + MessagePipe* message_pipe = new MessagePipe();
|
| + message_pipe->endpoints_[0].reset(
|
| + new LocalMessagePipeEndpoint(message_queue));
|
| + if (channel_endpoint) {
|
| + bool attached_to_channel = channel_endpoint->ReplaceClient(message_pipe, 1);
|
| + message_pipe->endpoints_[1].reset(
|
| + new ProxyMessagePipeEndpoint(channel_endpoint));
|
| + if (!attached_to_channel)
|
| + message_pipe->OnDetachFromChannel(1);
|
| + } else {
|
| + // This means that the proxy side was already closed; we only need to inform
|
| + // the local side of this.
|
| + // TODO(vtl): This is safe to do without locking (but perhaps slightly
|
| + // dubious), since no other thread has access to |message_pipe| yet.
|
| + message_pipe->endpoints_[0]->OnPeerClose();
|
| + }
|
| + return message_pipe;
|
| +}
|
| +
|
| +// static
|
| MessagePipe* MessagePipe::CreateProxyLocal(
|
| scoped_refptr<ChannelEndpoint>* channel_endpoint) {
|
| DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
|
| @@ -77,22 +89,18 @@ bool MessagePipe::Deserialize(Channel* channel,
|
| unsigned* port) {
|
| DCHECK(!*message_pipe); // Not technically wrong, but unlikely.
|
|
|
| - if (size != sizeof(SerializedMessagePipe)) {
|
| + if (size != channel->GetSerializedEndpointSize()) {
|
| LOG(ERROR) << "Invalid serialized message pipe";
|
| return false;
|
| }
|
|
|
| - const SerializedMessagePipe* s =
|
| - static_cast<const SerializedMessagePipe*>(source);
|
| - *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id);
|
| - if (!*message_pipe) {
|
| - LOG(ERROR) << "Failed to deserialize message pipe (ID = "
|
| - << s->receiver_endpoint_id << ")";
|
| + scoped_refptr<IncomingEndpoint> incoming_endpoint =
|
| + channel->DeserializeEndpoint(source);
|
| + if (!incoming_endpoint)
|
| return false;
|
| - }
|
|
|
| - DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = "
|
| - << s->receiver_endpoint_id << ")";
|
| + *message_pipe = incoming_endpoint->ConvertToMessagePipe();
|
| + DCHECK(*message_pipe);
|
| *port = 0;
|
| return true;
|
| }
|
| @@ -200,10 +208,10 @@ void MessagePipe::RemoveAwakable(unsigned port,
|
| }
|
|
|
| void MessagePipe::StartSerialize(unsigned /*port*/,
|
| - Channel* /*channel*/,
|
| + Channel* channel,
|
| size_t* max_size,
|
| size_t* max_platform_handles) {
|
| - *max_size = sizeof(SerializedMessagePipe);
|
| + *max_size = channel->GetSerializedEndpointSize();
|
| *max_platform_handles = 0;
|
| }
|
|
|
| @@ -249,25 +257,22 @@ bool MessagePipe::EndSerialize(
|
| //
|
| // TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|).
|
|
|
| + // The replacement for |endpoints_[port]|, if any.
|
| + MessagePipeEndpoint* replacement_endpoint = nullptr;
|
| +
|
| unsigned peer_port = GetPeerPort(port);
|
| - if (!endpoints_[peer_port]) {
|
| - // Case 1.
|
| + if (!endpoints_[peer_port]) { // Case 1.
|
| channel_endpoint = new ChannelEndpoint(
|
| nullptr, 0, static_cast<LocalMessagePipeEndpoint*>(
|
| endpoints_[port].get())->message_queue());
|
| - endpoints_[port]->Close();
|
| - endpoints_[port].reset();
|
| } else if (endpoints_[peer_port]->GetType() ==
|
| - MessagePipeEndpoint::kTypeLocal) {
|
| - // Case 2.
|
| + MessagePipeEndpoint::kTypeLocal) { // Case 2.
|
| channel_endpoint = new ChannelEndpoint(
|
| this, port, static_cast<LocalMessagePipeEndpoint*>(
|
| endpoints_[port].get())->message_queue());
|
| - endpoints_[port]->Close();
|
| - endpoints_[port].reset(
|
| - new ProxyMessagePipeEndpoint(channel_endpoint.get()));
|
| - } else {
|
| - // Case 3.
|
| + replacement_endpoint =
|
| + new ProxyMessagePipeEndpoint(channel_endpoint.get());
|
| + } else { // Case 3.
|
| DLOG(WARNING) << "Direct message pipe passing across multiple channels "
|
| "not yet implemented; will proxy";
|
|
|
| @@ -302,22 +307,18 @@ bool MessagePipe::EndSerialize(
|
| relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get());
|
| peer_channel_endpoint->ReplaceClient(relayer.get(), 1);
|
|
|
| - endpoints_[port]->Close();
|
| - endpoints_[port].reset();
|
| // No need to call |Close()| after |ReleaseChannelEndpoint()|.
|
| endpoints_[peer_port].reset();
|
| }
|
| - }
|
|
|
| - SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination);
|
| + endpoints_[port]->Close();
|
| + endpoints_[port].reset(replacement_endpoint);
|
| + }
|
|
|
| - // Convert the local endpoint to a proxy endpoint (moving the message queue)
|
| - // and attach it to the channel.
|
| - s->receiver_endpoint_id =
|
| - channel->AttachAndRunEndpoint(channel_endpoint, false);
|
| - DVLOG(2) << "Serializing message pipe (remote ID = "
|
| - << s->receiver_endpoint_id << ")";
|
| - *actual_size = sizeof(SerializedMessagePipe);
|
| + // TODO(vtl): More/most of the above should be moved into (some variant of)
|
| + // |Channel::SerializeEndpoint()|.
|
| + channel->SerializeEndpoint(channel_endpoint, destination);
|
| + *actual_size = channel->GetSerializedEndpointSize();
|
| return true;
|
| }
|
|
|
|
|