Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(566)

Unified Diff: mojo/edk/system/message_pipe.cc

Issue 799113004: Update mojo sdk to rev 59145288bae55b0fce4276b017df6a1117bcf00f (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: add mojo's ply to checklicenses whitelist Created 6 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/edk/system/message_pipe.h ('k') | mojo/edk/system/message_pipe_test_utils.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/edk/system/message_pipe.cc
diff --git a/mojo/edk/system/message_pipe.cc b/mojo/edk/system/message_pipe.cc
index 3e243b648419e5c25a7e74ecf69398e4db5aac82..1b0427b6ec5b2454fd49464c9aa057ee97892202 100644
--- a/mojo/edk/system/message_pipe.cc
+++ b/mojo/edk/system/message_pipe.cc
@@ -9,6 +9,7 @@
#include "mojo/edk/system/channel_endpoint.h"
#include "mojo/edk/system/channel_endpoint_id.h"
#include "mojo/edk/system/endpoint_relayer.h"
+#include "mojo/edk/system/incoming_endpoint.h"
#include "mojo/edk/system/local_message_pipe_endpoint.h"
#include "mojo/edk/system/message_in_transit.h"
#include "mojo/edk/system/message_pipe_dispatcher.h"
@@ -18,19 +19,6 @@
namespace mojo {
namespace system {
-namespace {
-
-// TODO(vtl): Move this into |Channel| (and possible further).
-struct SerializedMessagePipe {
- // This is the endpoint ID on the receiving side, and should be a "remote ID".
- // (The receiving side should already have had an endpoint attached and been
- // run via the |Channel|s. This endpoint will have both IDs assigned, so this
- // ID is only needed to associate that endpoint with a particular dispatcher.)
- ChannelEndpointId receiver_endpoint_id;
-};
-
-} // namespace
-
// static
MessagePipe* MessagePipe::CreateLocalLocal() {
MessagePipe* message_pipe = new MessagePipe();
@@ -52,6 +40,30 @@ MessagePipe* MessagePipe::CreateLocalProxy(
}
// static
+MessagePipe* MessagePipe::CreateLocalProxyFromExisting(
+ MessageInTransitQueue* message_queue,
+ ChannelEndpoint* channel_endpoint) {
+ DCHECK(message_queue);
+ MessagePipe* message_pipe = new MessagePipe();
+ message_pipe->endpoints_[0].reset(
+ new LocalMessagePipeEndpoint(message_queue));
+ if (channel_endpoint) {
+ bool attached_to_channel = channel_endpoint->ReplaceClient(message_pipe, 1);
+ message_pipe->endpoints_[1].reset(
+ new ProxyMessagePipeEndpoint(channel_endpoint));
+ if (!attached_to_channel)
+ message_pipe->OnDetachFromChannel(1);
+ } else {
+ // This means that the proxy side was already closed; we only need to inform
+ // the local side of this.
+ // TODO(vtl): This is safe to do without locking (but perhaps slightly
+ // dubious), since no other thread has access to |message_pipe| yet.
+ message_pipe->endpoints_[0]->OnPeerClose();
+ }
+ return message_pipe;
+}
+
+// static
MessagePipe* MessagePipe::CreateProxyLocal(
scoped_refptr<ChannelEndpoint>* channel_endpoint) {
DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
@@ -77,22 +89,18 @@ bool MessagePipe::Deserialize(Channel* channel,
unsigned* port) {
DCHECK(!*message_pipe); // Not technically wrong, but unlikely.
- if (size != sizeof(SerializedMessagePipe)) {
+ if (size != channel->GetSerializedEndpointSize()) {
LOG(ERROR) << "Invalid serialized message pipe";
return false;
}
- const SerializedMessagePipe* s =
- static_cast<const SerializedMessagePipe*>(source);
- *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id);
- if (!*message_pipe) {
- LOG(ERROR) << "Failed to deserialize message pipe (ID = "
- << s->receiver_endpoint_id << ")";
+ scoped_refptr<IncomingEndpoint> incoming_endpoint =
+ channel->DeserializeEndpoint(source);
+ if (!incoming_endpoint)
return false;
- }
- DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = "
- << s->receiver_endpoint_id << ")";
+ *message_pipe = incoming_endpoint->ConvertToMessagePipe();
+ DCHECK(*message_pipe);
*port = 0;
return true;
}
@@ -200,10 +208,10 @@ void MessagePipe::RemoveAwakable(unsigned port,
}
void MessagePipe::StartSerialize(unsigned /*port*/,
- Channel* /*channel*/,
+ Channel* channel,
size_t* max_size,
size_t* max_platform_handles) {
- *max_size = sizeof(SerializedMessagePipe);
+ *max_size = channel->GetSerializedEndpointSize();
*max_platform_handles = 0;
}
@@ -249,25 +257,22 @@ bool MessagePipe::EndSerialize(
//
// TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|).
+ // The replacement for |endpoints_[port]|, if any.
+ MessagePipeEndpoint* replacement_endpoint = nullptr;
+
unsigned peer_port = GetPeerPort(port);
- if (!endpoints_[peer_port]) {
- // Case 1.
+ if (!endpoints_[peer_port]) { // Case 1.
channel_endpoint = new ChannelEndpoint(
nullptr, 0, static_cast<LocalMessagePipeEndpoint*>(
endpoints_[port].get())->message_queue());
- endpoints_[port]->Close();
- endpoints_[port].reset();
} else if (endpoints_[peer_port]->GetType() ==
- MessagePipeEndpoint::kTypeLocal) {
- // Case 2.
+ MessagePipeEndpoint::kTypeLocal) { // Case 2.
channel_endpoint = new ChannelEndpoint(
this, port, static_cast<LocalMessagePipeEndpoint*>(
endpoints_[port].get())->message_queue());
- endpoints_[port]->Close();
- endpoints_[port].reset(
- new ProxyMessagePipeEndpoint(channel_endpoint.get()));
- } else {
- // Case 3.
+ replacement_endpoint =
+ new ProxyMessagePipeEndpoint(channel_endpoint.get());
+ } else { // Case 3.
DLOG(WARNING) << "Direct message pipe passing across multiple channels "
"not yet implemented; will proxy";
@@ -302,22 +307,18 @@ bool MessagePipe::EndSerialize(
relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get());
peer_channel_endpoint->ReplaceClient(relayer.get(), 1);
- endpoints_[port]->Close();
- endpoints_[port].reset();
// No need to call |Close()| after |ReleaseChannelEndpoint()|.
endpoints_[peer_port].reset();
}
- }
- SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination);
+ endpoints_[port]->Close();
+ endpoints_[port].reset(replacement_endpoint);
+ }
- // Convert the local endpoint to a proxy endpoint (moving the message queue)
- // and attach it to the channel.
- s->receiver_endpoint_id =
- channel->AttachAndRunEndpoint(channel_endpoint, false);
- DVLOG(2) << "Serializing message pipe (remote ID = "
- << s->receiver_endpoint_id << ")";
- *actual_size = sizeof(SerializedMessagePipe);
+ // TODO(vtl): More/most of the above should be moved into (some variant of)
+ // |Channel::SerializeEndpoint()|.
+ channel->SerializeEndpoint(channel_endpoint, destination);
+ *actual_size = channel->GetSerializedEndpointSize();
return true;
}
« no previous file with comments | « mojo/edk/system/message_pipe.h ('k') | mojo/edk/system/message_pipe_test_utils.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698