Index: mojo/edk/system/channel.cc |
diff --git a/mojo/edk/system/channel.cc b/mojo/edk/system/channel.cc |
index 1e26145a6e0c38f0c102071fc638f77162400e63..a7162ddbdb0f77dcc15182ad554fdc24dbac5f5f 100644 |
--- a/mojo/edk/system/channel.cc |
+++ b/mojo/edk/system/channel.cc |
@@ -16,6 +16,18 @@ |
namespace mojo { |
namespace system { |
+namespace { |
+ |
+struct SerializedEndpoint { |
+ // This is the endpoint ID on the receiving side, and should be a "remote ID". |
+ // (The receiving side should already have had an endpoint attached and been |
+ // run via the |Channel|s. This endpoint will have both IDs assigned, so this |
+ // ID is only needed to associate that endpoint with a particular dispatcher.) |
+ ChannelEndpointId receiver_endpoint_id; |
+}; |
+ |
+} // namespace |
+ |
Channel::Channel(embedder::PlatformSupport* platform_support) |
: platform_support_(platform_support), |
is_running_(false), |
@@ -90,57 +102,25 @@ void Channel::WillShutdownSoon() { |
channel_manager_ = nullptr; |
} |
-// Note: |endpoint| being a |scoped_refptr| makes this function safe, since it |
-// keeps the endpoint alive even after the lock is released. Otherwise, there's |
-// the temptation to simply pass the result of |new ChannelEndpoint(...)| |
-// directly to this function, which wouldn't be sufficient for safety. |
-ChannelEndpointId Channel::AttachAndRunEndpoint( |
- scoped_refptr<ChannelEndpoint> endpoint, |
- bool is_bootstrap) { |
+void Channel::SetBootstrapEndpoint(scoped_refptr<ChannelEndpoint> endpoint) { |
DCHECK(endpoint); |
- ChannelEndpointId local_id; |
- ChannelEndpointId remote_id; |
+ // Used for both local and remote IDs. |
+ ChannelEndpointId bootstrap_id = ChannelEndpointId::GetBootstrap(); |
+ |
{ |
base::AutoLock locker(lock_); |
DLOG_IF(WARNING, is_shutting_down_) |
- << "AttachEndpoint() while shutting down"; |
+ << "SetBootstrapEndpoint() while shutting down"; |
- if (is_bootstrap) { |
- local_id = ChannelEndpointId::GetBootstrap(); |
- DCHECK(local_id_to_endpoint_map_.find(local_id) == |
- local_id_to_endpoint_map_.end()); |
- |
- remote_id = ChannelEndpointId::GetBootstrap(); |
- } else { |
- do { |
- local_id = local_id_generator_.GetNext(); |
- } while (local_id_to_endpoint_map_.find(local_id) != |
- local_id_to_endpoint_map_.end()); |
+ // Bootstrap endpoint should be the first. |
+ DCHECK(local_id_to_endpoint_map_.empty()); |
- // TODO(vtl): We also need to check for collisions of remote IDs here. |
- remote_id = remote_id_generator_.GetNext(); |
- } |
- |
- local_id_to_endpoint_map_[local_id] = endpoint; |
+ local_id_to_endpoint_map_[bootstrap_id] = endpoint; |
} |
- if (!is_bootstrap) { |
- if (!SendControlMessage( |
- MessageInTransit::kSubtypeChannelAttachAndRunEndpoint, local_id, |
- remote_id)) { |
- HandleLocalError(base::StringPrintf( |
- "Failed to send message to run remote message pipe endpoint (local " |
- "ID %u, remote ID %u)", |
- static_cast<unsigned>(local_id.value()), |
- static_cast<unsigned>(remote_id.value()))); |
- // TODO(vtl): Should we continue on to |AttachAndRun()|? |
- } |
- } |
- |
- endpoint->AttachAndRun(this, local_id, remote_id); |
- return remote_id; |
+ endpoint->AttachAndRun(this, bootstrap_id, bootstrap_id); |
} |
bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) { |
@@ -191,33 +171,50 @@ void Channel::DetachEndpoint(ChannelEndpoint* endpoint, |
// Send a remove message outside the lock. |
} |
- if (!SendControlMessage( |
- MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint, local_id, |
- remote_id)) { |
+ if (!SendControlMessage(MessageInTransit::kSubtypeChannelRemoveEndpoint, |
+ local_id, remote_id)) { |
HandleLocalError(base::StringPrintf( |
- "Failed to send message to remove remote message pipe endpoint (local " |
- "ID %u, remote ID %u)", |
+ "Failed to send message to remove remote endpoint (local ID %u, remote " |
+ "ID %u)", |
static_cast<unsigned>(local_id.value()), |
static_cast<unsigned>(remote_id.value()))); |
} |
} |
-scoped_refptr<MessagePipe> Channel::PassIncomingMessagePipe( |
- ChannelEndpointId local_id) { |
+size_t Channel::GetSerializedEndpointSize() const { |
+ return sizeof(SerializedEndpoint); |
+} |
+ |
+void Channel::SerializeEndpoint(scoped_refptr<ChannelEndpoint> endpoint, |
+ void* destination) { |
+ SerializedEndpoint* s = static_cast<SerializedEndpoint*>(destination); |
+ s->receiver_endpoint_id = AttachAndRunEndpoint(endpoint); |
+ DVLOG(2) << "Serializing endpoint (remote ID = " << s->receiver_endpoint_id |
+ << ")"; |
+} |
+ |
+scoped_refptr<IncomingEndpoint> Channel::DeserializeEndpoint( |
+ const void* source) { |
+ const SerializedEndpoint* s = static_cast<const SerializedEndpoint*>(source); |
+ ChannelEndpointId local_id = s->receiver_endpoint_id; |
// No need to check the validity of |local_id| -- if it's not valid, it simply |
- // won't be in |incoming_message_pipes_|. |
+ // won't be in |incoming_endpoints_|. |
DVLOG_IF(2, !local_id.is_valid() || !local_id.is_remote()) |
- << "Attempt to get invalid incoming message pipe for ID " << local_id; |
+ << "Attempt to get incoming endpoint for invalid ID " << local_id; |
base::AutoLock locker(lock_); |
- auto it = incoming_message_pipes_.find(local_id); |
- if (it == incoming_message_pipes_.end()) |
+ auto it = incoming_endpoints_.find(local_id); |
+ if (it == incoming_endpoints_.end()) { |
+ LOG(ERROR) << "Failed to deserialize endpoint (ID = " << local_id << ")"; |
return nullptr; |
+ } |
+ |
+ DVLOG(2) << "Deserializing endpoint (new local ID = " << local_id << ")"; |
- scoped_refptr<MessagePipe> rv; |
+ scoped_refptr<IncomingEndpoint> rv; |
rv.swap(it->second); |
- incoming_message_pipes_.erase(it); |
+ incoming_endpoints_.erase(it); |
return rv; |
} |
@@ -356,32 +353,32 @@ void Channel::OnReadMessageForChannel( |
switch (message_view.subtype()) { |
case MessageInTransit::kSubtypeChannelAttachAndRunEndpoint: |
- DVLOG(2) << "Handling channel message to attach and run message pipe " |
- "(local ID " << message_view.destination_id() |
- << ", remote ID " << message_view.source_id() << ")"; |
+ DVLOG(2) << "Handling channel message to attach and run endpoint (local " |
+ "ID " << message_view.destination_id() << ", remote ID " |
+ << message_view.source_id() << ")"; |
if (!OnAttachAndRunEndpoint(message_view.destination_id(), |
message_view.source_id())) { |
HandleRemoteError( |
- "Received invalid channel message to attach and run message pipe"); |
+ "Received invalid channel message to attach and run endpoint"); |
} |
break; |
- case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint: |
- DVLOG(2) << "Handling channel message to remove message pipe (local ID " |
+ case MessageInTransit::kSubtypeChannelRemoveEndpoint: |
+ DVLOG(2) << "Handling channel message to remove endpoint (local ID " |
<< message_view.destination_id() << ", remote ID " |
<< message_view.source_id() << ")"; |
- if (!OnRemoveMessagePipeEndpoint(message_view.destination_id(), |
- message_view.source_id())) { |
+ if (!OnRemoveEndpoint(message_view.destination_id(), |
+ message_view.source_id())) { |
HandleRemoteError( |
- "Received invalid channel message to remove message pipe"); |
+ "Received invalid channel message to remove endpoint"); |
} |
break; |
- case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck: |
- DVLOG(2) << "Handling channel message to ack remove message pipe (local " |
- "ID " << message_view.destination_id() << ", remote ID " |
+ case MessageInTransit::kSubtypeChannelRemoveEndpointAck: |
+ DVLOG(2) << "Handling channel message to ack remove endpoint (local ID " |
+ << message_view.destination_id() << ", remote ID " |
<< message_view.source_id() << ")"; |
- if (!OnRemoveMessagePipeEndpointAck(message_view.destination_id())) { |
+ if (!OnRemoveEndpointAck(message_view.destination_id())) { |
HandleRemoteError( |
- "Received invalid channel message to ack remove message pipe"); |
+ "Received invalid channel message to ack remove endpoint"); |
} |
break; |
default: |
@@ -406,10 +403,10 @@ bool Channel::OnAttachAndRunEndpoint(ChannelEndpointId local_id, |
return false; |
} |
- // Create a message pipe and thus an endpoint (outside the lock). |
- scoped_refptr<ChannelEndpoint> endpoint; |
- scoped_refptr<MessagePipe> message_pipe( |
- MessagePipe::CreateLocalProxy(&endpoint)); |
+ // Create/initialize an |IncomingEndpoint| and thus an endpoint (outside the |
+ // lock). |
+ scoped_refptr<IncomingEndpoint> incoming_endpoint(new IncomingEndpoint()); |
+ scoped_refptr<ChannelEndpoint> endpoint = incoming_endpoint->Init(); |
bool success = true; |
{ |
@@ -417,21 +414,20 @@ bool Channel::OnAttachAndRunEndpoint(ChannelEndpointId local_id, |
if (local_id_to_endpoint_map_.find(local_id) == |
local_id_to_endpoint_map_.end()) { |
- DCHECK(incoming_message_pipes_.find(local_id) == |
- incoming_message_pipes_.end()); |
+ DCHECK(incoming_endpoints_.find(local_id) == incoming_endpoints_.end()); |
// TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll |
// avoid some refcount churn.) |
local_id_to_endpoint_map_[local_id] = endpoint; |
- incoming_message_pipes_[local_id] = message_pipe; |
+ incoming_endpoints_[local_id] = incoming_endpoint; |
} else { |
- // We need to call |Close()| on the message pipe outside the lock. |
+ // We need to call |Close()| outside the lock. |
success = false; |
} |
} |
if (!success) { |
DVLOG(2) << "Received attach and run endpoint for existing local ID"; |
- message_pipe->Close(0); |
+ incoming_endpoint->Close(); |
return false; |
} |
@@ -439,8 +435,8 @@ bool Channel::OnAttachAndRunEndpoint(ChannelEndpointId local_id, |
return true; |
} |
-bool Channel::OnRemoveMessagePipeEndpoint(ChannelEndpointId local_id, |
- ChannelEndpointId remote_id) { |
+bool Channel::OnRemoveEndpoint(ChannelEndpointId local_id, |
+ ChannelEndpointId remote_id) { |
DCHECK(creation_thread_checker_.CalledOnValidThread()); |
scoped_refptr<ChannelEndpoint> endpoint; |
@@ -449,7 +445,7 @@ bool Channel::OnRemoveMessagePipeEndpoint(ChannelEndpointId local_id, |
IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id); |
if (it == local_id_to_endpoint_map_.end()) { |
- DVLOG(2) << "Remove message pipe endpoint error: not found"; |
+ DVLOG(2) << "Remove endpoint error: not found"; |
return false; |
} |
@@ -465,12 +461,11 @@ bool Channel::OnRemoveMessagePipeEndpoint(ChannelEndpointId local_id, |
endpoint->DetachFromChannel(); |
- if (!SendControlMessage( |
- MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck, |
- local_id, remote_id)) { |
+ if (!SendControlMessage(MessageInTransit::kSubtypeChannelRemoveEndpointAck, |
+ local_id, remote_id)) { |
HandleLocalError(base::StringPrintf( |
- "Failed to send message to remove remote message pipe endpoint ack " |
- "(local ID %u, remote ID %u)", |
+ "Failed to send message to ack remove remote endpoint (local ID %u, " |
+ "remote ID %u)", |
static_cast<unsigned>(local_id.value()), |
static_cast<unsigned>(remote_id.value()))); |
} |
@@ -478,19 +473,19 @@ bool Channel::OnRemoveMessagePipeEndpoint(ChannelEndpointId local_id, |
return true; |
} |
-bool Channel::OnRemoveMessagePipeEndpointAck(ChannelEndpointId local_id) { |
+bool Channel::OnRemoveEndpointAck(ChannelEndpointId local_id) { |
DCHECK(creation_thread_checker_.CalledOnValidThread()); |
base::AutoLock locker(lock_); |
IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id); |
if (it == local_id_to_endpoint_map_.end()) { |
- DVLOG(2) << "Remove message pipe endpoint ack error: not found"; |
+ DVLOG(2) << "Remove endpoint ack error: not found"; |
return false; |
} |
if (it->second) { |
- DVLOG(2) << "Remove message pipe endpoint ack error: wrong state"; |
+ DVLOG(2) << "Remove endpoint ack error: wrong state"; |
return false; |
} |
@@ -498,18 +493,6 @@ bool Channel::OnRemoveMessagePipeEndpointAck(ChannelEndpointId local_id) { |
return true; |
} |
-bool Channel::SendControlMessage(MessageInTransit::Subtype subtype, |
- ChannelEndpointId local_id, |
- ChannelEndpointId remote_id) { |
- DVLOG(2) << "Sending channel control message: subtype " << subtype |
- << ", local ID " << local_id << ", remote ID " << remote_id; |
- scoped_ptr<MessageInTransit> message(new MessageInTransit( |
- MessageInTransit::kTypeChannel, subtype, 0, nullptr)); |
- message->set_source_id(local_id); |
- message->set_destination_id(remote_id); |
- return WriteMessage(message.Pass()); |
-} |
- |
void Channel::HandleRemoteError(const base::StringPiece& error_message) { |
// TODO(vtl): Is this how we really want to handle this? Probably we want to |
// terminate the connection, since it's spewing invalid stuff. |
@@ -525,5 +508,58 @@ void Channel::HandleLocalError(const base::StringPiece& error_message) { |
LOG(WARNING) << error_message; |
} |
+// Note: |endpoint| being a |scoped_refptr| makes this function safe, since it |
+// keeps the endpoint alive even after the lock is released. Otherwise, there's |
+// the temptation to simply pass the result of |new ChannelEndpoint(...)| |
+// directly to this function, which wouldn't be sufficient for safety. |
+ChannelEndpointId Channel::AttachAndRunEndpoint( |
+ scoped_refptr<ChannelEndpoint> endpoint) { |
+ DCHECK(endpoint); |
+ |
+ ChannelEndpointId local_id; |
+ ChannelEndpointId remote_id; |
+ { |
+ base::AutoLock locker(lock_); |
+ |
+ DLOG_IF(WARNING, is_shutting_down_) |
+ << "AttachAndRunEndpoint() while shutting down"; |
+ |
+ do { |
+ local_id = local_id_generator_.GetNext(); |
+ } while (local_id_to_endpoint_map_.find(local_id) != |
+ local_id_to_endpoint_map_.end()); |
+ |
+ // TODO(vtl): We also need to check for collisions of remote IDs here. |
+ remote_id = remote_id_generator_.GetNext(); |
+ |
+ local_id_to_endpoint_map_[local_id] = endpoint; |
+ } |
+ |
+ if (!SendControlMessage(MessageInTransit::kSubtypeChannelAttachAndRunEndpoint, |
+ local_id, remote_id)) { |
+ HandleLocalError(base::StringPrintf( |
+ "Failed to send message to run remote endpoint (local ID %u, remote ID " |
+ "%u)", |
+ static_cast<unsigned>(local_id.value()), |
+ static_cast<unsigned>(remote_id.value()))); |
+ // TODO(vtl): Should we continue on to |AttachAndRun()|? |
+ } |
+ |
+ endpoint->AttachAndRun(this, local_id, remote_id); |
+ return remote_id; |
+} |
+ |
+bool Channel::SendControlMessage(MessageInTransit::Subtype subtype, |
+ ChannelEndpointId local_id, |
+ ChannelEndpointId remote_id) { |
+ DVLOG(2) << "Sending channel control message: subtype " << subtype |
+ << ", local ID " << local_id << ", remote ID " << remote_id; |
+ scoped_ptr<MessageInTransit> message(new MessageInTransit( |
+ MessageInTransit::kTypeChannel, subtype, 0, nullptr)); |
+ message->set_source_id(local_id); |
+ message->set_destination_id(remote_id); |
+ return WriteMessage(message.Pass()); |
+} |
+ |
} // namespace system |
} // namespace mojo |