Chromium Code Reviews| Index: mojo/edk/system/node_controller.cc |
| diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc |
| index e106063f971e5be9450ab0a8da8eae8365f12ea5..4ac7db0ccb2ffe996bebaf21c2addbccc7294467 100644 |
| --- a/mojo/edk/system/node_controller.cc |
| +++ b/mojo/edk/system/node_controller.cc |
| @@ -131,13 +131,53 @@ void NodeController::SetIOTaskRunner( |
| } |
| void NodeController::ConnectToChild(base::ProcessHandle process_handle, |
| - ScopedPlatformHandle platform_handle) { |
| + ScopedPlatformHandle platform_handle, |
| + const std::string& child_token) { |
| + // Generate the temporary remote node name here so that it can be associated |
| + // with the embedder's child_token. If an error occurs in the child process |
| + // after it is launched, but before any reserved ports are connected, this can |
| + // be used to clean up any dangling ports. |
| + ports::NodeName node_name; |
| + GenerateRandomName(&node_name); |
| + |
| + { |
| + base::AutoLock lock(reserved_ports_lock_); |
| + bool inserted = pending_child_tokens_.insert( |
| + std::make_pair(node_name, child_token)).second; |
| + DCHECK(inserted); |
| + } |
| + |
| io_task_runner_->PostTask( |
| FROM_HERE, |
| base::Bind(&NodeController::ConnectToChildOnIOThread, |
| base::Unretained(this), |
| process_handle, |
| - base::Passed(&platform_handle))); |
| + base::Passed(&platform_handle), |
| + node_name)); |
| +} |
| + |
| +void NodeController::CloseChildPorts(const std::string& child_token) { |
| + std::vector<ports::PortRef> ports_to_close; |
| + { |
| + std::vector<std::string> port_tokens; |
| + base::AutoLock lock(reserved_ports_lock_); |
| + for (const auto& port : reserved_ports_) { |
| + if (port.second.second == child_token) { |
| + DVLOG(1) << "Closing reserved port " << port.second.first.name(); |
| + ports_to_close.push_back(port.second.first); |
| + port_tokens.push_back(port.first); |
| + } |
| + } |
| + |
| + for (const auto& token : port_tokens) |
| + reserved_ports_.erase(token); |
| + } |
| + |
| + for (const auto& port : ports_to_close) |
| + node_->ClosePort(port); |
| + |
| + // Ensure local port closure messages are processes. |
|
Ken Rockot(use gerrit already)
2016/06/01 15:40:48
nit: processed*
Anand Mistry (off Chromium)
2016/06/02 04:35:36
Done.
|
| + AcceptIncomingMessages(); |
| } |
| void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) { |
| @@ -183,12 +223,14 @@ int NodeController::SendMessage(const ports::PortRef& port, |
| } |
| void NodeController::ReservePort(const std::string& token, |
| - const ports::PortRef& port) { |
| + const ports::PortRef& port, |
| + const std::string& child_token) { |
| DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " |
| << token; |
| base::AutoLock lock(reserved_ports_lock_); |
| - auto result = reserved_ports_.insert(std::make_pair(token, port)); |
| + auto result = reserved_ports_.insert( |
| + std::make_pair(token, std::make_pair(port, child_token))); |
| DCHECK(result.second); |
| } |
| @@ -202,7 +244,8 @@ void NodeController::MergePortIntoParent(const std::string& token, |
| base::AutoLock lock(reserved_ports_lock_); |
| auto it = reserved_ports_.find(token); |
| if (it != reserved_ports_.end()) { |
| - node_->MergePorts(port, name_, it->second.name()); |
| + std::string child_token = std::move(it->second.second); |
| + node_->MergePorts(port, name_, it->second.first.name()); |
| reserved_ports_.erase(it); |
| was_merged = true; |
| } |
| @@ -261,7 +304,8 @@ void NodeController::RequestShutdown(const base::Closure& callback) { |
| void NodeController::ConnectToChildOnIOThread( |
| base::ProcessHandle process_handle, |
| - ScopedPlatformHandle platform_handle) { |
| + ScopedPlatformHandle platform_handle, |
| + ports::NodeName token) { |
| DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| #if defined(OS_POSIX) && !defined(OS_MACOSX) |
| @@ -280,8 +324,6 @@ void NodeController::ConnectToChildOnIOThread( |
| // as a pending child if it writes any messages to the channel. We may start |
| // receiving messages from it (though we shouldn't) as soon as Start() is |
| // called below. |
| - ports::NodeName token; |
| - GenerateRandomName(&token); |
| pending_children_.insert(std::make_pair(token, channel)); |
| RecordPendingChildCount(pending_children_.size()); |
| @@ -402,6 +444,36 @@ void NodeController::DropPeer(const ports::NodeName& name) { |
| RecordPendingChildCount(pending_children_.size()); |
| } |
| + std::vector<ports::PortRef> ports_to_close; |
| + { |
| + // Clean up any reserved ports. |
| + base::AutoLock lock(reserved_ports_lock_); |
| + auto it = pending_child_tokens_.find(name); |
| + if (it != pending_child_tokens_.end()) { |
| + const std::string& child_token = it->second; |
| + |
| + std::vector<std::string> port_tokens; |
| + for (const auto& port : reserved_ports_) { |
| + if (port.second.second == child_token) { |
| + DVLOG(1) << "Closing reserved port: " << port.second.first.name(); |
| + ports_to_close.push_back(port.second.first); |
| + port_tokens.push_back(port.first); |
| + } |
| + } |
| + |
| + // We have to erase reserved ports in a two-step manner because the usual |
| + // manner of using the returned iterator from map::erase isn't technically |
| + // valid in C++11 (although it is in C++14). |
| + for (const auto& token : port_tokens) |
| + reserved_ports_.erase(token); |
| + |
| + pending_child_tokens_.erase(it); |
| + } |
| + } |
| + |
| + for (const auto& port : ports_to_close) |
| + node_->ClosePort(port); |
| + |
| node_->LostConnectionToNode(name); |
| } |
| @@ -532,6 +604,18 @@ void NodeController::DropAllPeers() { |
| pending_peer_messages_.clear(); |
| } |
| + std::vector<ports::PortRef> all_reserved_ports; |
| + { |
| + base::AutoLock lock(reserved_ports_lock_); |
| + pending_child_tokens_.clear(); |
| + for (const auto& port : reserved_ports_) |
| + all_reserved_ports.push_back(port.second.first); |
| + reserved_ports_.clear(); |
| + } |
| + |
| + for (const auto& port : all_reserved_ports) |
| + node_->ClosePort(port); |
| + |
| for (const auto& peer : all_peers) |
| peer->ShutDown(); |
| @@ -843,7 +927,7 @@ void NodeController::OnRequestPortMerge( |
| << token; |
| return; |
| } |
| - local_port = it->second; |
| + local_port = it->second.first; |
| } |
| int rv = node_->MergePorts(local_port, from_node, connector_port_name); |
| @@ -965,10 +1049,13 @@ void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, |
| void NodeController::OnChannelError(const ports::NodeName& from_node) { |
| if (io_task_runner_->RunsTasksOnCurrentThread()) { |
| DropPeer(from_node); |
| + // DropPeer may have caused local port closures, so be sure to process any |
| + // pending local messages. |
| + AcceptIncomingMessages(); |
| } else { |
| io_task_runner_->PostTask( |
| FROM_HERE, |
| - base::Bind(&NodeController::DropPeer, base::Unretained(this), |
| + base::Bind(&NodeController::OnChannelError, base::Unretained(this), |
| from_node)); |
| } |
| } |