Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(136)

Side by Side Diff: mojo/edk/system/node_controller.cc

Issue 2466433002: Add mojo::edk::ClosePeerConnection. (Closed)
Patch Set: Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/test/mojo_test_base.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/node_controller.h" 5 #include "mojo/edk/system/node_controller.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <limits> 8 #include <limits>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 209 matching lines...) Expand 10 before | Expand all | Expand 10 after
220 reserved_ports_.erase(token); 220 reserved_ports_.erase(token);
221 } 221 }
222 222
223 for (const auto& port : ports_to_close) 223 for (const auto& port : ports_to_close)
224 node_->ClosePort(port); 224 node_->ClosePort(port);
225 225
226 // Ensure local port closure messages are processed. 226 // Ensure local port closure messages are processed.
227 AcceptIncomingMessages(); 227 AcceptIncomingMessages();
228 } 228 }
229 229
230 void NodeController::ClosePeerConnection(const std::string& peer_token) {
231 io_task_runner_->PostTask(
232 FROM_HERE, base::Bind(&NodeController::ClosePeerConnectionOnIOThread,
233 base::Unretained(this), peer_token));
234 }
235
230 void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) { 236 void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) {
231 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI) 237 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
232 // Use the bootstrap channel for the broker and receive the node's channel 238 // Use the bootstrap channel for the broker and receive the node's channel
233 // synchronously as the first message from the broker. 239 // synchronously as the first message from the broker.
234 base::ElapsedTimer timer; 240 base::ElapsedTimer timer;
235 broker_.reset(new Broker(std::move(platform_handle))); 241 broker_.reset(new Broker(std::move(platform_handle)));
236 platform_handle = broker_->GetParentPlatformHandle(); 242 platform_handle = broker_->GetParentPlatformHandle();
237 UMA_HISTOGRAM_TIMES("Mojo.System.GetParentPlatformHandleSyncTime", 243 UMA_HISTOGRAM_TIMES("Mojo.System.GetParentPlatformHandleSyncTime",
238 timer.Elapsed()); 244 timer.Elapsed());
239 245
240 if (!platform_handle.is_valid()) { 246 if (!platform_handle.is_valid()) {
241 // Most likely the browser side of the channel has already been closed and 247 // Most likely the browser side of the channel has already been closed and
242 // the broker was unable to negotiate a NodeChannel pipe. In this case we 248 // the broker was unable to negotiate a NodeChannel pipe. In this case we
243 // can cancel parent connection. 249 // can cancel parent connection.
244 DVLOG(1) << "Cannot connect to invalid parent channel."; 250 DVLOG(1) << "Cannot connect to invalid parent channel.";
245 CancelPendingPortMerges(); 251 CancelPendingPortMerges();
246 return; 252 return;
247 } 253 }
248 #endif 254 #endif
249 255
250 io_task_runner_->PostTask( 256 io_task_runner_->PostTask(
251 FROM_HERE, 257 FROM_HERE,
252 base::Bind(&NodeController::ConnectToParentOnIOThread, 258 base::Bind(&NodeController::ConnectToParentOnIOThread,
253 base::Unretained(this), 259 base::Unretained(this),
254 base::Passed(&platform_handle))); 260 base::Passed(&platform_handle)));
255 } 261 }
256 262
257 void NodeController::ConnectToPeer(ScopedPlatformHandle handle, 263 void NodeController::ConnectToPeer(ScopedPlatformHandle handle,
258 const ports::PortRef& port) { 264 const ports::PortRef& port,
265 const std::string& peer_token) {
259 ports::NodeName node_name; 266 ports::NodeName node_name;
260 GenerateRandomName(&node_name); 267 GenerateRandomName(&node_name);
261 io_task_runner_->PostTask( 268 io_task_runner_->PostTask(
262 FROM_HERE, base::Bind(&NodeController::ConnectToPeerOnIOThread, 269 FROM_HERE, base::Bind(&NodeController::ConnectToPeerOnIOThread,
263 base::Unretained(this), base::Passed(&handle), 270 base::Unretained(this), base::Passed(&handle),
264 node_name, port)); 271 node_name, port, peer_token));
265 } 272 }
266 273
267 void NodeController::SetPortObserver( 274 void NodeController::SetPortObserver(
268 const ports::PortRef& port, 275 const ports::PortRef& port,
269 const scoped_refptr<PortObserver>& observer) { 276 const scoped_refptr<PortObserver>& observer) {
270 node_->SetUserData(port, observer); 277 node_->SetUserData(port, observer);
271 } 278 }
272 279
273 void NodeController::ClosePort(const ports::PortRef& port) { 280 void NodeController::ClosePort(const ports::PortRef& port) {
274 SetPortObserver(port, nullptr); 281 SetPortObserver(port, nullptr);
(...skipping 165 matching lines...) Expand 10 before | Expand all | Expand 10 after
440 // Relying on message pipes to be closed is not enough because the parent 447 // Relying on message pipes to be closed is not enough because the parent
441 // may see the message pipe closure before the child is dead, causing the 448 // may see the message pipe closure before the child is dead, causing the
442 // child process to be unexpectedly SIGKILL'd. 449 // child process to be unexpectedly SIGKILL'd.
443 bootstrap_parent_channel_->LeakHandleOnShutdown(); 450 bootstrap_parent_channel_->LeakHandleOnShutdown();
444 } 451 }
445 bootstrap_parent_channel_->Start(); 452 bootstrap_parent_channel_->Start();
446 } 453 }
447 454
448 void NodeController::ConnectToPeerOnIOThread(ScopedPlatformHandle handle, 455 void NodeController::ConnectToPeerOnIOThread(ScopedPlatformHandle handle,
449 ports::NodeName token, 456 ports::NodeName token,
450 ports::PortRef port) { 457 ports::PortRef port,
458 const std::string& peer_token) {
451 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 459 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
452 460
453 scoped_refptr<NodeChannel> channel = 461 scoped_refptr<NodeChannel> channel =
454 NodeChannel::Create(this, std::move(handle), io_task_runner_, {}); 462 NodeChannel::Create(this, std::move(handle), io_task_runner_, {});
455 pending_peers_.insert({token, {channel, port}}); 463 peer_connections_.insert(
464 {token, PeerConnection{channel, port, peer_token}});
465 peers_by_token_.insert({peer_token, token});
456 466
457 channel->SetRemoteNodeName(token); 467 channel->SetRemoteNodeName(token);
458 channel->Start(); 468 channel->Start();
459 469
460 channel->AcceptPeer(name_, token, port.name()); 470 channel->AcceptPeer(name_, token, port.name());
461 } 471 }
462 472
473 void NodeController::ClosePeerConnectionOnIOThread(
474 const std::string& peer_token) {
475 RequestContext request_context(RequestContext::Source::SYSTEM);
476 auto peer = peers_by_token_.find(peer_token);
477 // The connection may already be closed.
478 if (peer == peers_by_token_.end())
479 return;
480
481 // |peer| may be removed so make a copy of |name|.
482 ports::NodeName name = peer->second;
483 DropPeer(name, nullptr);
484 }
485
463 scoped_refptr<NodeChannel> NodeController::GetPeerChannel( 486 scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
464 const ports::NodeName& name) { 487 const ports::NodeName& name) {
465 base::AutoLock lock(peers_lock_); 488 base::AutoLock lock(peers_lock_);
466 auto it = peers_.find(name); 489 auto it = peers_.find(name);
467 if (it == peers_.end()) 490 if (it == peers_.end())
468 return nullptr; 491 return nullptr;
469 return it->second; 492 return it->second;
470 } 493 }
471 494
472 scoped_refptr<NodeChannel> NodeController::GetParentChannel() { 495 scoped_refptr<NodeChannel> NodeController::GetParentChannel() {
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after
585 base::AutoLock lock(parent_lock_); 608 base::AutoLock lock(parent_lock_);
586 is_parent = (name == parent_name_ || channel == bootstrap_parent_channel_); 609 is_parent = (name == parent_name_ || channel == bootstrap_parent_channel_);
587 } 610 }
588 611
589 // If the error comes from the parent channel, we also need to cancel any 612 // If the error comes from the parent channel, we also need to cancel any
590 // port merge requests, so that errors can be propagated to the message 613 // port merge requests, so that errors can be propagated to the message
591 // pipes. 614 // pipes.
592 if (is_parent) 615 if (is_parent)
593 CancelPendingPortMerges(); 616 CancelPendingPortMerges();
594 617
618 auto peer = peer_connections_.find(name);
619 if (peer != peer_connections_.end()) {
620 peers_by_token_.erase(peer->second.peer_token);
621 ports_to_close.push_back(peer->second.local_port);
622 peer_connections_.erase(peer);
623 }
624
595 for (const auto& port : ports_to_close) 625 for (const auto& port : ports_to_close)
596 node_->ClosePort(port); 626 node_->ClosePort(port);
597 627
598 node_->LostConnectionToNode(name); 628 node_->LostConnectionToNode(name);
599 629
600 AcceptIncomingMessages(); 630 AcceptIncomingMessages();
601 } 631 }
602 632
603 void NodeController::SendPeerMessage(const ports::NodeName& name, 633 void NodeController::SendPeerMessage(const ports::NodeName& name,
604 ports::ScopedMessage message) { 634 ports::ScopedMessage message) {
(...skipping 130 matching lines...) Expand 10 before | Expand all | Expand 10 after
735 765
736 { 766 {
737 base::AutoLock lock(peers_lock_); 767 base::AutoLock lock(peers_lock_);
738 for (const auto& peer : peers_) 768 for (const auto& peer : peers_)
739 all_peers.push_back(peer.second); 769 all_peers.push_back(peer.second);
740 for (const auto& peer : pending_children_) 770 for (const auto& peer : pending_children_)
741 all_peers.push_back(peer.second); 771 all_peers.push_back(peer.second);
742 peers_.clear(); 772 peers_.clear();
743 pending_children_.clear(); 773 pending_children_.clear();
744 pending_peer_messages_.clear(); 774 pending_peer_messages_.clear();
745 pending_peers_.clear(); 775 peer_connections_.clear();
746 } 776 }
747 777
748 for (const auto& peer : all_peers) 778 for (const auto& peer : all_peers)
749 peer->ShutDown(); 779 peer->ShutDown();
750 780
751 if (destroy_on_io_thread_shutdown_) 781 if (destroy_on_io_thread_shutdown_)
752 delete this; 782 delete this;
753 } 783 }
754 784
755 void NodeController::GenerateRandomPortName(ports::PortName* port_name) { 785 void NodeController::GenerateRandomPortName(ports::PortName* port_name) {
(...skipping 494 matching lines...) Expand 10 before | Expand all | Expand 10 after
1250 OnPortsMessage(source_node, std::move(message)); 1280 OnPortsMessage(source_node, std::move(message));
1251 } 1281 }
1252 #endif 1282 #endif
1253 1283
1254 void NodeController::OnAcceptPeer(const ports::NodeName& from_node, 1284 void NodeController::OnAcceptPeer(const ports::NodeName& from_node,
1255 const ports::NodeName& token, 1285 const ports::NodeName& token,
1256 const ports::NodeName& peer_name, 1286 const ports::NodeName& peer_name,
1257 const ports::PortName& port_name) { 1287 const ports::PortName& port_name) {
1258 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1288 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1259 1289
1260 auto it = pending_peers_.find(from_node); 1290 auto it = peer_connections_.find(from_node);
1261 if (it == pending_peers_.end()) { 1291 if (it == peer_connections_.end()) {
1262 DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node; 1292 DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node;
1263 DropPeer(from_node, nullptr); 1293 DropPeer(from_node, nullptr);
1264 return; 1294 return;
1265 } 1295 }
1266 1296
1267 scoped_refptr<NodeChannel> channel = it->second.first; 1297 scoped_refptr<NodeChannel> channel = std::move(it->second.channel);
1268 ports::PortRef local_port = it->second.second; 1298 ports::PortRef local_port = it->second.local_port;
1269 pending_peers_.erase(it); 1299 std::string peer_token = std::move(it->second.peer_token);
1300 peer_connections_.erase(it);
1270 DCHECK(channel); 1301 DCHECK(channel);
1271 1302
1272 // If the peer connection is a self connection (which is used in tests), 1303 // If the peer connection is a self connection (which is used in tests),
1273 // drop the channel to it and skip straight to merging the ports. 1304 // drop the channel to it and skip straight to merging the ports.
1274 if (name_ != peer_name) { 1305 if (name_ == peer_name) {
1306 peers_by_token_.erase(peer_token);
1307 } else {
1308 peers_by_token_[peer_token] = peer_name;
1309 peer_connections_.insert(
1310 {peer_name, PeerConnection{nullptr, local_port, peer_token}});
1275 DVLOG(1) << "Node " << name_ << " accepted peer " << peer_name; 1311 DVLOG(1) << "Node " << name_ << " accepted peer " << peer_name;
1276 1312
1277 AddPeer(peer_name, channel, false /* start_channel */); 1313 AddPeer(peer_name, channel, false /* start_channel */);
1278 } 1314 }
1279 1315
1280 // We need to choose one side to initiate the port merge. It doesn't matter 1316 // We need to choose one side to initiate the port merge. It doesn't matter
1281 // who does it as long as they don't both try. Simple solution: pick the one 1317 // who does it as long as they don't both try. Simple solution: pick the one
1282 // with the "smaller" port name. 1318 // with the "smaller" port name.
1283 if (local_port.name() < port_name) { 1319 if (local_port.name() < port_name) {
1284 node()->MergePorts(local_port, peer_name, port_name); 1320 node()->MergePorts(local_port, peer_name, port_name);
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
1351 callback = shutdown_callback_; 1387 callback = shutdown_callback_;
1352 shutdown_callback_.Reset(); 1388 shutdown_callback_.Reset();
1353 shutdown_callback_flag_.Set(false); 1389 shutdown_callback_flag_.Set(false);
1354 } 1390 }
1355 1391
1356 DCHECK(!callback.is_null()); 1392 DCHECK(!callback.is_null());
1357 1393
1358 callback.Run(); 1394 callback.Run();
1359 } 1395 }
1360 1396
1397 NodeController::PeerConnection::PeerConnection() = default;
1398
1399 NodeController::PeerConnection::PeerConnection(
1400 const PeerConnection& other) = default;
1401
1402 NodeController::PeerConnection::PeerConnection(
1403 PeerConnection&& other) = default;
1404
1405 NodeController::PeerConnection::PeerConnection(
1406 const scoped_refptr<NodeChannel>& channel,
1407 const ports::PortRef& local_port,
1408 const std::string& peer_token)
1409 : channel(channel), local_port(local_port), peer_token(peer_token) {}
1410
1411 NodeController::PeerConnection::~PeerConnection() = default;
1412
1413 NodeController::PeerConnection& NodeController::PeerConnection::
1414 operator=(const PeerConnection& other) = default;
1415
1416 NodeController::PeerConnection& NodeController::PeerConnection::
1417 operator=(PeerConnection&& other) = default;
1418
1361 } // namespace edk 1419 } // namespace edk
1362 } // namespace mojo 1420 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/test/mojo_test_base.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698