| Index: mojo/edk/system/node_controller.cc
|
| diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc
|
| index a3cf0fa26bd20d9eed867713dc8167a782fc07bc..8dd1824c731fdad4fdd975f1a9adb745202f1a28 100644
|
| --- a/mojo/edk/system/node_controller.cc
|
| +++ b/mojo/edk/system/node_controller.cc
|
| @@ -227,6 +227,12 @@ void NodeController::CloseChildPorts(const std::string& child_token) {
|
| AcceptIncomingMessages();
|
| }
|
|
|
| +void NodeController::ClosePeerConnection(const std::string& peer_token) {
|
| + io_task_runner_->PostTask(
|
| + FROM_HERE, base::Bind(&NodeController::ClosePeerConnectionOnIOThread,
|
| + base::Unretained(this), peer_token));
|
| +}
|
| +
|
| void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) {
|
| #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
|
| // Use the bootstrap channel for the broker and receive the node's channel
|
| @@ -255,13 +261,14 @@ void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) {
|
| }
|
|
|
| void NodeController::ConnectToPeer(ScopedPlatformHandle handle,
|
| - const ports::PortRef& port) {
|
| + const ports::PortRef& port,
|
| + const std::string& peer_token) {
|
| ports::NodeName node_name;
|
| GenerateRandomName(&node_name);
|
| io_task_runner_->PostTask(
|
| FROM_HERE, base::Bind(&NodeController::ConnectToPeerOnIOThread,
|
| base::Unretained(this), base::Passed(&handle),
|
| - node_name, port));
|
| + node_name, port, peer_token));
|
| }
|
|
|
| void NodeController::SetPortObserver(
|
| @@ -447,12 +454,15 @@ void NodeController::ConnectToParentOnIOThread(
|
|
|
| void NodeController::ConnectToPeerOnIOThread(ScopedPlatformHandle handle,
|
| ports::NodeName token,
|
| - ports::PortRef port) {
|
| + ports::PortRef port,
|
| + const std::string& peer_token) {
|
| DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
|
|
| scoped_refptr<NodeChannel> channel =
|
| NodeChannel::Create(this, std::move(handle), io_task_runner_, {});
|
| - pending_peers_.insert({token, {channel, port}});
|
| + peer_connections_.insert(
|
| + {token, PeerConnection{channel, port, peer_token}});
|
| + peers_by_token_.insert({peer_token, token});
|
|
|
| channel->SetRemoteNodeName(token);
|
| channel->Start();
|
| @@ -460,6 +470,19 @@ void NodeController::ConnectToPeerOnIOThread(ScopedPlatformHandle handle,
|
| channel->AcceptPeer(name_, token, port.name());
|
| }
|
|
|
| +void NodeController::ClosePeerConnectionOnIOThread(
|
| + const std::string& peer_token) {
|
| + RequestContext request_context(RequestContext::Source::SYSTEM);
|
| + auto peer = peers_by_token_.find(peer_token);
|
| + // The connection may already be closed.
|
| + if (peer == peers_by_token_.end())
|
| + return;
|
| +
|
| + // |peer| may be removed so make a copy of |name|.
|
| + ports::NodeName name = peer->second;
|
| + DropPeer(name, nullptr);
|
| +}
|
| +
|
| scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
|
| const ports::NodeName& name) {
|
| base::AutoLock lock(peers_lock_);
|
| @@ -592,6 +615,13 @@ void NodeController::DropPeer(const ports::NodeName& name,
|
| if (is_parent)
|
| CancelPendingPortMerges();
|
|
|
| + auto peer = peer_connections_.find(name);
|
| + if (peer != peer_connections_.end()) {
|
| + peers_by_token_.erase(peer->second.peer_token);
|
| + ports_to_close.push_back(peer->second.local_port);
|
| + peer_connections_.erase(peer);
|
| + }
|
| +
|
| for (const auto& port : ports_to_close)
|
| node_->ClosePort(port);
|
|
|
| @@ -742,7 +772,7 @@ void NodeController::DropAllPeers() {
|
| peers_.clear();
|
| pending_children_.clear();
|
| pending_peer_messages_.clear();
|
| - pending_peers_.clear();
|
| + peer_connections_.clear();
|
| }
|
|
|
| for (const auto& peer : all_peers)
|
| @@ -1257,21 +1287,27 @@ void NodeController::OnAcceptPeer(const ports::NodeName& from_node,
|
| const ports::PortName& port_name) {
|
| DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
|
|
| - auto it = pending_peers_.find(from_node);
|
| - if (it == pending_peers_.end()) {
|
| + auto it = peer_connections_.find(from_node);
|
| + if (it == peer_connections_.end()) {
|
| DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node;
|
| DropPeer(from_node, nullptr);
|
| return;
|
| }
|
|
|
| - scoped_refptr<NodeChannel> channel = it->second.first;
|
| - ports::PortRef local_port = it->second.second;
|
| - pending_peers_.erase(it);
|
| + scoped_refptr<NodeChannel> channel = std::move(it->second.channel);
|
| + ports::PortRef local_port = it->second.local_port;
|
| + std::string peer_token = std::move(it->second.peer_token);
|
| + peer_connections_.erase(it);
|
| DCHECK(channel);
|
|
|
| // If the peer connection is a self connection (which is used in tests),
|
| // drop the channel to it and skip straight to merging the ports.
|
| - if (name_ != peer_name) {
|
| + if (name_ == peer_name) {
|
| + peers_by_token_.erase(peer_token);
|
| + } else {
|
| + peers_by_token_[peer_token] = peer_name;
|
| + peer_connections_.insert(
|
| + {peer_name, PeerConnection{nullptr, local_port, peer_token}});
|
| DVLOG(1) << "Node " << name_ << " accepted peer " << peer_name;
|
|
|
| AddPeer(peer_name, channel, false /* start_channel */);
|
| @@ -1358,5 +1394,27 @@ void NodeController::AttemptShutdownIfRequested() {
|
| callback.Run();
|
| }
|
|
|
| +NodeController::PeerConnection::PeerConnection() = default;
|
| +
|
| +NodeController::PeerConnection::PeerConnection(
|
| + const PeerConnection& other) = default;
|
| +
|
| +NodeController::PeerConnection::PeerConnection(
|
| + PeerConnection&& other) = default;
|
| +
|
| +NodeController::PeerConnection::PeerConnection(
|
| + const scoped_refptr<NodeChannel>& channel,
|
| + const ports::PortRef& local_port,
|
| + const std::string& peer_token)
|
| + : channel(channel), local_port(local_port), peer_token(peer_token) {}
|
| +
|
| +NodeController::PeerConnection::~PeerConnection() = default;
|
| +
|
| +NodeController::PeerConnection& NodeController::PeerConnection::
|
| +operator=(const PeerConnection& other) = default;
|
| +
|
| +NodeController::PeerConnection& NodeController::PeerConnection::
|
| +operator=(PeerConnection&& other) = default;
|
| +
|
| } // namespace edk
|
| } // namespace mojo
|
|
|