| Index: mojo/edk/system/message_pipe.cc
|
| diff --git a/mojo/edk/system/message_pipe.cc b/mojo/edk/system/message_pipe.cc
|
| index e2157302cbc8e46555bc4b1c3dfdf5c6b7f25154..731fd904e4fe861fc3c48ae1b552ab995b759f97 100644
|
| --- a/mojo/edk/system/message_pipe.cc
|
| +++ b/mojo/edk/system/message_pipe.cc
|
| @@ -5,7 +5,9 @@
|
| #include "mojo/edk/system/message_pipe.h"
|
|
|
| #include "base/logging.h"
|
| +#include "mojo/edk/system/channel.h"
|
| #include "mojo/edk/system/channel_endpoint.h"
|
| +#include "mojo/edk/system/channel_endpoint_id.h"
|
| #include "mojo/edk/system/local_message_pipe_endpoint.h"
|
| #include "mojo/edk/system/message_in_transit.h"
|
| #include "mojo/edk/system/message_pipe_dispatcher.h"
|
| @@ -15,6 +17,19 @@
|
| namespace mojo {
|
| namespace system {
|
|
|
| +namespace {
|
| +
|
| +// TODO(vtl): Move this into |Channel| (and possible further).
|
| +struct SerializedMessagePipe {
|
| + // This is the endpoint ID on the receiving side, and should be a "remote ID".
|
| + // (The receiving side should already have had an endpoint attached and been
|
| + // run via the |Channel|s. This endpoint will have both IDs assigned, so this
|
| + // ID is only needed to associate that endpoint with a particular dispatcher.)
|
| + ChannelEndpointId receiver_endpoint_id;
|
| +};
|
| +
|
| +} // namespace
|
| +
|
| // static
|
| MessagePipe* MessagePipe::CreateLocalLocal() {
|
| MessagePipe* message_pipe = new MessagePipe();
|
| @@ -53,6 +68,34 @@ unsigned MessagePipe::GetPeerPort(unsigned port) {
|
| return port ^ 1;
|
| }
|
|
|
| +// static
|
| +bool MessagePipe::Deserialize(Channel* channel,
|
| + const void* source,
|
| + size_t size,
|
| + scoped_refptr<MessagePipe>* message_pipe,
|
| + unsigned* port) {
|
| + DCHECK(!message_pipe->get()); // Not technically wrong, but unlikely.
|
| +
|
| + if (size != sizeof(SerializedMessagePipe)) {
|
| + LOG(ERROR) << "Invalid serialized message pipe";
|
| + return false;
|
| + }
|
| +
|
| + const SerializedMessagePipe* s =
|
| + static_cast<const SerializedMessagePipe*>(source);
|
| + *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id);
|
| + if (!message_pipe->get()) {
|
| + LOG(ERROR) << "Failed to deserialize message pipe (ID = "
|
| + << s->receiver_endpoint_id << ")";
|
| + return false;
|
| + }
|
| +
|
| + DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = "
|
| + << s->receiver_endpoint_id << ")";
|
| + *port = 0;
|
| + return true;
|
| +}
|
| +
|
| MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
|
| DCHECK(port == 0 || port == 1);
|
| base::AutoLock locker(lock_);
|
| @@ -100,9 +143,7 @@ MojoResult MessagePipe::WriteMessage(
|
| GetPeerPort(port),
|
| make_scoped_ptr(new MessageInTransit(
|
| MessageInTransit::kTypeMessagePipeEndpoint,
|
| - MessageInTransit::kSubtypeMessagePipeEndpointData,
|
| - num_bytes,
|
| - bytes)),
|
| + MessageInTransit::kSubtypeMessagePipeEndpointData, num_bytes, bytes)),
|
| transports);
|
| }
|
|
|
| @@ -117,8 +158,8 @@ MojoResult MessagePipe::ReadMessage(unsigned port,
|
| base::AutoLock locker(lock_);
|
| DCHECK(endpoints_[port]);
|
|
|
| - return endpoints_[port]->ReadMessage(
|
| - bytes, num_bytes, dispatchers, num_dispatchers, flags);
|
| + return endpoints_[port]->ReadMessage(bytes, num_bytes, dispatchers,
|
| + num_dispatchers, flags);
|
| }
|
|
|
| HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
|
| @@ -154,6 +195,32 @@ void MessagePipe::RemoveWaiter(unsigned port,
|
| endpoints_[port]->RemoveWaiter(waiter, signals_state);
|
| }
|
|
|
| +void MessagePipe::StartSerialize(unsigned /*port*/,
|
| + Channel* /*channel*/,
|
| + size_t* max_size,
|
| + size_t* max_platform_handles) {
|
| + *max_size = sizeof(SerializedMessagePipe);
|
| + *max_platform_handles = 0;
|
| +}
|
| +
|
| +bool MessagePipe::EndSerialize(
|
| + unsigned port,
|
| + Channel* channel,
|
| + void* destination,
|
| + size_t* actual_size,
|
| + embedder::PlatformHandleVector* /*platform_handles*/) {
|
| + SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination);
|
| +
|
| + // Convert the local endpoint to a proxy endpoint (moving the message queue)
|
| + // and attach it to the channel.
|
| + s->receiver_endpoint_id =
|
| + channel->AttachAndRunEndpoint(ConvertLocalToProxy(port), false);
|
| + DVLOG(2) << "Serializing message pipe (remote ID = "
|
| + << s->receiver_endpoint_id << ")";
|
| + *actual_size = sizeof(SerializedMessagePipe);
|
| + return true;
|
| +}
|
| +
|
| scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) {
|
| DCHECK(port == 0 || port == 1);
|
|
|
| @@ -165,10 +232,8 @@ scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) {
|
| // send the already-queued messages.
|
| if (!endpoints_[GetPeerPort(port)]) {
|
| scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint(
|
| - nullptr,
|
| - 0,
|
| - static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get())
|
| - ->message_queue()));
|
| + nullptr, 0, static_cast<LocalMessagePipeEndpoint*>(
|
| + endpoints_[port].get())->message_queue()));
|
| endpoints_[port]->Close();
|
| endpoints_[port].reset();
|
| return channel_endpoint;
|
| @@ -177,18 +242,15 @@ scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) {
|
| // TODO(vtl): Allowing this case is a temporary hack. It'll set up a
|
| // |MessagePipe| with two proxy endpoints, which will then act as a proxy
|
| // (rather than trying to connect the two ends directly).
|
| - DLOG_IF(WARNING,
|
| - endpoints_[GetPeerPort(port)]->GetType() !=
|
| - MessagePipeEndpoint::kTypeLocal)
|
| + DLOG_IF(WARNING, endpoints_[GetPeerPort(port)]->GetType() !=
|
| + MessagePipeEndpoint::kTypeLocal)
|
| << "Direct message pipe passing across multiple channels not yet "
|
| "implemented; will proxy";
|
|
|
| scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass());
|
| scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint(
|
| - this,
|
| - port,
|
| - static_cast<LocalMessagePipeEndpoint*>(old_endpoint.get())
|
| - ->message_queue()));
|
| + this, port, static_cast<LocalMessagePipeEndpoint*>(old_endpoint.get())
|
| + ->message_queue()));
|
| endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get()));
|
| old_endpoint->Close();
|
|
|
| @@ -280,7 +342,7 @@ MojoResult MessagePipe::AttachTransportsNoLock(
|
| (*transports)[i].CreateEquivalentDispatcherAndClose());
|
| } else {
|
| LOG(WARNING) << "Enqueueing null dispatcher";
|
| - dispatchers->push_back(scoped_refptr<Dispatcher>());
|
| + dispatchers->push_back(nullptr);
|
| }
|
| }
|
| message->SetDispatchers(dispatchers.Pass());
|
|
|