OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/node_controller.h" | 5 #include "mojo/edk/system/node_controller.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <limits> | 8 #include <limits> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
(...skipping 236 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
247 } | 247 } |
248 #endif | 248 #endif |
249 | 249 |
250 io_task_runner_->PostTask( | 250 io_task_runner_->PostTask( |
251 FROM_HERE, | 251 FROM_HERE, |
252 base::Bind(&NodeController::ConnectToParentOnIOThread, | 252 base::Bind(&NodeController::ConnectToParentOnIOThread, |
253 base::Unretained(this), | 253 base::Unretained(this), |
254 base::Passed(&platform_handle))); | 254 base::Passed(&platform_handle))); |
255 } | 255 } |
256 | 256 |
| 257 void NodeController::ConnectToPeer(ScopedPlatformHandle handle, |
| 258 const ports::PortRef& port) { |
| 259 ports::NodeName node_name; |
| 260 GenerateRandomName(&node_name); |
| 261 io_task_runner_->PostTask( |
| 262 FROM_HERE, base::Bind(&NodeController::ConnectToPeerOnIOThread, |
| 263 base::Unretained(this), base::Passed(&handle), |
| 264 node_name, port)); |
| 265 } |
| 266 |
257 void NodeController::SetPortObserver( | 267 void NodeController::SetPortObserver( |
258 const ports::PortRef& port, | 268 const ports::PortRef& port, |
259 const scoped_refptr<PortObserver>& observer) { | 269 const scoped_refptr<PortObserver>& observer) { |
260 node_->SetUserData(port, observer); | 270 node_->SetUserData(port, observer); |
261 } | 271 } |
262 | 272 |
263 void NodeController::ClosePort(const ports::PortRef& port) { | 273 void NodeController::ClosePort(const ports::PortRef& port) { |
264 SetPortObserver(port, nullptr); | 274 SetPortObserver(port, nullptr); |
265 int rv = node_->ClosePort(port); | 275 int rv = node_->ClosePort(port); |
266 DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name(); | 276 DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name(); |
(...skipping 160 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
427 // Prevent the parent pipe handle from being closed on shutdown. Pipe | 437 // Prevent the parent pipe handle from being closed on shutdown. Pipe |
428 // closure is used by the parent to detect the child process has exited. | 438 // closure is used by the parent to detect the child process has exited. |
429 // Relying on message pipes to be closed is not enough because the parent | 439 // Relying on message pipes to be closed is not enough because the parent |
430 // may see the message pipe closure before the child is dead, causing the | 440 // may see the message pipe closure before the child is dead, causing the |
431 // child process to be unexpectedly SIGKILL'd. | 441 // child process to be unexpectedly SIGKILL'd. |
432 bootstrap_parent_channel_->LeakHandleOnShutdown(); | 442 bootstrap_parent_channel_->LeakHandleOnShutdown(); |
433 } | 443 } |
434 bootstrap_parent_channel_->Start(); | 444 bootstrap_parent_channel_->Start(); |
435 } | 445 } |
436 | 446 |
| 447 void NodeController::ConnectToPeerOnIOThread(ScopedPlatformHandle handle, |
| 448 ports::NodeName token, |
| 449 ports::PortRef port) { |
| 450 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| 451 |
| 452 scoped_refptr<NodeChannel> channel = |
| 453 NodeChannel::Create(this, std::move(handle), io_task_runner_, {}); |
| 454 pending_peers_.insert({token, {channel, port}}); |
| 455 |
| 456 channel->SetRemoteNodeName(token); |
| 457 channel->Start(); |
| 458 |
| 459 channel->AcceptPeer(name_, token, port.name()); |
| 460 } |
| 461 |
437 scoped_refptr<NodeChannel> NodeController::GetPeerChannel( | 462 scoped_refptr<NodeChannel> NodeController::GetPeerChannel( |
438 const ports::NodeName& name) { | 463 const ports::NodeName& name) { |
439 base::AutoLock lock(peers_lock_); | 464 base::AutoLock lock(peers_lock_); |
440 auto it = peers_.find(name); | 465 auto it = peers_.find(name); |
441 if (it == peers_.end()) | 466 if (it == peers_.end()) |
442 return nullptr; | 467 return nullptr; |
443 return it->second; | 468 return it->second; |
444 } | 469 } |
445 | 470 |
446 scoped_refptr<NodeChannel> NodeController::GetParentChannel() { | 471 scoped_refptr<NodeChannel> NodeController::GetParentChannel() { |
(...skipping 267 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
714 | 739 |
715 { | 740 { |
716 base::AutoLock lock(peers_lock_); | 741 base::AutoLock lock(peers_lock_); |
717 for (const auto& peer : peers_) | 742 for (const auto& peer : peers_) |
718 all_peers.push_back(peer.second); | 743 all_peers.push_back(peer.second); |
719 for (const auto& peer : pending_children_) | 744 for (const auto& peer : pending_children_) |
720 all_peers.push_back(peer.second); | 745 all_peers.push_back(peer.second); |
721 peers_.clear(); | 746 peers_.clear(); |
722 pending_children_.clear(); | 747 pending_children_.clear(); |
723 pending_peer_messages_.clear(); | 748 pending_peer_messages_.clear(); |
| 749 pending_peers_.clear(); |
724 } | 750 } |
725 | 751 |
726 for (const auto& peer : all_peers) | 752 for (const auto& peer : all_peers) |
727 peer->ShutDown(); | 753 peer->ShutDown(); |
728 | 754 |
729 if (destroy_on_io_thread_shutdown_) | 755 if (destroy_on_io_thread_shutdown_) |
730 delete this; | 756 delete this; |
731 } | 757 } |
732 | 758 |
733 void NodeController::GenerateRandomPortName(ports::PortName* port_name) { | 759 void NodeController::GenerateRandomPortName(ports::PortName* port_name) { |
(...skipping 488 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1222 if (GetPeerChannel(from_node) != GetBrokerChannel()) { | 1248 if (GetPeerChannel(from_node) != GetBrokerChannel()) { |
1223 LOG(ERROR) << "Refusing relayed message from non-broker node."; | 1249 LOG(ERROR) << "Refusing relayed message from non-broker node."; |
1224 DropPeer(from_node, nullptr); | 1250 DropPeer(from_node, nullptr); |
1225 return; | 1251 return; |
1226 } | 1252 } |
1227 | 1253 |
1228 OnPortsMessage(source_node, std::move(message)); | 1254 OnPortsMessage(source_node, std::move(message)); |
1229 } | 1255 } |
1230 #endif | 1256 #endif |
1231 | 1257 |
| 1258 void NodeController::OnAcceptPeer(const ports::NodeName& from_node, |
| 1259 const ports::NodeName& token, |
| 1260 const ports::NodeName& peer_name, |
| 1261 const ports::PortName& port_name) { |
| 1262 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| 1263 |
| 1264 auto it = pending_peers_.find(from_node); |
| 1265 if (it == pending_peers_.end()) { |
| 1266 DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node; |
| 1267 DropPeer(from_node, nullptr); |
| 1268 return; |
| 1269 } |
| 1270 |
| 1271 scoped_refptr<NodeChannel> channel = it->second.first; |
| 1272 ports::PortRef local_port = it->second.second; |
| 1273 pending_peers_.erase(it); |
| 1274 DCHECK(channel); |
| 1275 |
| 1276 DVLOG(1) << "Node " << name_ << " accepted peer " << peer_name; |
| 1277 |
| 1278 AddPeer(peer_name, channel, false /* start_channel */); |
| 1279 |
| 1280 // We need to choose one side to initiate the port merge. It doesn't matter |
| 1281 // who does it as long as they don't both try. Simple solution: pick the one |
| 1282 // with the "smaller" port name. |
| 1283 if (local_port.name() < port_name) { |
| 1284 node()->MergePorts(local_port, peer_name, port_name); |
| 1285 } |
| 1286 } |
| 1287 |
1232 void NodeController::OnChannelError(const ports::NodeName& from_node, | 1288 void NodeController::OnChannelError(const ports::NodeName& from_node, |
1233 NodeChannel* channel) { | 1289 NodeChannel* channel) { |
1234 if (io_task_runner_->RunsTasksOnCurrentThread()) { | 1290 if (io_task_runner_->RunsTasksOnCurrentThread()) { |
1235 DropPeer(from_node, channel); | 1291 DropPeer(from_node, channel); |
1236 // DropPeer may have caused local port closures, so be sure to process any | 1292 // DropPeer may have caused local port closures, so be sure to process any |
1237 // pending local messages. | 1293 // pending local messages. |
1238 AcceptIncomingMessages(); | 1294 AcceptIncomingMessages(); |
1239 } else { | 1295 } else { |
1240 io_task_runner_->PostTask( | 1296 io_task_runner_->PostTask( |
1241 FROM_HERE, | 1297 FROM_HERE, |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1282 shutdown_callback_flag_.Set(false); | 1338 shutdown_callback_flag_.Set(false); |
1283 } | 1339 } |
1284 | 1340 |
1285 DCHECK(!callback.is_null()); | 1341 DCHECK(!callback.is_null()); |
1286 | 1342 |
1287 callback.Run(); | 1343 callback.Run(); |
1288 } | 1344 } |
1289 | 1345 |
1290 } // namespace edk | 1346 } // namespace edk |
1291 } // namespace mojo | 1347 } // namespace mojo |
OLD | NEW |