Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(59)

Side by Side Diff: mojo/edk/system/node_controller.cc

Issue 2227553002: Support mojo connections between unrelated peer processes. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/test/mojo_test_base.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/node_controller.h" 5 #include "mojo/edk/system/node_controller.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <limits> 8 #include <limits>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 236 matching lines...) Expand 10 before | Expand all | Expand 10 after
247 } 247 }
248 #endif 248 #endif
249 249
250 io_task_runner_->PostTask( 250 io_task_runner_->PostTask(
251 FROM_HERE, 251 FROM_HERE,
252 base::Bind(&NodeController::ConnectToParentOnIOThread, 252 base::Bind(&NodeController::ConnectToParentOnIOThread,
253 base::Unretained(this), 253 base::Unretained(this),
254 base::Passed(&platform_handle))); 254 base::Passed(&platform_handle)));
255 } 255 }
256 256
257 void NodeController::ConnectToPeer(ScopedPlatformHandle handle,
258 const ports::PortRef& port) {
259 ports::NodeName node_name;
260 GenerateRandomName(&node_name);
261 io_task_runner_->PostTask(
262 FROM_HERE, base::Bind(&NodeController::ConnectToPeerOnIOThread,
263 base::Unretained(this), base::Passed(&handle),
264 node_name, port));
265 }
266
257 void NodeController::SetPortObserver( 267 void NodeController::SetPortObserver(
258 const ports::PortRef& port, 268 const ports::PortRef& port,
259 const scoped_refptr<PortObserver>& observer) { 269 const scoped_refptr<PortObserver>& observer) {
260 node_->SetUserData(port, observer); 270 node_->SetUserData(port, observer);
261 } 271 }
262 272
263 void NodeController::ClosePort(const ports::PortRef& port) { 273 void NodeController::ClosePort(const ports::PortRef& port) {
264 SetPortObserver(port, nullptr); 274 SetPortObserver(port, nullptr);
265 int rv = node_->ClosePort(port); 275 int rv = node_->ClosePort(port);
266 DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name(); 276 DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name();
(...skipping 160 matching lines...) Expand 10 before | Expand all | Expand 10 after
427 // Prevent the parent pipe handle from being closed on shutdown. Pipe 437 // Prevent the parent pipe handle from being closed on shutdown. Pipe
428 // closure is used by the parent to detect the child process has exited. 438 // closure is used by the parent to detect the child process has exited.
429 // Relying on message pipes to be closed is not enough because the parent 439 // Relying on message pipes to be closed is not enough because the parent
430 // may see the message pipe closure before the child is dead, causing the 440 // may see the message pipe closure before the child is dead, causing the
431 // child process to be unexpectedly SIGKILL'd. 441 // child process to be unexpectedly SIGKILL'd.
432 bootstrap_parent_channel_->LeakHandleOnShutdown(); 442 bootstrap_parent_channel_->LeakHandleOnShutdown();
433 } 443 }
434 bootstrap_parent_channel_->Start(); 444 bootstrap_parent_channel_->Start();
435 } 445 }
436 446
447 void NodeController::ConnectToPeerOnIOThread(ScopedPlatformHandle handle,
448 ports::NodeName token,
449 ports::PortRef port) {
450 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
451
452 scoped_refptr<NodeChannel> channel =
453 NodeChannel::Create(this, std::move(handle), io_task_runner_, {});
454 pending_peers_.insert({token, {channel, port}});
455
456 channel->SetRemoteNodeName(token);
457 channel->Start();
458
459 channel->AcceptPeer(name_, token, port.name());
460 }
461
437 scoped_refptr<NodeChannel> NodeController::GetPeerChannel( 462 scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
438 const ports::NodeName& name) { 463 const ports::NodeName& name) {
439 base::AutoLock lock(peers_lock_); 464 base::AutoLock lock(peers_lock_);
440 auto it = peers_.find(name); 465 auto it = peers_.find(name);
441 if (it == peers_.end()) 466 if (it == peers_.end())
442 return nullptr; 467 return nullptr;
443 return it->second; 468 return it->second;
444 } 469 }
445 470
446 scoped_refptr<NodeChannel> NodeController::GetParentChannel() { 471 scoped_refptr<NodeChannel> NodeController::GetParentChannel() {
(...skipping 267 matching lines...) Expand 10 before | Expand all | Expand 10 after
714 739
715 { 740 {
716 base::AutoLock lock(peers_lock_); 741 base::AutoLock lock(peers_lock_);
717 for (const auto& peer : peers_) 742 for (const auto& peer : peers_)
718 all_peers.push_back(peer.second); 743 all_peers.push_back(peer.second);
719 for (const auto& peer : pending_children_) 744 for (const auto& peer : pending_children_)
720 all_peers.push_back(peer.second); 745 all_peers.push_back(peer.second);
721 peers_.clear(); 746 peers_.clear();
722 pending_children_.clear(); 747 pending_children_.clear();
723 pending_peer_messages_.clear(); 748 pending_peer_messages_.clear();
749 pending_peers_.clear();
724 } 750 }
725 751
726 for (const auto& peer : all_peers) 752 for (const auto& peer : all_peers)
727 peer->ShutDown(); 753 peer->ShutDown();
728 754
729 if (destroy_on_io_thread_shutdown_) 755 if (destroy_on_io_thread_shutdown_)
730 delete this; 756 delete this;
731 } 757 }
732 758
733 void NodeController::GenerateRandomPortName(ports::PortName* port_name) { 759 void NodeController::GenerateRandomPortName(ports::PortName* port_name) {
(...skipping 488 matching lines...) Expand 10 before | Expand all | Expand 10 after
1222 if (GetPeerChannel(from_node) != GetBrokerChannel()) { 1248 if (GetPeerChannel(from_node) != GetBrokerChannel()) {
1223 LOG(ERROR) << "Refusing relayed message from non-broker node."; 1249 LOG(ERROR) << "Refusing relayed message from non-broker node.";
1224 DropPeer(from_node, nullptr); 1250 DropPeer(from_node, nullptr);
1225 return; 1251 return;
1226 } 1252 }
1227 1253
1228 OnPortsMessage(source_node, std::move(message)); 1254 OnPortsMessage(source_node, std::move(message));
1229 } 1255 }
1230 #endif 1256 #endif
1231 1257
1258 void NodeController::OnAcceptPeer(const ports::NodeName& from_node,
1259 const ports::NodeName& token,
1260 const ports::NodeName& peer_name,
1261 const ports::PortName& port_name) {
1262 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1263
1264 auto it = pending_peers_.find(from_node);
1265 if (it == pending_peers_.end()) {
1266 DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node;
1267 DropPeer(from_node, nullptr);
1268 return;
1269 }
1270
1271 scoped_refptr<NodeChannel> channel = it->second.first;
1272 ports::PortRef local_port = it->second.second;
1273 pending_peers_.erase(it);
1274 DCHECK(channel);
1275
1276 DVLOG(1) << "Node " << name_ << " accepted peer " << peer_name;
1277
1278 AddPeer(peer_name, channel, false /* start_channel */);
1279
1280 // We need to choose one side to initiate the port merge. It doesn't matter
1281 // who does it as long as they don't both try. Simple solution: pick the one
1282 // with the "smaller" port name.
1283 if (local_port.name() < port_name) {
1284 node()->MergePorts(local_port, peer_name, port_name);
1285 }
1286 }
1287
1232 void NodeController::OnChannelError(const ports::NodeName& from_node, 1288 void NodeController::OnChannelError(const ports::NodeName& from_node,
1233 NodeChannel* channel) { 1289 NodeChannel* channel) {
1234 if (io_task_runner_->RunsTasksOnCurrentThread()) { 1290 if (io_task_runner_->RunsTasksOnCurrentThread()) {
1235 DropPeer(from_node, channel); 1291 DropPeer(from_node, channel);
1236 // DropPeer may have caused local port closures, so be sure to process any 1292 // DropPeer may have caused local port closures, so be sure to process any
1237 // pending local messages. 1293 // pending local messages.
1238 AcceptIncomingMessages(); 1294 AcceptIncomingMessages();
1239 } else { 1295 } else {
1240 io_task_runner_->PostTask( 1296 io_task_runner_->PostTask(
1241 FROM_HERE, 1297 FROM_HERE,
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
1282 shutdown_callback_flag_.Set(false); 1338 shutdown_callback_flag_.Set(false);
1283 } 1339 }
1284 1340
1285 DCHECK(!callback.is_null()); 1341 DCHECK(!callback.is_null());
1286 1342
1287 callback.Run(); 1343 callback.Run();
1288 } 1344 }
1289 1345
1290 } // namespace edk 1346 } // namespace edk
1291 } // namespace mojo 1347 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/test/mojo_test_base.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698