Index: mojo/edk/system/node_controller.cc |
diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc |
index 912ecee77468d2fa0bfb4c7d3526b96ee7b4e3fa..78c80d1331a0fd7141ee5336bb75d90df2430c94 100644 |
--- a/mojo/edk/system/node_controller.cc |
+++ b/mojo/edk/system/node_controller.cc |
@@ -254,6 +254,16 @@ void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) { |
base::Passed(&platform_handle))); |
} |
+void NodeController::ConnectToPeer(ScopedPlatformHandle handle, |
+ const ports::PortRef& port) { |
+ ports::NodeName node_name; |
+ GenerateRandomName(&node_name); |
+ io_task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&NodeController::ConnectToPeerOnIOThread, |
+ base::Unretained(this), base::Passed(&handle), |
+ node_name, port)); |
+} |
+ |
void NodeController::SetPortObserver( |
const ports::PortRef& port, |
const scoped_refptr<PortObserver>& observer) { |
@@ -434,6 +444,21 @@ void NodeController::ConnectToParentOnIOThread( |
bootstrap_parent_channel_->Start(); |
} |
+void NodeController::ConnectToPeerOnIOThread(ScopedPlatformHandle handle, |
+ ports::NodeName token, |
+ ports::PortRef port) { |
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
+ |
+ scoped_refptr<NodeChannel> channel = |
+ NodeChannel::Create(this, std::move(handle), io_task_runner_, {}); |
+ pending_peers_.insert({token, {channel, port}}); |
+ |
+ channel->SetRemoteNodeName(token); |
+ channel->Start(); |
+ |
+ channel->AcceptPeer(name_, token, port.name()); |
+} |
+ |
scoped_refptr<NodeChannel> NodeController::GetPeerChannel( |
const ports::NodeName& name) { |
base::AutoLock lock(peers_lock_); |
@@ -721,6 +746,7 @@ void NodeController::DropAllPeers() { |
peers_.clear(); |
pending_children_.clear(); |
pending_peer_messages_.clear(); |
+ pending_peers_.clear(); |
} |
for (const auto& peer : all_peers) |
@@ -1229,6 +1255,36 @@ void NodeController::OnPortsMessageFromRelay(const ports::NodeName& from_node, |
} |
#endif |
+void NodeController::OnAcceptPeer(const ports::NodeName& from_node, |
+ const ports::NodeName& token, |
+ const ports::NodeName& peer_name, |
+ const ports::PortName& port_name) { |
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
+ |
+ auto it = pending_peers_.find(from_node); |
+ if (it == pending_peers_.end()) { |
+ DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node; |
+ DropPeer(from_node, nullptr); |
+ return; |
+ } |
+ |
+ scoped_refptr<NodeChannel> channel = it->second.first; |
+ ports::PortRef local_port = it->second.second; |
+ pending_peers_.erase(it); |
+ DCHECK(channel); |
+ |
+ DVLOG(1) << "Node " << name_ << " accepted peer " << peer_name; |
+ |
+ AddPeer(peer_name, channel, false /* start_channel */); |
+ |
+ // We need to choose one side to initiate the port merge. It doesn't matter |
+ // who does it as long as they don't both try. Simple solution: pick the one |
+ // with the "smaller" port name. |
+ if (local_port.name() < port_name) { |
+ node()->MergePorts(local_port, peer_name, port_name); |
+ } |
+} |
+ |
void NodeController::OnChannelError(const ports::NodeName& from_node, |
NodeChannel* channel) { |
if (io_task_runner_->RunsTasksOnCurrentThread()) { |