OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/node_controller.h" | 5 #include "mojo/edk/system/node_controller.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <limits> | 8 #include <limits> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
(...skipping 236 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
247 } | 247 } |
248 #endif | 248 #endif |
249 | 249 |
250 io_task_runner_->PostTask( | 250 io_task_runner_->PostTask( |
251 FROM_HERE, | 251 FROM_HERE, |
252 base::Bind(&NodeController::ConnectToParentOnIOThread, | 252 base::Bind(&NodeController::ConnectToParentOnIOThread, |
253 base::Unretained(this), | 253 base::Unretained(this), |
254 base::Passed(&platform_handle))); | 254 base::Passed(&platform_handle))); |
255 } | 255 } |
256 | 256 |
| 257 void NodeController::ConnectToPeer(ScopedPlatformHandle handle, |
| 258 const ports::PortRef& port) { |
| 259 ports::NodeName node_name; |
| 260 GenerateRandomName(&node_name); |
| 261 io_task_runner_->PostTask( |
| 262 FROM_HERE, base::Bind(&NodeController::ConnectToPeerOnIOThread, |
| 263 base::Unretained(this), base::Passed(&handle), |
| 264 node_name, port)); |
| 265 } |
| 266 |
257 void NodeController::SetPortObserver( | 267 void NodeController::SetPortObserver( |
258 const ports::PortRef& port, | 268 const ports::PortRef& port, |
259 const scoped_refptr<PortObserver>& observer) { | 269 const scoped_refptr<PortObserver>& observer) { |
260 node_->SetUserData(port, observer); | 270 node_->SetUserData(port, observer); |
261 } | 271 } |
262 | 272 |
263 void NodeController::ClosePort(const ports::PortRef& port) { | 273 void NodeController::ClosePort(const ports::PortRef& port) { |
264 SetPortObserver(port, nullptr); | 274 SetPortObserver(port, nullptr); |
265 int rv = node_->ClosePort(port); | 275 int rv = node_->ClosePort(port); |
266 DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name(); | 276 DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name(); |
(...skipping 160 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
427 // Prevent the parent pipe handle from being closed on shutdown. Pipe | 437 // Prevent the parent pipe handle from being closed on shutdown. Pipe |
428 // closure is used by the parent to detect the child process has exited. | 438 // closure is used by the parent to detect the child process has exited. |
429 // Relying on message pipes to be closed is not enough because the parent | 439 // Relying on message pipes to be closed is not enough because the parent |
430 // may see the message pipe closure before the child is dead, causing the | 440 // may see the message pipe closure before the child is dead, causing the |
431 // child process to be unexpectedly SIGKILL'd. | 441 // child process to be unexpectedly SIGKILL'd. |
432 bootstrap_parent_channel_->LeakHandleOnShutdown(); | 442 bootstrap_parent_channel_->LeakHandleOnShutdown(); |
433 } | 443 } |
434 bootstrap_parent_channel_->Start(); | 444 bootstrap_parent_channel_->Start(); |
435 } | 445 } |
436 | 446 |
| 447 void NodeController::ConnectToPeerOnIOThread(ScopedPlatformHandle handle, |
| 448 ports::NodeName token, |
| 449 ports::PortRef port) { |
| 450 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| 451 |
| 452 scoped_refptr<NodeChannel> channel = |
| 453 NodeChannel::Create(this, std::move(handle), io_task_runner_, {}); |
| 454 pending_peers_.insert({token, {channel, port}}); |
| 455 |
| 456 channel->SetRemoteNodeName(token); |
| 457 channel->Start(); |
| 458 |
| 459 channel->AcceptPeer(name_, token, port.name()); |
| 460 } |
| 461 |
437 scoped_refptr<NodeChannel> NodeController::GetPeerChannel( | 462 scoped_refptr<NodeChannel> NodeController::GetPeerChannel( |
438 const ports::NodeName& name) { | 463 const ports::NodeName& name) { |
439 base::AutoLock lock(peers_lock_); | 464 base::AutoLock lock(peers_lock_); |
440 auto it = peers_.find(name); | 465 auto it = peers_.find(name); |
441 if (it == peers_.end()) | 466 if (it == peers_.end()) |
442 return nullptr; | 467 return nullptr; |
443 return it->second; | 468 return it->second; |
444 } | 469 } |
445 | 470 |
446 scoped_refptr<NodeChannel> NodeController::GetParentChannel() { | 471 scoped_refptr<NodeChannel> NodeController::GetParentChannel() { |
(...skipping 258 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
705 | 730 |
706 { | 731 { |
707 base::AutoLock lock(peers_lock_); | 732 base::AutoLock lock(peers_lock_); |
708 for (const auto& peer : peers_) | 733 for (const auto& peer : peers_) |
709 all_peers.push_back(peer.second); | 734 all_peers.push_back(peer.second); |
710 for (const auto& peer : pending_children_) | 735 for (const auto& peer : pending_children_) |
711 all_peers.push_back(peer.second); | 736 all_peers.push_back(peer.second); |
712 peers_.clear(); | 737 peers_.clear(); |
713 pending_children_.clear(); | 738 pending_children_.clear(); |
714 pending_peer_messages_.clear(); | 739 pending_peer_messages_.clear(); |
| 740 pending_peers_.clear(); |
715 } | 741 } |
716 | 742 |
717 for (const auto& peer : all_peers) | 743 for (const auto& peer : all_peers) |
718 peer->ShutDown(); | 744 peer->ShutDown(); |
719 | 745 |
720 if (destroy_on_io_thread_shutdown_) | 746 if (destroy_on_io_thread_shutdown_) |
721 delete this; | 747 delete this; |
722 } | 748 } |
723 | 749 |
724 void NodeController::GenerateRandomPortName(ports::PortName* port_name) { | 750 void NodeController::GenerateRandomPortName(ports::PortName* port_name) { |
(...skipping 487 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1212 if (GetPeerChannel(from_node) != GetBrokerChannel()) { | 1238 if (GetPeerChannel(from_node) != GetBrokerChannel()) { |
1213 LOG(ERROR) << "Refusing relayed message from non-broker node."; | 1239 LOG(ERROR) << "Refusing relayed message from non-broker node."; |
1214 DropPeer(from_node, nullptr); | 1240 DropPeer(from_node, nullptr); |
1215 return; | 1241 return; |
1216 } | 1242 } |
1217 | 1243 |
1218 OnPortsMessage(source_node, std::move(message)); | 1244 OnPortsMessage(source_node, std::move(message)); |
1219 } | 1245 } |
1220 #endif | 1246 #endif |
1221 | 1247 |
| 1248 void NodeController::OnAcceptPeer(const ports::NodeName& from_node, |
| 1249 const ports::NodeName& token, |
| 1250 const ports::NodeName& peer_name, |
| 1251 const ports::PortName& port_name) { |
| 1252 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| 1253 |
| 1254 auto it = pending_peers_.find(from_node); |
| 1255 if (it == pending_peers_.end()) { |
| 1256 DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node; |
| 1257 DropPeer(from_node, nullptr); |
| 1258 return; |
| 1259 } |
| 1260 |
| 1261 scoped_refptr<NodeChannel> channel = it->second.first; |
| 1262 ports::PortRef local_port = it->second.second; |
| 1263 pending_peers_.erase(it); |
| 1264 DCHECK(channel); |
| 1265 |
| 1266 DVLOG(1) << "Node " << name_ << " accepted peer " << peer_name; |
| 1267 |
| 1268 AddPeer(peer_name, channel, false /* start_channel */); |
| 1269 |
| 1270 // We need to choose one side to initiate the port merge. It doesn't matter |
| 1271 // who does it as long as they don't both try. Simple solution: pick the one |
| 1272 // with the "smaller" port name. |
| 1273 if (local_port.name() < port_name) { |
| 1274 node()->MergePorts(local_port, peer_name, port_name); |
| 1275 } |
| 1276 } |
| 1277 |
1222 void NodeController::OnChannelError(const ports::NodeName& from_node, | 1278 void NodeController::OnChannelError(const ports::NodeName& from_node, |
1223 NodeChannel* channel) { | 1279 NodeChannel* channel) { |
1224 if (io_task_runner_->RunsTasksOnCurrentThread()) { | 1280 if (io_task_runner_->RunsTasksOnCurrentThread()) { |
1225 DropPeer(from_node, channel); | 1281 DropPeer(from_node, channel); |
1226 // DropPeer may have caused local port closures, so be sure to process any | 1282 // DropPeer may have caused local port closures, so be sure to process any |
1227 // pending local messages. | 1283 // pending local messages. |
1228 AcceptIncomingMessages(); | 1284 AcceptIncomingMessages(); |
1229 } else { | 1285 } else { |
1230 io_task_runner_->PostTask( | 1286 io_task_runner_->PostTask( |
1231 FROM_HERE, | 1287 FROM_HERE, |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1271 shutdown_callback_flag_.Set(false); | 1327 shutdown_callback_flag_.Set(false); |
1272 } | 1328 } |
1273 | 1329 |
1274 DCHECK(!callback.is_null()); | 1330 DCHECK(!callback.is_null()); |
1275 | 1331 |
1276 callback.Run(); | 1332 callback.Run(); |
1277 } | 1333 } |
1278 | 1334 |
1279 } // namespace edk | 1335 } // namespace edk |
1280 } // namespace mojo | 1336 } // namespace mojo |
OLD | NEW |