Index: mojo/edk/system/node_controller.cc |
diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc |
index e106063f971e5be9450ab0a8da8eae8365f12ea5..de0faaca4b11f633e09f451fcd97190c0c3fc73a 100644 |
--- a/mojo/edk/system/node_controller.cc |
+++ b/mojo/edk/system/node_controller.cc |
@@ -131,13 +131,53 @@ void NodeController::SetIOTaskRunner( |
} |
void NodeController::ConnectToChild(base::ProcessHandle process_handle, |
- ScopedPlatformHandle platform_handle) { |
+ ScopedPlatformHandle platform_handle, |
+ const std::string& child_token) { |
+ // Generate the temporary remote node name here so that it can be associated |
+ // with the embedder's child_token. If an error occurs in the child process |
+ // after it is launched, but before any reserved ports are connected, this can |
+ // be used to clean up any dangling ports. |
+ ports::NodeName node_name; |
+ GenerateRandomName(&node_name); |
+ |
+ { |
+ base::AutoLock lock(reserved_ports_lock_); |
+ bool inserted = pending_child_tokens_.insert( |
+ std::make_pair(node_name, child_token)).second; |
+ DCHECK(inserted); |
+ } |
+ |
io_task_runner_->PostTask( |
FROM_HERE, |
base::Bind(&NodeController::ConnectToChildOnIOThread, |
base::Unretained(this), |
process_handle, |
- base::Passed(&platform_handle))); |
+ base::Passed(&platform_handle), |
+ node_name)); |
+} |
+ |
+void NodeController::CloseChildPorts(const std::string& child_token) { |
+ std::vector<ports::PortRef> ports_to_close; |
+ { |
+ std::vector<std::string> port_tokens; |
+ base::AutoLock lock(reserved_ports_lock_); |
+ for (const auto& port : reserved_ports_) { |
+ if (port.second.child_token == child_token) { |
+ DVLOG(1) << "Closing reserved port " << port.second.port.name(); |
+ ports_to_close.push_back(port.second.port); |
+ port_tokens.push_back(port.first); |
+ } |
+ } |
+ |
+ for (const auto& token : port_tokens) |
+ reserved_ports_.erase(token); |
+ } |
+ |
+ for (const auto& port : ports_to_close) |
+ node_->ClosePort(port); |
+ |
+ // Ensure local port closure messages are processed. |
+ AcceptIncomingMessages(); |
} |
void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) { |
@@ -183,12 +223,14 @@ int NodeController::SendMessage(const ports::PortRef& port, |
} |
void NodeController::ReservePort(const std::string& token, |
- const ports::PortRef& port) { |
+ const ports::PortRef& port, |
+ const std::string& child_token) { |
DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " |
<< token; |
base::AutoLock lock(reserved_ports_lock_); |
- auto result = reserved_ports_.insert(std::make_pair(token, port)); |
+ auto result = reserved_ports_.insert( |
+ std::make_pair(token, ReservedPort{port, child_token})); |
DCHECK(result.second); |
} |
@@ -202,7 +244,7 @@ void NodeController::MergePortIntoParent(const std::string& token, |
base::AutoLock lock(reserved_ports_lock_); |
auto it = reserved_ports_.find(token); |
if (it != reserved_ports_.end()) { |
- node_->MergePorts(port, name_, it->second.name()); |
+ node_->MergePorts(port, name_, it->second.port.name()); |
reserved_ports_.erase(it); |
was_merged = true; |
} |
@@ -261,7 +303,8 @@ void NodeController::RequestShutdown(const base::Closure& callback) { |
void NodeController::ConnectToChildOnIOThread( |
base::ProcessHandle process_handle, |
- ScopedPlatformHandle platform_handle) { |
+ ScopedPlatformHandle platform_handle, |
+ ports::NodeName token) { |
DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
#if defined(OS_POSIX) && !defined(OS_MACOSX) |
@@ -280,8 +323,6 @@ void NodeController::ConnectToChildOnIOThread( |
// as a pending child if it writes any messages to the channel. We may start |
// receiving messages from it (though we shouldn't) as soon as Start() is |
// called below. |
- ports::NodeName token; |
- GenerateRandomName(&token); |
pending_children_.insert(std::make_pair(token, channel)); |
RecordPendingChildCount(pending_children_.size()); |
@@ -402,6 +443,36 @@ void NodeController::DropPeer(const ports::NodeName& name) { |
RecordPendingChildCount(pending_children_.size()); |
} |
+ std::vector<ports::PortRef> ports_to_close; |
+ { |
+ // Clean up any reserved ports. |
+ base::AutoLock lock(reserved_ports_lock_); |
+ auto it = pending_child_tokens_.find(name); |
+ if (it != pending_child_tokens_.end()) { |
+ const std::string& child_token = it->second; |
+ |
+ std::vector<std::string> port_tokens; |
+ for (const auto& port : reserved_ports_) { |
+ if (port.second.child_token == child_token) { |
+ DVLOG(1) << "Closing reserved port: " << port.second.port.name(); |
+ ports_to_close.push_back(port.second.port); |
+ port_tokens.push_back(port.first); |
+ } |
+ } |
+ |
+ // We have to erase reserved ports in a two-step manner because the usual |
+ // manner of using the returned iterator from map::erase isn't technically |
+ // valid in C++11 (although it is in C++14). |
+ for (const auto& token : port_tokens) |
+ reserved_ports_.erase(token); |
+ |
+ pending_child_tokens_.erase(it); |
+ } |
+ } |
+ |
+ for (const auto& port : ports_to_close) |
+ node_->ClosePort(port); |
+ |
node_->LostConnectionToNode(name); |
} |
@@ -843,7 +914,7 @@ void NodeController::OnRequestPortMerge( |
<< token; |
return; |
} |
- local_port = it->second; |
+ local_port = it->second.port; |
} |
int rv = node_->MergePorts(local_port, from_node, connector_port_name); |
@@ -965,10 +1036,13 @@ void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, |
void NodeController::OnChannelError(const ports::NodeName& from_node) { |
if (io_task_runner_->RunsTasksOnCurrentThread()) { |
DropPeer(from_node); |
+ // DropPeer may have caused local port closures, so be sure to process any |
+ // pending local messages. |
+ AcceptIncomingMessages(); |
} else { |
io_task_runner_->PostTask( |
FROM_HERE, |
- base::Bind(&NodeController::DropPeer, base::Unretained(this), |
+ base::Bind(&NodeController::OnChannelError, base::Unretained(this), |
from_node)); |
} |
} |