| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/node_controller.h" | 5 #include "mojo/edk/system/node_controller.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <limits> | 8 #include <limits> |
| 9 | 9 |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| (...skipping 209 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 220 reserved_ports_.erase(token); | 220 reserved_ports_.erase(token); |
| 221 } | 221 } |
| 222 | 222 |
| 223 for (const auto& port : ports_to_close) | 223 for (const auto& port : ports_to_close) |
| 224 node_->ClosePort(port); | 224 node_->ClosePort(port); |
| 225 | 225 |
| 226 // Ensure local port closure messages are processed. | 226 // Ensure local port closure messages are processed. |
| 227 AcceptIncomingMessages(); | 227 AcceptIncomingMessages(); |
| 228 } | 228 } |
| 229 | 229 |
| 230 void NodeController::ClosePeerConnection(const std::string& peer_token) { |
| 231 io_task_runner_->PostTask( |
| 232 FROM_HERE, base::Bind(&NodeController::ClosePeerConnectionOnIOThread, |
| 233 base::Unretained(this), peer_token)); |
| 234 } |
| 235 |
| 230 void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) { | 236 void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) { |
| 231 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI) | 237 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI) |
| 232 // Use the bootstrap channel for the broker and receive the node's channel | 238 // Use the bootstrap channel for the broker and receive the node's channel |
| 233 // synchronously as the first message from the broker. | 239 // synchronously as the first message from the broker. |
| 234 base::ElapsedTimer timer; | 240 base::ElapsedTimer timer; |
| 235 broker_.reset(new Broker(std::move(platform_handle))); | 241 broker_.reset(new Broker(std::move(platform_handle))); |
| 236 platform_handle = broker_->GetParentPlatformHandle(); | 242 platform_handle = broker_->GetParentPlatformHandle(); |
| 237 UMA_HISTOGRAM_TIMES("Mojo.System.GetParentPlatformHandleSyncTime", | 243 UMA_HISTOGRAM_TIMES("Mojo.System.GetParentPlatformHandleSyncTime", |
| 238 timer.Elapsed()); | 244 timer.Elapsed()); |
| 239 | 245 |
| 240 if (!platform_handle.is_valid()) { | 246 if (!platform_handle.is_valid()) { |
| 241 // Most likely the browser side of the channel has already been closed and | 247 // Most likely the browser side of the channel has already been closed and |
| 242 // the broker was unable to negotiate a NodeChannel pipe. In this case we | 248 // the broker was unable to negotiate a NodeChannel pipe. In this case we |
| 243 // can cancel parent connection. | 249 // can cancel parent connection. |
| 244 DVLOG(1) << "Cannot connect to invalid parent channel."; | 250 DVLOG(1) << "Cannot connect to invalid parent channel."; |
| 245 CancelPendingPortMerges(); | 251 CancelPendingPortMerges(); |
| 246 return; | 252 return; |
| 247 } | 253 } |
| 248 #endif | 254 #endif |
| 249 | 255 |
| 250 io_task_runner_->PostTask( | 256 io_task_runner_->PostTask( |
| 251 FROM_HERE, | 257 FROM_HERE, |
| 252 base::Bind(&NodeController::ConnectToParentOnIOThread, | 258 base::Bind(&NodeController::ConnectToParentOnIOThread, |
| 253 base::Unretained(this), | 259 base::Unretained(this), |
| 254 base::Passed(&platform_handle))); | 260 base::Passed(&platform_handle))); |
| 255 } | 261 } |
| 256 | 262 |
| 257 void NodeController::ConnectToPeer(ScopedPlatformHandle handle, | 263 void NodeController::ConnectToPeer(ScopedPlatformHandle handle, |
| 258 const ports::PortRef& port) { | 264 const ports::PortRef& port, |
| 265 const std::string& peer_token) { |
| 259 ports::NodeName node_name; | 266 ports::NodeName node_name; |
| 260 GenerateRandomName(&node_name); | 267 GenerateRandomName(&node_name); |
| 261 io_task_runner_->PostTask( | 268 io_task_runner_->PostTask( |
| 262 FROM_HERE, base::Bind(&NodeController::ConnectToPeerOnIOThread, | 269 FROM_HERE, base::Bind(&NodeController::ConnectToPeerOnIOThread, |
| 263 base::Unretained(this), base::Passed(&handle), | 270 base::Unretained(this), base::Passed(&handle), |
| 264 node_name, port)); | 271 node_name, port, peer_token)); |
| 265 } | 272 } |
| 266 | 273 |
| 267 void NodeController::SetPortObserver( | 274 void NodeController::SetPortObserver( |
| 268 const ports::PortRef& port, | 275 const ports::PortRef& port, |
| 269 const scoped_refptr<PortObserver>& observer) { | 276 const scoped_refptr<PortObserver>& observer) { |
| 270 node_->SetUserData(port, observer); | 277 node_->SetUserData(port, observer); |
| 271 } | 278 } |
| 272 | 279 |
| 273 void NodeController::ClosePort(const ports::PortRef& port) { | 280 void NodeController::ClosePort(const ports::PortRef& port) { |
| 274 SetPortObserver(port, nullptr); | 281 SetPortObserver(port, nullptr); |
| (...skipping 165 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 440 // Relying on message pipes to be closed is not enough because the parent | 447 // Relying on message pipes to be closed is not enough because the parent |
| 441 // may see the message pipe closure before the child is dead, causing the | 448 // may see the message pipe closure before the child is dead, causing the |
| 442 // child process to be unexpectedly SIGKILL'd. | 449 // child process to be unexpectedly SIGKILL'd. |
| 443 bootstrap_parent_channel_->LeakHandleOnShutdown(); | 450 bootstrap_parent_channel_->LeakHandleOnShutdown(); |
| 444 } | 451 } |
| 445 bootstrap_parent_channel_->Start(); | 452 bootstrap_parent_channel_->Start(); |
| 446 } | 453 } |
| 447 | 454 |
| 448 void NodeController::ConnectToPeerOnIOThread(ScopedPlatformHandle handle, | 455 void NodeController::ConnectToPeerOnIOThread(ScopedPlatformHandle handle, |
| 449 ports::NodeName token, | 456 ports::NodeName token, |
| 450 ports::PortRef port) { | 457 ports::PortRef port, |
| 458 const std::string& peer_token) { |
| 451 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 459 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| 452 | 460 |
| 453 scoped_refptr<NodeChannel> channel = | 461 scoped_refptr<NodeChannel> channel = |
| 454 NodeChannel::Create(this, std::move(handle), io_task_runner_, {}); | 462 NodeChannel::Create(this, std::move(handle), io_task_runner_, {}); |
| 455 pending_peers_.insert({token, {channel, port}}); | 463 peer_connections_.insert( |
| 464 {token, PeerConnection{channel, port, peer_token}}); |
| 465 peers_by_token_.insert({peer_token, token}); |
| 456 | 466 |
| 457 channel->SetRemoteNodeName(token); | 467 channel->SetRemoteNodeName(token); |
| 458 channel->Start(); | 468 channel->Start(); |
| 459 | 469 |
| 460 channel->AcceptPeer(name_, token, port.name()); | 470 channel->AcceptPeer(name_, token, port.name()); |
| 461 } | 471 } |
| 462 | 472 |
| 473 void NodeController::ClosePeerConnectionOnIOThread( |
| 474 const std::string& peer_token) { |
| 475 RequestContext request_context(RequestContext::Source::SYSTEM); |
| 476 auto peer = peers_by_token_.find(peer_token); |
| 477 // The connection may already be closed. |
| 478 if (peer == peers_by_token_.end()) |
| 479 return; |
| 480 |
| 481 // |peer| may be removed so make a copy of |name|. |
| 482 ports::NodeName name = peer->second; |
| 483 DropPeer(name, nullptr); |
| 484 } |
| 485 |
| 463 scoped_refptr<NodeChannel> NodeController::GetPeerChannel( | 486 scoped_refptr<NodeChannel> NodeController::GetPeerChannel( |
| 464 const ports::NodeName& name) { | 487 const ports::NodeName& name) { |
| 465 base::AutoLock lock(peers_lock_); | 488 base::AutoLock lock(peers_lock_); |
| 466 auto it = peers_.find(name); | 489 auto it = peers_.find(name); |
| 467 if (it == peers_.end()) | 490 if (it == peers_.end()) |
| 468 return nullptr; | 491 return nullptr; |
| 469 return it->second; | 492 return it->second; |
| 470 } | 493 } |
| 471 | 494 |
| 472 scoped_refptr<NodeChannel> NodeController::GetParentChannel() { | 495 scoped_refptr<NodeChannel> NodeController::GetParentChannel() { |
| (...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 585 base::AutoLock lock(parent_lock_); | 608 base::AutoLock lock(parent_lock_); |
| 586 is_parent = (name == parent_name_ || channel == bootstrap_parent_channel_); | 609 is_parent = (name == parent_name_ || channel == bootstrap_parent_channel_); |
| 587 } | 610 } |
| 588 | 611 |
| 589 // If the error comes from the parent channel, we also need to cancel any | 612 // If the error comes from the parent channel, we also need to cancel any |
| 590 // port merge requests, so that errors can be propagated to the message | 613 // port merge requests, so that errors can be propagated to the message |
| 591 // pipes. | 614 // pipes. |
| 592 if (is_parent) | 615 if (is_parent) |
| 593 CancelPendingPortMerges(); | 616 CancelPendingPortMerges(); |
| 594 | 617 |
| 618 auto peer = peer_connections_.find(name); |
| 619 if (peer != peer_connections_.end()) { |
| 620 peers_by_token_.erase(peer->second.peer_token); |
| 621 ports_to_close.push_back(peer->second.local_port); |
| 622 peer_connections_.erase(peer); |
| 623 } |
| 624 |
| 595 for (const auto& port : ports_to_close) | 625 for (const auto& port : ports_to_close) |
| 596 node_->ClosePort(port); | 626 node_->ClosePort(port); |
| 597 | 627 |
| 598 node_->LostConnectionToNode(name); | 628 node_->LostConnectionToNode(name); |
| 599 | 629 |
| 600 AcceptIncomingMessages(); | 630 AcceptIncomingMessages(); |
| 601 } | 631 } |
| 602 | 632 |
| 603 void NodeController::SendPeerMessage(const ports::NodeName& name, | 633 void NodeController::SendPeerMessage(const ports::NodeName& name, |
| 604 ports::ScopedMessage message) { | 634 ports::ScopedMessage message) { |
| (...skipping 130 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 735 | 765 |
| 736 { | 766 { |
| 737 base::AutoLock lock(peers_lock_); | 767 base::AutoLock lock(peers_lock_); |
| 738 for (const auto& peer : peers_) | 768 for (const auto& peer : peers_) |
| 739 all_peers.push_back(peer.second); | 769 all_peers.push_back(peer.second); |
| 740 for (const auto& peer : pending_children_) | 770 for (const auto& peer : pending_children_) |
| 741 all_peers.push_back(peer.second); | 771 all_peers.push_back(peer.second); |
| 742 peers_.clear(); | 772 peers_.clear(); |
| 743 pending_children_.clear(); | 773 pending_children_.clear(); |
| 744 pending_peer_messages_.clear(); | 774 pending_peer_messages_.clear(); |
| 745 pending_peers_.clear(); | 775 peer_connections_.clear(); |
| 746 } | 776 } |
| 747 | 777 |
| 748 for (const auto& peer : all_peers) | 778 for (const auto& peer : all_peers) |
| 749 peer->ShutDown(); | 779 peer->ShutDown(); |
| 750 | 780 |
| 751 if (destroy_on_io_thread_shutdown_) | 781 if (destroy_on_io_thread_shutdown_) |
| 752 delete this; | 782 delete this; |
| 753 } | 783 } |
| 754 | 784 |
| 755 void NodeController::GenerateRandomPortName(ports::PortName* port_name) { | 785 void NodeController::GenerateRandomPortName(ports::PortName* port_name) { |
| (...skipping 494 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1250 OnPortsMessage(source_node, std::move(message)); | 1280 OnPortsMessage(source_node, std::move(message)); |
| 1251 } | 1281 } |
| 1252 #endif | 1282 #endif |
| 1253 | 1283 |
| 1254 void NodeController::OnAcceptPeer(const ports::NodeName& from_node, | 1284 void NodeController::OnAcceptPeer(const ports::NodeName& from_node, |
| 1255 const ports::NodeName& token, | 1285 const ports::NodeName& token, |
| 1256 const ports::NodeName& peer_name, | 1286 const ports::NodeName& peer_name, |
| 1257 const ports::PortName& port_name) { | 1287 const ports::PortName& port_name) { |
| 1258 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1288 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| 1259 | 1289 |
| 1260 auto it = pending_peers_.find(from_node); | 1290 auto it = peer_connections_.find(from_node); |
| 1261 if (it == pending_peers_.end()) { | 1291 if (it == peer_connections_.end()) { |
| 1262 DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node; | 1292 DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node; |
| 1263 DropPeer(from_node, nullptr); | 1293 DropPeer(from_node, nullptr); |
| 1264 return; | 1294 return; |
| 1265 } | 1295 } |
| 1266 | 1296 |
| 1267 scoped_refptr<NodeChannel> channel = it->second.first; | 1297 scoped_refptr<NodeChannel> channel = std::move(it->second.channel); |
| 1268 ports::PortRef local_port = it->second.second; | 1298 ports::PortRef local_port = it->second.local_port; |
| 1269 pending_peers_.erase(it); | 1299 std::string peer_token = std::move(it->second.peer_token); |
| 1300 peer_connections_.erase(it); |
| 1270 DCHECK(channel); | 1301 DCHECK(channel); |
| 1271 | 1302 |
| 1272 // If the peer connection is a self connection (which is used in tests), | 1303 // If the peer connection is a self connection (which is used in tests), |
| 1273 // drop the channel to it and skip straight to merging the ports. | 1304 // drop the channel to it and skip straight to merging the ports. |
| 1274 if (name_ != peer_name) { | 1305 if (name_ == peer_name) { |
| 1306 peers_by_token_.erase(peer_token); |
| 1307 } else { |
| 1308 peers_by_token_[peer_token] = peer_name; |
| 1309 peer_connections_.insert( |
| 1310 {peer_name, PeerConnection{nullptr, local_port, peer_token}}); |
| 1275 DVLOG(1) << "Node " << name_ << " accepted peer " << peer_name; | 1311 DVLOG(1) << "Node " << name_ << " accepted peer " << peer_name; |
| 1276 | 1312 |
| 1277 AddPeer(peer_name, channel, false /* start_channel */); | 1313 AddPeer(peer_name, channel, false /* start_channel */); |
| 1278 } | 1314 } |
| 1279 | 1315 |
| 1280 // We need to choose one side to initiate the port merge. It doesn't matter | 1316 // We need to choose one side to initiate the port merge. It doesn't matter |
| 1281 // who does it as long as they don't both try. Simple solution: pick the one | 1317 // who does it as long as they don't both try. Simple solution: pick the one |
| 1282 // with the "smaller" port name. | 1318 // with the "smaller" port name. |
| 1283 if (local_port.name() < port_name) { | 1319 if (local_port.name() < port_name) { |
| 1284 node()->MergePorts(local_port, peer_name, port_name); | 1320 node()->MergePorts(local_port, peer_name, port_name); |
| (...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1351 callback = shutdown_callback_; | 1387 callback = shutdown_callback_; |
| 1352 shutdown_callback_.Reset(); | 1388 shutdown_callback_.Reset(); |
| 1353 shutdown_callback_flag_.Set(false); | 1389 shutdown_callback_flag_.Set(false); |
| 1354 } | 1390 } |
| 1355 | 1391 |
| 1356 DCHECK(!callback.is_null()); | 1392 DCHECK(!callback.is_null()); |
| 1357 | 1393 |
| 1358 callback.Run(); | 1394 callback.Run(); |
| 1359 } | 1395 } |
| 1360 | 1396 |
| 1397 NodeController::PeerConnection::PeerConnection() = default; |
| 1398 |
| 1399 NodeController::PeerConnection::PeerConnection( |
| 1400 const PeerConnection& other) = default; |
| 1401 |
| 1402 NodeController::PeerConnection::PeerConnection( |
| 1403 PeerConnection&& other) = default; |
| 1404 |
| 1405 NodeController::PeerConnection::PeerConnection( |
| 1406 const scoped_refptr<NodeChannel>& channel, |
| 1407 const ports::PortRef& local_port, |
| 1408 const std::string& peer_token) |
| 1409 : channel(channel), local_port(local_port), peer_token(peer_token) {} |
| 1410 |
| 1411 NodeController::PeerConnection::~PeerConnection() = default; |
| 1412 |
| 1413 NodeController::PeerConnection& NodeController::PeerConnection:: |
| 1414 operator=(const PeerConnection& other) = default; |
| 1415 |
| 1416 NodeController::PeerConnection& NodeController::PeerConnection:: |
| 1417 operator=(PeerConnection&& other) = default; |
| 1418 |
| 1361 } // namespace edk | 1419 } // namespace edk |
| 1362 } // namespace mojo | 1420 } // namespace mojo |
| OLD | NEW |