OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/node_controller.h" | 5 #include "mojo/edk/system/node_controller.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <limits> | 8 #include <limits> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
(...skipping 113 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
124 | 124 |
125 void NodeController::SetIOTaskRunner( | 125 void NodeController::SetIOTaskRunner( |
126 scoped_refptr<base::TaskRunner> task_runner) { | 126 scoped_refptr<base::TaskRunner> task_runner) { |
127 io_task_runner_ = task_runner; | 127 io_task_runner_ = task_runner; |
128 ThreadDestructionObserver::Create( | 128 ThreadDestructionObserver::Create( |
129 io_task_runner_, | 129 io_task_runner_, |
130 base::Bind(&NodeController::DropAllPeers, base::Unretained(this))); | 130 base::Bind(&NodeController::DropAllPeers, base::Unretained(this))); |
131 } | 131 } |
132 | 132 |
133 void NodeController::ConnectToChild(base::ProcessHandle process_handle, | 133 void NodeController::ConnectToChild(base::ProcessHandle process_handle, |
134 ScopedPlatformHandle platform_handle) { | 134 ScopedPlatformHandle platform_handle, |
135 const std::string& child_token) { | |
136 // Generate the temporary remote node name here so that it can be associated | |
137 // with the embedder's child_token. If an error occurs in the child process | |
138 // after it is launched, but before any reserved ports are connected, this can | |
139 // be used to clean up any dangling ports. | |
140 ports::NodeName node_name; | |
141 GenerateRandomName(&node_name); | |
142 | |
143 { | |
144 base::AutoLock lock(reserved_ports_lock_); | |
145 bool inserted = pending_child_tokens_.insert( | |
146 std::make_pair(node_name, child_token)).second; | |
147 DCHECK(inserted); | |
148 } | |
149 | |
135 io_task_runner_->PostTask( | 150 io_task_runner_->PostTask( |
136 FROM_HERE, | 151 FROM_HERE, |
137 base::Bind(&NodeController::ConnectToChildOnIOThread, | 152 base::Bind(&NodeController::ConnectToChildOnIOThread, |
138 base::Unretained(this), | 153 base::Unretained(this), |
139 process_handle, | 154 process_handle, |
140 base::Passed(&platform_handle))); | 155 base::Passed(&platform_handle), |
156 node_name)); | |
157 } | |
158 | |
159 void NodeController::CloseChildPorts(const std::string& child_token) { | |
160 std::vector<ports::PortRef> ports_to_close; | |
161 { | |
162 std::vector<std::string> port_tokens; | |
163 base::AutoLock lock(reserved_ports_lock_); | |
164 for (const auto& port : reserved_ports_) { | |
165 if (port.second.second == child_token) { | |
166 DVLOG(1) << "Closing reserved port " << port.second.first.name(); | |
167 ports_to_close.push_back(port.second.first); | |
168 port_tokens.push_back(port.first); | |
169 } | |
170 } | |
171 | |
172 for (const auto& token : port_tokens) | |
173 reserved_ports_.erase(token); | |
174 } | |
175 | |
176 for (const auto& port : ports_to_close) | |
177 node_->ClosePort(port); | |
178 | |
179 // Ensure local port closure messages are processed. | |
180 AcceptIncomingMessages(); | |
141 } | 181 } |
142 | 182 |
143 void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) { | 183 void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) { |
144 // TODO(amistry): Consider the need for a broker on Windows. | 184 // TODO(amistry): Consider the need for a broker on Windows. |
145 #if defined(OS_POSIX) && !defined(OS_MACOSX) | 185 #if defined(OS_POSIX) && !defined(OS_MACOSX) |
146 // On posix, use the bootstrap channel for the broker and receive the node's | 186 // On posix, use the bootstrap channel for the broker and receive the node's |
147 // channel synchronously as the first message from the broker. | 187 // channel synchronously as the first message from the broker. |
148 base::ElapsedTimer timer; | 188 base::ElapsedTimer timer; |
149 broker_.reset(new Broker(std::move(platform_handle))); | 189 broker_.reset(new Broker(std::move(platform_handle))); |
150 platform_handle = broker_->GetParentPlatformHandle(); | 190 platform_handle = broker_->GetParentPlatformHandle(); |
(...skipping 25 matching lines...) Expand all Loading... | |
176 int NodeController::SendMessage(const ports::PortRef& port, | 216 int NodeController::SendMessage(const ports::PortRef& port, |
177 std::unique_ptr<PortsMessage> message) { | 217 std::unique_ptr<PortsMessage> message) { |
178 ports::ScopedMessage ports_message(message.release()); | 218 ports::ScopedMessage ports_message(message.release()); |
179 int rv = node_->SendMessage(port, std::move(ports_message)); | 219 int rv = node_->SendMessage(port, std::move(ports_message)); |
180 | 220 |
181 AcceptIncomingMessages(); | 221 AcceptIncomingMessages(); |
182 return rv; | 222 return rv; |
183 } | 223 } |
184 | 224 |
185 void NodeController::ReservePort(const std::string& token, | 225 void NodeController::ReservePort(const std::string& token, |
186 const ports::PortRef& port) { | 226 const ports::PortRef& port, |
227 const std::string& child_token) { | |
187 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " | 228 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " |
188 << token; | 229 << token; |
189 | 230 |
190 base::AutoLock lock(reserved_ports_lock_); | 231 base::AutoLock lock(reserved_ports_lock_); |
191 auto result = reserved_ports_.insert(std::make_pair(token, port)); | 232 auto result = reserved_ports_.insert( |
233 std::make_pair(token, std::make_pair(port, child_token))); | |
192 DCHECK(result.second); | 234 DCHECK(result.second); |
193 } | 235 } |
194 | 236 |
195 void NodeController::MergePortIntoParent(const std::string& token, | 237 void NodeController::MergePortIntoParent(const std::string& token, |
196 const ports::PortRef& port) { | 238 const ports::PortRef& port) { |
197 bool was_merged = false; | 239 bool was_merged = false; |
198 { | 240 { |
199 // This request may be coming from within the process that reserved the | 241 // This request may be coming from within the process that reserved the |
200 // "parent" side (e.g. for Chrome single-process mode), so if this token is | 242 // "parent" side (e.g. for Chrome single-process mode), so if this token is |
201 // reserved locally, merge locally instead. | 243 // reserved locally, merge locally instead. |
202 base::AutoLock lock(reserved_ports_lock_); | 244 base::AutoLock lock(reserved_ports_lock_); |
203 auto it = reserved_ports_.find(token); | 245 auto it = reserved_ports_.find(token); |
204 if (it != reserved_ports_.end()) { | 246 if (it != reserved_ports_.end()) { |
205 node_->MergePorts(port, name_, it->second.name()); | 247 std::string child_token = std::move(it->second.second); |
ncarter (slow)
2016/06/02 17:43:28
Why is this line needed?
Anand Mistry (off Chromium)
2016/06/03 04:39:40
Oops. Relic from a different day.
| |
248 node_->MergePorts(port, name_, it->second.first.name()); | |
206 reserved_ports_.erase(it); | 249 reserved_ports_.erase(it); |
207 was_merged = true; | 250 was_merged = true; |
208 } | 251 } |
209 } | 252 } |
210 if (was_merged) { | 253 if (was_merged) { |
211 AcceptIncomingMessages(); | 254 AcceptIncomingMessages(); |
212 return; | 255 return; |
213 } | 256 } |
214 | 257 |
215 scoped_refptr<NodeChannel> parent; | 258 scoped_refptr<NodeChannel> parent; |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
254 base::AutoLock lock(shutdown_lock_); | 297 base::AutoLock lock(shutdown_lock_); |
255 shutdown_callback_ = callback; | 298 shutdown_callback_ = callback; |
256 shutdown_callback_flag_.Set(true); | 299 shutdown_callback_flag_.Set(true); |
257 } | 300 } |
258 | 301 |
259 AttemptShutdownIfRequested(); | 302 AttemptShutdownIfRequested(); |
260 } | 303 } |
261 | 304 |
262 void NodeController::ConnectToChildOnIOThread( | 305 void NodeController::ConnectToChildOnIOThread( |
263 base::ProcessHandle process_handle, | 306 base::ProcessHandle process_handle, |
264 ScopedPlatformHandle platform_handle) { | 307 ScopedPlatformHandle platform_handle, |
308 ports::NodeName token) { | |
265 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 309 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
266 | 310 |
267 #if defined(OS_POSIX) && !defined(OS_MACOSX) | 311 #if defined(OS_POSIX) && !defined(OS_MACOSX) |
268 PlatformChannelPair node_channel; | 312 PlatformChannelPair node_channel; |
269 // BrokerHost owns itself. | 313 // BrokerHost owns itself. |
270 BrokerHost* broker_host = new BrokerHost(std::move(platform_handle)); | 314 BrokerHost* broker_host = new BrokerHost(std::move(platform_handle)); |
271 broker_host->SendChannel(node_channel.PassClientHandle()); | 315 broker_host->SendChannel(node_channel.PassClientHandle()); |
272 scoped_refptr<NodeChannel> channel = NodeChannel::Create( | 316 scoped_refptr<NodeChannel> channel = NodeChannel::Create( |
273 this, node_channel.PassServerHandle(), io_task_runner_); | 317 this, node_channel.PassServerHandle(), io_task_runner_); |
274 #else | 318 #else |
275 scoped_refptr<NodeChannel> channel = | 319 scoped_refptr<NodeChannel> channel = |
276 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_); | 320 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_); |
277 #endif | 321 #endif |
278 | 322 |
279 // We set up the child channel with a temporary name so it can be identified | 323 // We set up the child channel with a temporary name so it can be identified |
280 // as a pending child if it writes any messages to the channel. We may start | 324 // as a pending child if it writes any messages to the channel. We may start |
281 // receiving messages from it (though we shouldn't) as soon as Start() is | 325 // receiving messages from it (though we shouldn't) as soon as Start() is |
282 // called below. | 326 // called below. |
283 ports::NodeName token; | |
284 GenerateRandomName(&token); | |
285 | 327 |
286 pending_children_.insert(std::make_pair(token, channel)); | 328 pending_children_.insert(std::make_pair(token, channel)); |
287 RecordPendingChildCount(pending_children_.size()); | 329 RecordPendingChildCount(pending_children_.size()); |
288 | 330 |
289 channel->SetRemoteNodeName(token); | 331 channel->SetRemoteNodeName(token); |
290 channel->SetRemoteProcessHandle(process_handle); | 332 channel->SetRemoteProcessHandle(process_handle); |
291 channel->Start(); | 333 channel->Start(); |
292 | 334 |
293 channel->AcceptChild(name_, token); | 335 channel->AcceptChild(name_, token); |
294 } | 336 } |
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
395 DVLOG(1) << "Dropped peer " << peer; | 437 DVLOG(1) << "Dropped peer " << peer; |
396 } | 438 } |
397 | 439 |
398 pending_peer_messages_.erase(name); | 440 pending_peer_messages_.erase(name); |
399 pending_children_.erase(name); | 441 pending_children_.erase(name); |
400 | 442 |
401 RecordPeerCount(peers_.size()); | 443 RecordPeerCount(peers_.size()); |
402 RecordPendingChildCount(pending_children_.size()); | 444 RecordPendingChildCount(pending_children_.size()); |
403 } | 445 } |
404 | 446 |
447 std::vector<ports::PortRef> ports_to_close; | |
448 { | |
449 // Clean up any reserved ports. | |
450 base::AutoLock lock(reserved_ports_lock_); | |
451 auto it = pending_child_tokens_.find(name); | |
452 if (it != pending_child_tokens_.end()) { | |
453 const std::string& child_token = it->second; | |
454 | |
455 std::vector<std::string> port_tokens; | |
456 for (const auto& port : reserved_ports_) { | |
457 if (port.second.second == child_token) { | |
458 DVLOG(1) << "Closing reserved port: " << port.second.first.name(); | |
459 ports_to_close.push_back(port.second.first); | |
460 port_tokens.push_back(port.first); | |
461 } | |
462 } | |
463 | |
464 // We have to erase reserved ports in a two-step manner because the usual | |
465 // manner of using the returned iterator from map::erase isn't technically | |
466 // valid in C++11 (although it is in C++14). | |
467 for (const auto& token : port_tokens) | |
468 reserved_ports_.erase(token); | |
469 | |
470 pending_child_tokens_.erase(it); | |
471 } | |
472 } | |
473 | |
474 for (const auto& port : ports_to_close) | |
475 node_->ClosePort(port); | |
476 | |
405 node_->LostConnectionToNode(name); | 477 node_->LostConnectionToNode(name); |
406 } | 478 } |
407 | 479 |
408 void NodeController::SendPeerMessage(const ports::NodeName& name, | 480 void NodeController::SendPeerMessage(const ports::NodeName& name, |
409 ports::ScopedMessage message) { | 481 ports::ScopedMessage message) { |
410 Channel::MessagePtr channel_message = | 482 Channel::MessagePtr channel_message = |
411 static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); | 483 static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); |
412 | 484 |
413 scoped_refptr<NodeChannel> peer = GetPeerChannel(name); | 485 scoped_refptr<NodeChannel> peer = GetPeerChannel(name); |
414 #if defined(OS_WIN) | 486 #if defined(OS_WIN) |
(...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
525 base::AutoLock lock(peers_lock_); | 597 base::AutoLock lock(peers_lock_); |
526 for (const auto& peer : peers_) | 598 for (const auto& peer : peers_) |
527 all_peers.push_back(peer.second); | 599 all_peers.push_back(peer.second); |
528 for (const auto& peer : pending_children_) | 600 for (const auto& peer : pending_children_) |
529 all_peers.push_back(peer.second); | 601 all_peers.push_back(peer.second); |
530 peers_.clear(); | 602 peers_.clear(); |
531 pending_children_.clear(); | 603 pending_children_.clear(); |
532 pending_peer_messages_.clear(); | 604 pending_peer_messages_.clear(); |
533 } | 605 } |
534 | 606 |
607 std::vector<ports::PortRef> all_reserved_ports; | |
608 { | |
609 base::AutoLock lock(reserved_ports_lock_); | |
610 pending_child_tokens_.clear(); | |
611 for (const auto& port : reserved_ports_) | |
612 all_reserved_ports.push_back(port.second.first); | |
613 reserved_ports_.clear(); | |
614 } | |
615 | |
616 for (const auto& port : all_reserved_ports) | |
617 node_->ClosePort(port); | |
618 | |
535 for (const auto& peer : all_peers) | 619 for (const auto& peer : all_peers) |
536 peer->ShutDown(); | 620 peer->ShutDown(); |
537 | 621 |
538 if (destroy_on_io_thread_shutdown_) | 622 if (destroy_on_io_thread_shutdown_) |
539 delete this; | 623 delete this; |
540 } | 624 } |
541 | 625 |
542 void NodeController::GenerateRandomPortName(ports::PortName* port_name) { | 626 void NodeController::GenerateRandomPortName(ports::PortName* port_name) { |
543 GenerateRandomName(port_name); | 627 GenerateRandomName(port_name); |
544 } | 628 } |
(...skipping 291 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
836 | 920 |
837 ports::PortRef local_port; | 921 ports::PortRef local_port; |
838 { | 922 { |
839 base::AutoLock lock(reserved_ports_lock_); | 923 base::AutoLock lock(reserved_ports_lock_); |
840 auto it = reserved_ports_.find(token); | 924 auto it = reserved_ports_.find(token); |
841 if (it == reserved_ports_.end()) { | 925 if (it == reserved_ports_.end()) { |
842 DVLOG(1) << "Ignoring request to connect to port for unknown token " | 926 DVLOG(1) << "Ignoring request to connect to port for unknown token " |
843 << token; | 927 << token; |
844 return; | 928 return; |
845 } | 929 } |
846 local_port = it->second; | 930 local_port = it->second.first; |
847 } | 931 } |
848 | 932 |
849 int rv = node_->MergePorts(local_port, from_node, connector_port_name); | 933 int rv = node_->MergePorts(local_port, from_node, connector_port_name); |
850 if (rv != ports::OK) | 934 if (rv != ports::OK) |
851 DLOG(ERROR) << "MergePorts failed: " << rv; | 935 DLOG(ERROR) << "MergePorts failed: " << rv; |
852 } | 936 } |
853 | 937 |
854 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, | 938 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, |
855 const ports::NodeName& name) { | 939 const ports::NodeName& name) { |
856 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 940 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
958 if (peer) | 1042 if (peer) |
959 peer->PortsMessage(std::move(message)); | 1043 peer->PortsMessage(std::move(message)); |
960 else | 1044 else |
961 DLOG(ERROR) << "Dropping relay message for unknown node " << destination; | 1045 DLOG(ERROR) << "Dropping relay message for unknown node " << destination; |
962 } | 1046 } |
963 #endif | 1047 #endif |
964 | 1048 |
965 void NodeController::OnChannelError(const ports::NodeName& from_node) { | 1049 void NodeController::OnChannelError(const ports::NodeName& from_node) { |
966 if (io_task_runner_->RunsTasksOnCurrentThread()) { | 1050 if (io_task_runner_->RunsTasksOnCurrentThread()) { |
967 DropPeer(from_node); | 1051 DropPeer(from_node); |
1052 // DropPeer may have caused local port closures, so be sure to process any | |
1053 // pending local messages. | |
1054 AcceptIncomingMessages(); | |
968 } else { | 1055 } else { |
969 io_task_runner_->PostTask( | 1056 io_task_runner_->PostTask( |
970 FROM_HERE, | 1057 FROM_HERE, |
971 base::Bind(&NodeController::DropPeer, base::Unretained(this), | 1058 base::Bind(&NodeController::OnChannelError, base::Unretained(this), |
972 from_node)); | 1059 from_node)); |
973 } | 1060 } |
974 } | 1061 } |
975 | 1062 |
976 #if defined(OS_MACOSX) && !defined(OS_IOS) | 1063 #if defined(OS_MACOSX) && !defined(OS_IOS) |
977 MachPortRelay* NodeController::GetMachPortRelay() { | 1064 MachPortRelay* NodeController::GetMachPortRelay() { |
978 { | 1065 { |
979 base::AutoLock lock(parent_lock_); | 1066 base::AutoLock lock(parent_lock_); |
980 // Return null if we're not the root. | 1067 // Return null if we're not the root. |
981 if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName) | 1068 if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName) |
(...skipping 28 matching lines...) Expand all Loading... | |
1010 shutdown_callback_flag_.Set(false); | 1097 shutdown_callback_flag_.Set(false); |
1011 } | 1098 } |
1012 | 1099 |
1013 DCHECK(!callback.is_null()); | 1100 DCHECK(!callback.is_null()); |
1014 | 1101 |
1015 callback.Run(); | 1102 callback.Run(); |
1016 } | 1103 } |
1017 | 1104 |
1018 } // namespace edk | 1105 } // namespace edk |
1019 } // namespace mojo | 1106 } // namespace mojo |
OLD | NEW |