Index: mojo/edk/system/message_pipe.cc |
diff --git a/mojo/edk/system/message_pipe.cc b/mojo/edk/system/message_pipe.cc |
index e2157302cbc8e46555bc4b1c3dfdf5c6b7f25154..731fd904e4fe861fc3c48ae1b552ab995b759f97 100644 |
--- a/mojo/edk/system/message_pipe.cc |
+++ b/mojo/edk/system/message_pipe.cc |
@@ -5,7 +5,9 @@ |
#include "mojo/edk/system/message_pipe.h" |
#include "base/logging.h" |
+#include "mojo/edk/system/channel.h" |
#include "mojo/edk/system/channel_endpoint.h" |
+#include "mojo/edk/system/channel_endpoint_id.h" |
#include "mojo/edk/system/local_message_pipe_endpoint.h" |
#include "mojo/edk/system/message_in_transit.h" |
#include "mojo/edk/system/message_pipe_dispatcher.h" |
@@ -15,6 +17,19 @@ |
namespace mojo { |
namespace system { |
+namespace { |
+ |
+// TODO(vtl): Move this into |Channel| (and possible further). |
+struct SerializedMessagePipe { |
+ // This is the endpoint ID on the receiving side, and should be a "remote ID". |
+ // (The receiving side should already have had an endpoint attached and been |
+ // run via the |Channel|s. This endpoint will have both IDs assigned, so this |
+ // ID is only needed to associate that endpoint with a particular dispatcher.) |
+ ChannelEndpointId receiver_endpoint_id; |
+}; |
+ |
+} // namespace |
+ |
// static |
MessagePipe* MessagePipe::CreateLocalLocal() { |
MessagePipe* message_pipe = new MessagePipe(); |
@@ -53,6 +68,34 @@ unsigned MessagePipe::GetPeerPort(unsigned port) { |
return port ^ 1; |
} |
+// static |
+bool MessagePipe::Deserialize(Channel* channel, |
+ const void* source, |
+ size_t size, |
+ scoped_refptr<MessagePipe>* message_pipe, |
+ unsigned* port) { |
+ DCHECK(!message_pipe->get()); // Not technically wrong, but unlikely. |
+ |
+ if (size != sizeof(SerializedMessagePipe)) { |
+ LOG(ERROR) << "Invalid serialized message pipe"; |
+ return false; |
+ } |
+ |
+ const SerializedMessagePipe* s = |
+ static_cast<const SerializedMessagePipe*>(source); |
+ *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id); |
+ if (!message_pipe->get()) { |
+ LOG(ERROR) << "Failed to deserialize message pipe (ID = " |
+ << s->receiver_endpoint_id << ")"; |
+ return false; |
+ } |
+ |
+ DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = " |
+ << s->receiver_endpoint_id << ")"; |
+ *port = 0; |
+ return true; |
+} |
+ |
MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) { |
DCHECK(port == 0 || port == 1); |
base::AutoLock locker(lock_); |
@@ -100,9 +143,7 @@ MojoResult MessagePipe::WriteMessage( |
GetPeerPort(port), |
make_scoped_ptr(new MessageInTransit( |
MessageInTransit::kTypeMessagePipeEndpoint, |
- MessageInTransit::kSubtypeMessagePipeEndpointData, |
- num_bytes, |
- bytes)), |
+ MessageInTransit::kSubtypeMessagePipeEndpointData, num_bytes, bytes)), |
transports); |
} |
@@ -117,8 +158,8 @@ MojoResult MessagePipe::ReadMessage(unsigned port, |
base::AutoLock locker(lock_); |
DCHECK(endpoints_[port]); |
- return endpoints_[port]->ReadMessage( |
- bytes, num_bytes, dispatchers, num_dispatchers, flags); |
+ return endpoints_[port]->ReadMessage(bytes, num_bytes, dispatchers, |
+ num_dispatchers, flags); |
} |
HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const { |
@@ -154,6 +195,32 @@ void MessagePipe::RemoveWaiter(unsigned port, |
endpoints_[port]->RemoveWaiter(waiter, signals_state); |
} |
+void MessagePipe::StartSerialize(unsigned /*port*/, |
+ Channel* /*channel*/, |
+ size_t* max_size, |
+ size_t* max_platform_handles) { |
+ *max_size = sizeof(SerializedMessagePipe); |
+ *max_platform_handles = 0; |
+} |
+ |
+bool MessagePipe::EndSerialize( |
+ unsigned port, |
+ Channel* channel, |
+ void* destination, |
+ size_t* actual_size, |
+ embedder::PlatformHandleVector* /*platform_handles*/) { |
+ SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination); |
+ |
+ // Convert the local endpoint to a proxy endpoint (moving the message queue) |
+ // and attach it to the channel. |
+ s->receiver_endpoint_id = |
+ channel->AttachAndRunEndpoint(ConvertLocalToProxy(port), false); |
+ DVLOG(2) << "Serializing message pipe (remote ID = " |
+ << s->receiver_endpoint_id << ")"; |
+ *actual_size = sizeof(SerializedMessagePipe); |
+ return true; |
+} |
+ |
scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) { |
DCHECK(port == 0 || port == 1); |
@@ -165,10 +232,8 @@ scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) { |
// send the already-queued messages. |
if (!endpoints_[GetPeerPort(port)]) { |
scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint( |
- nullptr, |
- 0, |
- static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()) |
- ->message_queue())); |
+ nullptr, 0, static_cast<LocalMessagePipeEndpoint*>( |
+ endpoints_[port].get())->message_queue())); |
endpoints_[port]->Close(); |
endpoints_[port].reset(); |
return channel_endpoint; |
@@ -177,18 +242,15 @@ scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) { |
// TODO(vtl): Allowing this case is a temporary hack. It'll set up a |
// |MessagePipe| with two proxy endpoints, which will then act as a proxy |
// (rather than trying to connect the two ends directly). |
- DLOG_IF(WARNING, |
- endpoints_[GetPeerPort(port)]->GetType() != |
- MessagePipeEndpoint::kTypeLocal) |
+ DLOG_IF(WARNING, endpoints_[GetPeerPort(port)]->GetType() != |
+ MessagePipeEndpoint::kTypeLocal) |
<< "Direct message pipe passing across multiple channels not yet " |
"implemented; will proxy"; |
scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass()); |
scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint( |
- this, |
- port, |
- static_cast<LocalMessagePipeEndpoint*>(old_endpoint.get()) |
- ->message_queue())); |
+ this, port, static_cast<LocalMessagePipeEndpoint*>(old_endpoint.get()) |
+ ->message_queue())); |
endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get())); |
old_endpoint->Close(); |
@@ -280,7 +342,7 @@ MojoResult MessagePipe::AttachTransportsNoLock( |
(*transports)[i].CreateEquivalentDispatcherAndClose()); |
} else { |
LOG(WARNING) << "Enqueueing null dispatcher"; |
- dispatchers->push_back(scoped_refptr<Dispatcher>()); |
+ dispatchers->push_back(nullptr); |
} |
} |
message->SetDispatchers(dispatchers.Pass()); |