OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/node_controller.h" | 5 #include "mojo/edk/system/node_controller.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <limits> | 8 #include <limits> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
(...skipping 113 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
124 | 124 |
125 void NodeController::SetIOTaskRunner( | 125 void NodeController::SetIOTaskRunner( |
126 scoped_refptr<base::TaskRunner> task_runner) { | 126 scoped_refptr<base::TaskRunner> task_runner) { |
127 io_task_runner_ = task_runner; | 127 io_task_runner_ = task_runner; |
128 ThreadDestructionObserver::Create( | 128 ThreadDestructionObserver::Create( |
129 io_task_runner_, | 129 io_task_runner_, |
130 base::Bind(&NodeController::DropAllPeers, base::Unretained(this))); | 130 base::Bind(&NodeController::DropAllPeers, base::Unretained(this))); |
131 } | 131 } |
132 | 132 |
133 void NodeController::ConnectToChild(base::ProcessHandle process_handle, | 133 void NodeController::ConnectToChild(base::ProcessHandle process_handle, |
134 ScopedPlatformHandle platform_handle) { | 134 ScopedPlatformHandle platform_handle, |
| 135 const std::string& child_token) { |
| 136 // Generate the temporary remote node name here so that it can be associated |
| 137 // with the embedder's child_token. If an error occurs in the child process |
| 138 // after it is launched, but before any reserved ports are connected, this can |
| 139 // be used to clean up any dangling ports. |
| 140 ports::NodeName node_name; |
| 141 GenerateRandomName(&node_name); |
| 142 |
| 143 { |
| 144 base::AutoLock lock(reserved_ports_lock_); |
| 145 bool inserted = pending_child_tokens_.insert( |
| 146 std::make_pair(node_name, child_token)).second; |
| 147 DCHECK(inserted); |
| 148 } |
| 149 |
135 io_task_runner_->PostTask( | 150 io_task_runner_->PostTask( |
136 FROM_HERE, | 151 FROM_HERE, |
137 base::Bind(&NodeController::ConnectToChildOnIOThread, | 152 base::Bind(&NodeController::ConnectToChildOnIOThread, |
138 base::Unretained(this), | 153 base::Unretained(this), |
139 process_handle, | 154 process_handle, |
140 base::Passed(&platform_handle))); | 155 base::Passed(&platform_handle), |
| 156 node_name)); |
| 157 } |
| 158 |
| 159 void NodeController::CloseChildPorts(const std::string& child_token) { |
| 160 std::vector<ports::PortRef> ports_to_close; |
| 161 { |
| 162 std::vector<std::string> port_tokens; |
| 163 base::AutoLock lock(reserved_ports_lock_); |
| 164 for (const auto& port : reserved_ports_) { |
| 165 if (port.second.child_token == child_token) { |
| 166 DVLOG(1) << "Closing reserved port " << port.second.port.name(); |
| 167 ports_to_close.push_back(port.second.port); |
| 168 port_tokens.push_back(port.first); |
| 169 } |
| 170 } |
| 171 |
| 172 for (const auto& token : port_tokens) |
| 173 reserved_ports_.erase(token); |
| 174 } |
| 175 |
| 176 for (const auto& port : ports_to_close) |
| 177 node_->ClosePort(port); |
| 178 |
| 179 // Ensure local port closure messages are processed. |
| 180 AcceptIncomingMessages(); |
141 } | 181 } |
142 | 182 |
143 void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) { | 183 void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) { |
144 // TODO(amistry): Consider the need for a broker on Windows. | 184 // TODO(amistry): Consider the need for a broker on Windows. |
145 #if defined(OS_POSIX) && !defined(OS_MACOSX) | 185 #if defined(OS_POSIX) && !defined(OS_MACOSX) |
146 // On posix, use the bootstrap channel for the broker and receive the node's | 186 // On posix, use the bootstrap channel for the broker and receive the node's |
147 // channel synchronously as the first message from the broker. | 187 // channel synchronously as the first message from the broker. |
148 base::ElapsedTimer timer; | 188 base::ElapsedTimer timer; |
149 broker_.reset(new Broker(std::move(platform_handle))); | 189 broker_.reset(new Broker(std::move(platform_handle))); |
150 platform_handle = broker_->GetParentPlatformHandle(); | 190 platform_handle = broker_->GetParentPlatformHandle(); |
(...skipping 25 matching lines...) Expand all Loading... |
176 int NodeController::SendMessage(const ports::PortRef& port, | 216 int NodeController::SendMessage(const ports::PortRef& port, |
177 std::unique_ptr<PortsMessage> message) { | 217 std::unique_ptr<PortsMessage> message) { |
178 ports::ScopedMessage ports_message(message.release()); | 218 ports::ScopedMessage ports_message(message.release()); |
179 int rv = node_->SendMessage(port, std::move(ports_message)); | 219 int rv = node_->SendMessage(port, std::move(ports_message)); |
180 | 220 |
181 AcceptIncomingMessages(); | 221 AcceptIncomingMessages(); |
182 return rv; | 222 return rv; |
183 } | 223 } |
184 | 224 |
185 void NodeController::ReservePort(const std::string& token, | 225 void NodeController::ReservePort(const std::string& token, |
186 const ports::PortRef& port) { | 226 const ports::PortRef& port, |
| 227 const std::string& child_token) { |
187 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " | 228 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " |
188 << token; | 229 << token; |
189 | 230 |
190 base::AutoLock lock(reserved_ports_lock_); | 231 base::AutoLock lock(reserved_ports_lock_); |
191 auto result = reserved_ports_.insert(std::make_pair(token, port)); | 232 auto result = reserved_ports_.insert( |
| 233 std::make_pair(token, ReservedPort{port, child_token})); |
192 DCHECK(result.second); | 234 DCHECK(result.second); |
193 } | 235 } |
194 | 236 |
195 void NodeController::MergePortIntoParent(const std::string& token, | 237 void NodeController::MergePortIntoParent(const std::string& token, |
196 const ports::PortRef& port) { | 238 const ports::PortRef& port) { |
197 bool was_merged = false; | 239 bool was_merged = false; |
198 { | 240 { |
199 // This request may be coming from within the process that reserved the | 241 // This request may be coming from within the process that reserved the |
200 // "parent" side (e.g. for Chrome single-process mode), so if this token is | 242 // "parent" side (e.g. for Chrome single-process mode), so if this token is |
201 // reserved locally, merge locally instead. | 243 // reserved locally, merge locally instead. |
202 base::AutoLock lock(reserved_ports_lock_); | 244 base::AutoLock lock(reserved_ports_lock_); |
203 auto it = reserved_ports_.find(token); | 245 auto it = reserved_ports_.find(token); |
204 if (it != reserved_ports_.end()) { | 246 if (it != reserved_ports_.end()) { |
205 node_->MergePorts(port, name_, it->second.name()); | 247 node_->MergePorts(port, name_, it->second.port.name()); |
206 reserved_ports_.erase(it); | 248 reserved_ports_.erase(it); |
207 was_merged = true; | 249 was_merged = true; |
208 } | 250 } |
209 } | 251 } |
210 if (was_merged) { | 252 if (was_merged) { |
211 AcceptIncomingMessages(); | 253 AcceptIncomingMessages(); |
212 return; | 254 return; |
213 } | 255 } |
214 | 256 |
215 scoped_refptr<NodeChannel> parent; | 257 scoped_refptr<NodeChannel> parent; |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
254 base::AutoLock lock(shutdown_lock_); | 296 base::AutoLock lock(shutdown_lock_); |
255 shutdown_callback_ = callback; | 297 shutdown_callback_ = callback; |
256 shutdown_callback_flag_.Set(true); | 298 shutdown_callback_flag_.Set(true); |
257 } | 299 } |
258 | 300 |
259 AttemptShutdownIfRequested(); | 301 AttemptShutdownIfRequested(); |
260 } | 302 } |
261 | 303 |
262 void NodeController::ConnectToChildOnIOThread( | 304 void NodeController::ConnectToChildOnIOThread( |
263 base::ProcessHandle process_handle, | 305 base::ProcessHandle process_handle, |
264 ScopedPlatformHandle platform_handle) { | 306 ScopedPlatformHandle platform_handle, |
| 307 ports::NodeName token) { |
265 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 308 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
266 | 309 |
267 #if defined(OS_POSIX) && !defined(OS_MACOSX) | 310 #if defined(OS_POSIX) && !defined(OS_MACOSX) |
268 PlatformChannelPair node_channel; | 311 PlatformChannelPair node_channel; |
269 // BrokerHost owns itself. | 312 // BrokerHost owns itself. |
270 BrokerHost* broker_host = new BrokerHost(std::move(platform_handle)); | 313 BrokerHost* broker_host = new BrokerHost(std::move(platform_handle)); |
271 broker_host->SendChannel(node_channel.PassClientHandle()); | 314 broker_host->SendChannel(node_channel.PassClientHandle()); |
272 scoped_refptr<NodeChannel> channel = NodeChannel::Create( | 315 scoped_refptr<NodeChannel> channel = NodeChannel::Create( |
273 this, node_channel.PassServerHandle(), io_task_runner_); | 316 this, node_channel.PassServerHandle(), io_task_runner_); |
274 #else | 317 #else |
275 scoped_refptr<NodeChannel> channel = | 318 scoped_refptr<NodeChannel> channel = |
276 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_); | 319 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_); |
277 #endif | 320 #endif |
278 | 321 |
279 // We set up the child channel with a temporary name so it can be identified | 322 // We set up the child channel with a temporary name so it can be identified |
280 // as a pending child if it writes any messages to the channel. We may start | 323 // as a pending child if it writes any messages to the channel. We may start |
281 // receiving messages from it (though we shouldn't) as soon as Start() is | 324 // receiving messages from it (though we shouldn't) as soon as Start() is |
282 // called below. | 325 // called below. |
283 ports::NodeName token; | |
284 GenerateRandomName(&token); | |
285 | 326 |
286 pending_children_.insert(std::make_pair(token, channel)); | 327 pending_children_.insert(std::make_pair(token, channel)); |
287 RecordPendingChildCount(pending_children_.size()); | 328 RecordPendingChildCount(pending_children_.size()); |
288 | 329 |
289 channel->SetRemoteNodeName(token); | 330 channel->SetRemoteNodeName(token); |
290 channel->SetRemoteProcessHandle(process_handle); | 331 channel->SetRemoteProcessHandle(process_handle); |
291 channel->Start(); | 332 channel->Start(); |
292 | 333 |
293 channel->AcceptChild(name_, token); | 334 channel->AcceptChild(name_, token); |
294 } | 335 } |
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
395 DVLOG(1) << "Dropped peer " << peer; | 436 DVLOG(1) << "Dropped peer " << peer; |
396 } | 437 } |
397 | 438 |
398 pending_peer_messages_.erase(name); | 439 pending_peer_messages_.erase(name); |
399 pending_children_.erase(name); | 440 pending_children_.erase(name); |
400 | 441 |
401 RecordPeerCount(peers_.size()); | 442 RecordPeerCount(peers_.size()); |
402 RecordPendingChildCount(pending_children_.size()); | 443 RecordPendingChildCount(pending_children_.size()); |
403 } | 444 } |
404 | 445 |
| 446 std::vector<ports::PortRef> ports_to_close; |
| 447 { |
| 448 // Clean up any reserved ports. |
| 449 base::AutoLock lock(reserved_ports_lock_); |
| 450 auto it = pending_child_tokens_.find(name); |
| 451 if (it != pending_child_tokens_.end()) { |
| 452 const std::string& child_token = it->second; |
| 453 |
| 454 std::vector<std::string> port_tokens; |
| 455 for (const auto& port : reserved_ports_) { |
| 456 if (port.second.child_token == child_token) { |
| 457 DVLOG(1) << "Closing reserved port: " << port.second.port.name(); |
| 458 ports_to_close.push_back(port.second.port); |
| 459 port_tokens.push_back(port.first); |
| 460 } |
| 461 } |
| 462 |
| 463 // We have to erase reserved ports in a two-step manner because the usual |
| 464 // manner of using the returned iterator from map::erase isn't technically |
| 465 // valid in C++11 (although it is in C++14). |
| 466 for (const auto& token : port_tokens) |
| 467 reserved_ports_.erase(token); |
| 468 |
| 469 pending_child_tokens_.erase(it); |
| 470 } |
| 471 } |
| 472 |
| 473 for (const auto& port : ports_to_close) |
| 474 node_->ClosePort(port); |
| 475 |
405 node_->LostConnectionToNode(name); | 476 node_->LostConnectionToNode(name); |
406 } | 477 } |
407 | 478 |
408 void NodeController::SendPeerMessage(const ports::NodeName& name, | 479 void NodeController::SendPeerMessage(const ports::NodeName& name, |
409 ports::ScopedMessage message) { | 480 ports::ScopedMessage message) { |
410 Channel::MessagePtr channel_message = | 481 Channel::MessagePtr channel_message = |
411 static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); | 482 static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); |
412 | 483 |
413 scoped_refptr<NodeChannel> peer = GetPeerChannel(name); | 484 scoped_refptr<NodeChannel> peer = GetPeerChannel(name); |
414 #if defined(OS_WIN) | 485 #if defined(OS_WIN) |
(...skipping 421 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
836 | 907 |
837 ports::PortRef local_port; | 908 ports::PortRef local_port; |
838 { | 909 { |
839 base::AutoLock lock(reserved_ports_lock_); | 910 base::AutoLock lock(reserved_ports_lock_); |
840 auto it = reserved_ports_.find(token); | 911 auto it = reserved_ports_.find(token); |
841 if (it == reserved_ports_.end()) { | 912 if (it == reserved_ports_.end()) { |
842 DVLOG(1) << "Ignoring request to connect to port for unknown token " | 913 DVLOG(1) << "Ignoring request to connect to port for unknown token " |
843 << token; | 914 << token; |
844 return; | 915 return; |
845 } | 916 } |
846 local_port = it->second; | 917 local_port = it->second.port; |
847 } | 918 } |
848 | 919 |
849 int rv = node_->MergePorts(local_port, from_node, connector_port_name); | 920 int rv = node_->MergePorts(local_port, from_node, connector_port_name); |
850 if (rv != ports::OK) | 921 if (rv != ports::OK) |
851 DLOG(ERROR) << "MergePorts failed: " << rv; | 922 DLOG(ERROR) << "MergePorts failed: " << rv; |
852 } | 923 } |
853 | 924 |
854 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, | 925 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, |
855 const ports::NodeName& name) { | 926 const ports::NodeName& name) { |
856 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 927 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
958 if (peer) | 1029 if (peer) |
959 peer->PortsMessage(std::move(message)); | 1030 peer->PortsMessage(std::move(message)); |
960 else | 1031 else |
961 DLOG(ERROR) << "Dropping relay message for unknown node " << destination; | 1032 DLOG(ERROR) << "Dropping relay message for unknown node " << destination; |
962 } | 1033 } |
963 #endif | 1034 #endif |
964 | 1035 |
965 void NodeController::OnChannelError(const ports::NodeName& from_node) { | 1036 void NodeController::OnChannelError(const ports::NodeName& from_node) { |
966 if (io_task_runner_->RunsTasksOnCurrentThread()) { | 1037 if (io_task_runner_->RunsTasksOnCurrentThread()) { |
967 DropPeer(from_node); | 1038 DropPeer(from_node); |
| 1039 // DropPeer may have caused local port closures, so be sure to process any |
| 1040 // pending local messages. |
| 1041 AcceptIncomingMessages(); |
968 } else { | 1042 } else { |
969 io_task_runner_->PostTask( | 1043 io_task_runner_->PostTask( |
970 FROM_HERE, | 1044 FROM_HERE, |
971 base::Bind(&NodeController::DropPeer, base::Unretained(this), | 1045 base::Bind(&NodeController::OnChannelError, base::Unretained(this), |
972 from_node)); | 1046 from_node)); |
973 } | 1047 } |
974 } | 1048 } |
975 | 1049 |
976 #if defined(OS_MACOSX) && !defined(OS_IOS) | 1050 #if defined(OS_MACOSX) && !defined(OS_IOS) |
977 MachPortRelay* NodeController::GetMachPortRelay() { | 1051 MachPortRelay* NodeController::GetMachPortRelay() { |
978 { | 1052 { |
979 base::AutoLock lock(parent_lock_); | 1053 base::AutoLock lock(parent_lock_); |
980 // Return null if we're not the root. | 1054 // Return null if we're not the root. |
981 if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName) | 1055 if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName) |
(...skipping 28 matching lines...) Expand all Loading... |
1010 shutdown_callback_flag_.Set(false); | 1084 shutdown_callback_flag_.Set(false); |
1011 } | 1085 } |
1012 | 1086 |
1013 DCHECK(!callback.is_null()); | 1087 DCHECK(!callback.is_null()); |
1014 | 1088 |
1015 callback.Run(); | 1089 callback.Run(); |
1016 } | 1090 } |
1017 | 1091 |
1018 } // namespace edk | 1092 } // namespace edk |
1019 } // namespace mojo | 1093 } // namespace mojo |
OLD | NEW |