Index: mojo/edk/system/node_controller.cc |
diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc |
index 6410caef5f71213d8757f8228fd884db6c30f0eb..38c53df3b4cfc6d9e98f039e20a36c30fe12a346 100644 |
--- a/mojo/edk/system/node_controller.cc |
+++ b/mojo/edk/system/node_controller.cc |
@@ -99,18 +99,6 @@ class ThreadDestructionObserver : |
} // namespace |
-NodeController::PendingPortRequest::PendingPortRequest() {} |
- |
-NodeController::PendingPortRequest::~PendingPortRequest() {} |
- |
-NodeController::ReservedPort::ReservedPort() {} |
- |
-NodeController::ReservedPort::~ReservedPort() {} |
- |
-NodeController::PendingRemotePortConnection::PendingRemotePortConnection() {} |
- |
-NodeController::PendingRemotePortConnection::~PendingRemotePortConnection() {} |
- |
NodeController::~NodeController() {} |
NodeController::NodeController(Core* core) |
@@ -182,18 +170,25 @@ int NodeController::SendMessage(const ports::PortRef& port, |
} |
void NodeController::ReservePort(const std::string& token, |
- const ReservePortCallback& callback) { |
- ports::PortRef port; |
- node_->CreateUninitializedPort(&port); |
- |
+ const ports::PortRef& port) { |
DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " |
<< token; |
base::AutoLock lock(reserved_ports_lock_); |
- ReservedPort reservation; |
- reservation.local_port = port; |
- reservation.callback = callback; |
- reserved_ports_.insert(std::make_pair(token, reservation)); |
+ auto result = reserved_ports_.insert(std::make_pair(token, port)); |
+ DCHECK(result.second); |
+} |
+ |
+void NodeController::MergePortIntoParent(const std::string& token, |
+ const ports::PortRef& port) { |
+ scoped_refptr<NodeChannel> parent = GetParentChannel(); |
+ if (parent) { |
+ parent->RequestPortMerge(port.name(), token); |
+ return; |
+ } |
+ |
+ base::AutoLock lock(pending_port_merges_lock_); |
+ pending_port_merges_.push_back(std::make_pair(token, port)); |
} |
scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer( |
@@ -210,42 +205,6 @@ scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer( |
return buffer; |
} |
-void NodeController::ConnectToParentPort(const ports::PortRef& local_port, |
- const std::string& token, |
- const base::Closure& callback) { |
- io_task_runner_->PostTask( |
- FROM_HERE, |
- base::Bind(&NodeController::RequestParentPortConnectionOnIOThread, |
- base::Unretained(this), local_port, token, callback)); |
-} |
- |
-void NodeController::ConnectToRemotePort( |
- const ports::PortRef& local_port, |
- const ports::NodeName& remote_node_name, |
- const ports::PortName& remote_port_name, |
- const base::Closure& callback) { |
- if (remote_node_name == name_) { |
- // It's possible that two different code paths on the node are trying to |
- // bootstrap ports to each other (e.g. in Chrome single-process mode) |
- // without being aware of the fact. In this case we can initialize the port |
- // immediately (which can fail silently if it's already been initialized by |
- // the request on the other side), and invoke |callback|. |
- node_->InitializePort(local_port, name_, remote_port_name); |
- callback.Run(); |
- return; |
- } |
- |
- PendingRemotePortConnection connection; |
- connection.local_port = local_port; |
- connection.remote_node_name = remote_node_name; |
- connection.remote_port_name = remote_port_name; |
- connection.callback = callback; |
- io_task_runner_->PostTask( |
- FROM_HERE, |
- base::Bind(&NodeController::ConnectToRemotePortOnIOThread, |
- base::Unretained(this), connection)); |
-} |
- |
void NodeController::RequestShutdown(const base::Closure& callback) { |
{ |
base::AutoLock lock(shutdown_lock_); |
@@ -304,46 +263,6 @@ void NodeController::ConnectToParentOnIOThread( |
bootstrap_parent_channel_->Start(); |
} |
-void NodeController::RequestParentPortConnectionOnIOThread( |
- const ports::PortRef& local_port, |
- const std::string& token, |
- const base::Closure& callback) { |
- DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
- |
- scoped_refptr<NodeChannel> parent = GetParentChannel(); |
- if (!parent) { |
- PendingPortRequest request; |
- request.token = token; |
- request.local_port = local_port; |
- request.callback = callback; |
- pending_port_requests_.push_back(request); |
- return; |
- } |
- |
- pending_parent_port_connections_.insert( |
- std::make_pair(local_port.name(), callback)); |
- parent->RequestPortConnection(local_port.name(), token); |
-} |
- |
-void NodeController::ConnectToRemotePortOnIOThread( |
- const PendingRemotePortConnection& connection) { |
- scoped_refptr<NodeChannel> peer = GetPeerChannel(connection.remote_node_name); |
- if (peer) { |
- // It's safe to initialize the port since we already have a channel to its |
- // peer. No need to actually send them a message. |
- int rv = node_->InitializePort(connection.local_port, |
- connection.remote_node_name, |
- connection.remote_port_name); |
- DCHECK_EQ(rv, ports::OK); |
- connection.callback.Run(); |
- return; |
- } |
- |
- // Save this for later. We'll initialize the port once this peer is added. |
- pending_remote_port_connections_[connection.remote_node_name].push_back( |
- connection); |
-} |
- |
scoped_refptr<NodeChannel> NodeController::GetPeerChannel( |
const ports::NodeName& name) { |
base::AutoLock lock(peers_lock_); |
@@ -414,19 +333,6 @@ void NodeController::AddPeer(const ports::NodeName& name, |
channel->PortsMessage(std::move(pending_messages.front())); |
pending_messages.pop(); |
} |
- |
- // Complete any pending port connections to this peer. |
- auto connections_it = pending_remote_port_connections_.find(name); |
- if (connections_it != pending_remote_port_connections_.end()) { |
- for (const auto& connection : connections_it->second) { |
- int rv = node_->InitializePort(connection.local_port, |
- connection.remote_node_name, |
- connection.remote_port_name); |
- DCHECK_EQ(rv, ports::OK); |
- connection.callback.Run(); |
- } |
- pending_remote_port_connections_.erase(connections_it); |
- } |
} |
void NodeController::DropPeer(const ports::NodeName& name) { |
@@ -616,6 +522,8 @@ void NodeController::OnAcceptChild(const ports::NodeName& from_node, |
// NOTE: The child does not actually add its parent as a peer until |
// receiving an AcceptBrokerClient message from the broker. The parent |
// will request that said message be sent upon receiving AcceptParent. |
+ |
+ DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name; |
} |
void NodeController::OnAcceptParent(const ports::NodeName& from_node, |
@@ -763,15 +671,16 @@ void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node, |
io_task_runner_); |
AddPeer(broker_name, broker, true /* start_channel */); |
} |
+ |
AddPeer(parent_name, parent, false /* start_channel */); |
- // Resolve any pending port connections to the parent. |
- for (const auto& request : pending_port_requests_) { |
- pending_parent_port_connections_.insert( |
- std::make_pair(request.local_port.name(), request.callback)); |
- parent->RequestPortConnection(request.local_port.name(), request.token); |
+ { |
+ // Complete any port merge requests we have waiting for the parent. |
+ base::AutoLock lock(pending_port_merges_lock_); |
+ for (const auto& request : pending_port_merges_) |
+ parent->RequestPortMerge(request.second.name(), request.first); |
+ pending_port_merges_.clear(); |
} |
- pending_port_requests_.clear(); |
// Feed the broker any pending children of our own. |
while (!pending_broker_clients.empty()) { |
@@ -824,16 +733,15 @@ void NodeController::OnPortsMessage(Channel::MessagePtr channel_message) { |
AttemptShutdownIfRequested(); |
} |
-void NodeController::OnRequestPortConnection( |
+void NodeController::OnRequestPortMerge( |
const ports::NodeName& from_node, |
const ports::PortName& connector_port_name, |
const std::string& token) { |
DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
- DVLOG(2) << "Node " << name_ << " received RequestPortConnection for token " |
+ DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token " |
<< token << " and port " << connector_port_name << "@" << from_node; |
- ReservePortCallback callback; |
ports::PortRef local_port; |
{ |
base::AutoLock lock(reserved_ports_lock_); |
@@ -843,64 +751,12 @@ void NodeController::OnRequestPortConnection( |
<< token; |
return; |
} |
- local_port = it->second.local_port; |
- callback = it->second.callback; |
- reserved_ports_.erase(it); |
- } |
- |
- DCHECK(!callback.is_null()); |
- |
- scoped_refptr<NodeChannel> peer = GetPeerChannel(from_node); |
- if (!peer) { |
- DVLOG(1) << "Ignoring request to connect to port from unknown node " |
- << from_node; |
- return; |
+ local_port = it->second; |
} |
- // This reserved port should not have been initialized yet. |
- CHECK_EQ(ports::OK, node_->InitializePort(local_port, from_node, |
- connector_port_name)); |
- |
- peer->ConnectToPort(local_port.name(), connector_port_name); |
- callback.Run(local_port); |
-} |
- |
-void NodeController::OnConnectToPort( |
- const ports::NodeName& from_node, |
- const ports::PortName& connector_port_name, |
- const ports::PortName& connectee_port_name) { |
- DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
- |
- DVLOG(2) << "Node " << name_ << " received ConnectToPort for local port " |
- << connectee_port_name << " to port " << connector_port_name << "@" |
- << from_node; |
- |
- ports::PortRef connectee_port; |
- int rv = node_->GetPort(connectee_port_name, &connectee_port); |
- if (rv != ports::OK) { |
- DLOG(ERROR) << "Ignoring ConnectToPort for unknown port " |
- << connectee_port_name; |
- return; |
- } |
- |
- // It's OK if this port has already been initialized. This message is only |
- // sent by the remote peer to ensure the port is ready before it starts |
- // us sending messages to it. |
- ports::PortStatus port_status; |
- rv = node_->GetStatus(connectee_port, &port_status); |
- if (rv == ports::OK) { |
- DVLOG(1) << "Ignoring ConnectToPort for already-initialized port " |
- << connectee_port_name; |
- return; |
- } |
- |
- CHECK_EQ(ports::OK, node_->InitializePort(connectee_port, from_node, |
- connector_port_name)); |
- |
- auto it = pending_parent_port_connections_.find(connectee_port_name); |
- DCHECK(it != pending_parent_port_connections_.end()); |
- it->second.Run(); |
- pending_parent_port_connections_.erase(it); |
+ int rv = node_->MergePorts(local_port, from_node, connector_port_name); |
+ if (rv != ports::OK) |
+ DLOG(ERROR) << "MergePorts failed: " << rv; |
} |
void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, |