| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/node_controller.h" | 5 #include "mojo/edk/system/node_controller.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <limits> | 8 #include <limits> |
| 9 | 9 |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| (...skipping 113 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 124 | 124 |
| 125 void NodeController::SetIOTaskRunner( | 125 void NodeController::SetIOTaskRunner( |
| 126 scoped_refptr<base::TaskRunner> task_runner) { | 126 scoped_refptr<base::TaskRunner> task_runner) { |
| 127 io_task_runner_ = task_runner; | 127 io_task_runner_ = task_runner; |
| 128 ThreadDestructionObserver::Create( | 128 ThreadDestructionObserver::Create( |
| 129 io_task_runner_, | 129 io_task_runner_, |
| 130 base::Bind(&NodeController::DropAllPeers, base::Unretained(this))); | 130 base::Bind(&NodeController::DropAllPeers, base::Unretained(this))); |
| 131 } | 131 } |
| 132 | 132 |
| 133 void NodeController::ConnectToChild(base::ProcessHandle process_handle, | 133 void NodeController::ConnectToChild(base::ProcessHandle process_handle, |
| 134 ScopedPlatformHandle platform_handle) { | 134 ScopedPlatformHandle platform_handle, |
| 135 const std::string& child_token) { |
| 136 // Generate the temporary remote node name here so that it can be associated |
| 137 // with the embedder's child_token. If an error occurs in the child process |
| 138 // after it is launched, but before any reserved ports are connected, this can |
| 139 // be used to clean up any dangling ports. |
| 140 ports::NodeName node_name; |
| 141 GenerateRandomName(&node_name); |
| 142 |
| 143 { |
| 144 base::AutoLock lock(reserved_ports_lock_); |
| 145 bool inserted = pending_child_tokens_.insert( |
| 146 std::make_pair(node_name, child_token)).second; |
| 147 DCHECK(inserted); |
| 148 } |
| 149 |
| 135 io_task_runner_->PostTask( | 150 io_task_runner_->PostTask( |
| 136 FROM_HERE, | 151 FROM_HERE, |
| 137 base::Bind(&NodeController::ConnectToChildOnIOThread, | 152 base::Bind(&NodeController::ConnectToChildOnIOThread, |
| 138 base::Unretained(this), | 153 base::Unretained(this), |
| 139 process_handle, | 154 process_handle, |
| 140 base::Passed(&platform_handle))); | 155 base::Passed(&platform_handle), |
| 156 node_name)); |
| 157 } |
| 158 |
| 159 void NodeController::CloseChildPorts(const std::string& child_token) { |
| 160 std::vector<ports::PortRef> ports_to_close; |
| 161 { |
| 162 std::vector<std::string> port_tokens; |
| 163 base::AutoLock lock(reserved_ports_lock_); |
| 164 for (const auto& port : reserved_ports_) { |
| 165 if (port.second.child_token == child_token) { |
| 166 DVLOG(1) << "Closing reserved port " << port.second.port.name(); |
| 167 ports_to_close.push_back(port.second.port); |
| 168 port_tokens.push_back(port.first); |
| 169 } |
| 170 } |
| 171 |
| 172 for (const auto& token : port_tokens) |
| 173 reserved_ports_.erase(token); |
| 174 } |
| 175 |
| 176 for (const auto& port : ports_to_close) |
| 177 node_->ClosePort(port); |
| 178 |
| 179 // Ensure local port closure messages are processed. |
| 180 AcceptIncomingMessages(); |
| 141 } | 181 } |
| 142 | 182 |
| 143 void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) { | 183 void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) { |
| 144 // TODO(amistry): Consider the need for a broker on Windows. | 184 // TODO(amistry): Consider the need for a broker on Windows. |
| 145 #if defined(OS_POSIX) && !defined(OS_MACOSX) | 185 #if defined(OS_POSIX) && !defined(OS_MACOSX) |
| 146 // On posix, use the bootstrap channel for the broker and receive the node's | 186 // On posix, use the bootstrap channel for the broker and receive the node's |
| 147 // channel synchronously as the first message from the broker. | 187 // channel synchronously as the first message from the broker. |
| 148 base::ElapsedTimer timer; | 188 base::ElapsedTimer timer; |
| 149 broker_.reset(new Broker(std::move(platform_handle))); | 189 broker_.reset(new Broker(std::move(platform_handle))); |
| 150 platform_handle = broker_->GetParentPlatformHandle(); | 190 platform_handle = broker_->GetParentPlatformHandle(); |
| (...skipping 25 matching lines...) Expand all Loading... |
| 176 int NodeController::SendMessage(const ports::PortRef& port, | 216 int NodeController::SendMessage(const ports::PortRef& port, |
| 177 std::unique_ptr<PortsMessage> message) { | 217 std::unique_ptr<PortsMessage> message) { |
| 178 ports::ScopedMessage ports_message(message.release()); | 218 ports::ScopedMessage ports_message(message.release()); |
| 179 int rv = node_->SendMessage(port, std::move(ports_message)); | 219 int rv = node_->SendMessage(port, std::move(ports_message)); |
| 180 | 220 |
| 181 AcceptIncomingMessages(); | 221 AcceptIncomingMessages(); |
| 182 return rv; | 222 return rv; |
| 183 } | 223 } |
| 184 | 224 |
| 185 void NodeController::ReservePort(const std::string& token, | 225 void NodeController::ReservePort(const std::string& token, |
| 186 const ports::PortRef& port) { | 226 const ports::PortRef& port, |
| 227 const std::string& child_token) { |
| 187 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " | 228 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " |
| 188 << token; | 229 << token; |
| 189 | 230 |
| 190 base::AutoLock lock(reserved_ports_lock_); | 231 base::AutoLock lock(reserved_ports_lock_); |
| 191 auto result = reserved_ports_.insert(std::make_pair(token, port)); | 232 auto result = reserved_ports_.insert( |
| 233 std::make_pair(token, ReservedPort{port, child_token})); |
| 192 DCHECK(result.second); | 234 DCHECK(result.second); |
| 193 } | 235 } |
| 194 | 236 |
| 195 void NodeController::MergePortIntoParent(const std::string& token, | 237 void NodeController::MergePortIntoParent(const std::string& token, |
| 196 const ports::PortRef& port) { | 238 const ports::PortRef& port) { |
| 197 bool was_merged = false; | 239 bool was_merged = false; |
| 198 { | 240 { |
| 199 // This request may be coming from within the process that reserved the | 241 // This request may be coming from within the process that reserved the |
| 200 // "parent" side (e.g. for Chrome single-process mode), so if this token is | 242 // "parent" side (e.g. for Chrome single-process mode), so if this token is |
| 201 // reserved locally, merge locally instead. | 243 // reserved locally, merge locally instead. |
| 202 base::AutoLock lock(reserved_ports_lock_); | 244 base::AutoLock lock(reserved_ports_lock_); |
| 203 auto it = reserved_ports_.find(token); | 245 auto it = reserved_ports_.find(token); |
| 204 if (it != reserved_ports_.end()) { | 246 if (it != reserved_ports_.end()) { |
| 205 node_->MergePorts(port, name_, it->second.name()); | 247 node_->MergePorts(port, name_, it->second.port.name()); |
| 206 reserved_ports_.erase(it); | 248 reserved_ports_.erase(it); |
| 207 was_merged = true; | 249 was_merged = true; |
| 208 } | 250 } |
| 209 } | 251 } |
| 210 if (was_merged) { | 252 if (was_merged) { |
| 211 AcceptIncomingMessages(); | 253 AcceptIncomingMessages(); |
| 212 return; | 254 return; |
| 213 } | 255 } |
| 214 | 256 |
| 215 scoped_refptr<NodeChannel> parent; | 257 scoped_refptr<NodeChannel> parent; |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 254 base::AutoLock lock(shutdown_lock_); | 296 base::AutoLock lock(shutdown_lock_); |
| 255 shutdown_callback_ = callback; | 297 shutdown_callback_ = callback; |
| 256 shutdown_callback_flag_.Set(true); | 298 shutdown_callback_flag_.Set(true); |
| 257 } | 299 } |
| 258 | 300 |
| 259 AttemptShutdownIfRequested(); | 301 AttemptShutdownIfRequested(); |
| 260 } | 302 } |
| 261 | 303 |
| 262 void NodeController::ConnectToChildOnIOThread( | 304 void NodeController::ConnectToChildOnIOThread( |
| 263 base::ProcessHandle process_handle, | 305 base::ProcessHandle process_handle, |
| 264 ScopedPlatformHandle platform_handle) { | 306 ScopedPlatformHandle platform_handle, |
| 307 ports::NodeName token) { |
| 265 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 308 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| 266 | 309 |
| 267 #if defined(OS_POSIX) && !defined(OS_MACOSX) | 310 #if defined(OS_POSIX) && !defined(OS_MACOSX) |
| 268 PlatformChannelPair node_channel; | 311 PlatformChannelPair node_channel; |
| 269 // BrokerHost owns itself. | 312 // BrokerHost owns itself. |
| 270 BrokerHost* broker_host = new BrokerHost(std::move(platform_handle)); | 313 BrokerHost* broker_host = new BrokerHost(std::move(platform_handle)); |
| 271 broker_host->SendChannel(node_channel.PassClientHandle()); | 314 broker_host->SendChannel(node_channel.PassClientHandle()); |
| 272 scoped_refptr<NodeChannel> channel = NodeChannel::Create( | 315 scoped_refptr<NodeChannel> channel = NodeChannel::Create( |
| 273 this, node_channel.PassServerHandle(), io_task_runner_); | 316 this, node_channel.PassServerHandle(), io_task_runner_); |
| 274 #else | 317 #else |
| 275 scoped_refptr<NodeChannel> channel = | 318 scoped_refptr<NodeChannel> channel = |
| 276 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_); | 319 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_); |
| 277 #endif | 320 #endif |
| 278 | 321 |
| 279 // We set up the child channel with a temporary name so it can be identified | 322 // We set up the child channel with a temporary name so it can be identified |
| 280 // as a pending child if it writes any messages to the channel. We may start | 323 // as a pending child if it writes any messages to the channel. We may start |
| 281 // receiving messages from it (though we shouldn't) as soon as Start() is | 324 // receiving messages from it (though we shouldn't) as soon as Start() is |
| 282 // called below. | 325 // called below. |
| 283 ports::NodeName token; | |
| 284 GenerateRandomName(&token); | |
| 285 | 326 |
| 286 pending_children_.insert(std::make_pair(token, channel)); | 327 pending_children_.insert(std::make_pair(token, channel)); |
| 287 RecordPendingChildCount(pending_children_.size()); | 328 RecordPendingChildCount(pending_children_.size()); |
| 288 | 329 |
| 289 channel->SetRemoteNodeName(token); | 330 channel->SetRemoteNodeName(token); |
| 290 channel->SetRemoteProcessHandle(process_handle); | 331 channel->SetRemoteProcessHandle(process_handle); |
| 291 channel->Start(); | 332 channel->Start(); |
| 292 | 333 |
| 293 channel->AcceptChild(name_, token); | 334 channel->AcceptChild(name_, token); |
| 294 } | 335 } |
| (...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 395 DVLOG(1) << "Dropped peer " << peer; | 436 DVLOG(1) << "Dropped peer " << peer; |
| 396 } | 437 } |
| 397 | 438 |
| 398 pending_peer_messages_.erase(name); | 439 pending_peer_messages_.erase(name); |
| 399 pending_children_.erase(name); | 440 pending_children_.erase(name); |
| 400 | 441 |
| 401 RecordPeerCount(peers_.size()); | 442 RecordPeerCount(peers_.size()); |
| 402 RecordPendingChildCount(pending_children_.size()); | 443 RecordPendingChildCount(pending_children_.size()); |
| 403 } | 444 } |
| 404 | 445 |
| 446 std::vector<ports::PortRef> ports_to_close; |
| 447 { |
| 448 // Clean up any reserved ports. |
| 449 base::AutoLock lock(reserved_ports_lock_); |
| 450 auto it = pending_child_tokens_.find(name); |
| 451 if (it != pending_child_tokens_.end()) { |
| 452 const std::string& child_token = it->second; |
| 453 |
| 454 std::vector<std::string> port_tokens; |
| 455 for (const auto& port : reserved_ports_) { |
| 456 if (port.second.child_token == child_token) { |
| 457 DVLOG(1) << "Closing reserved port: " << port.second.port.name(); |
| 458 ports_to_close.push_back(port.second.port); |
| 459 port_tokens.push_back(port.first); |
| 460 } |
| 461 } |
| 462 |
| 463 // We have to erase reserved ports in a two-step manner because the usual |
| 464 // manner of using the returned iterator from map::erase isn't technically |
| 465 // valid in C++11 (although it is in C++14). |
| 466 for (const auto& token : port_tokens) |
| 467 reserved_ports_.erase(token); |
| 468 |
| 469 pending_child_tokens_.erase(it); |
| 470 } |
| 471 } |
| 472 |
| 473 for (const auto& port : ports_to_close) |
| 474 node_->ClosePort(port); |
| 475 |
| 405 node_->LostConnectionToNode(name); | 476 node_->LostConnectionToNode(name); |
| 406 } | 477 } |
| 407 | 478 |
| 408 void NodeController::SendPeerMessage(const ports::NodeName& name, | 479 void NodeController::SendPeerMessage(const ports::NodeName& name, |
| 409 ports::ScopedMessage message) { | 480 ports::ScopedMessage message) { |
| 410 Channel::MessagePtr channel_message = | 481 Channel::MessagePtr channel_message = |
| 411 static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); | 482 static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); |
| 412 | 483 |
| 413 scoped_refptr<NodeChannel> peer = GetPeerChannel(name); | 484 scoped_refptr<NodeChannel> peer = GetPeerChannel(name); |
| 414 #if defined(OS_WIN) | 485 #if defined(OS_WIN) |
| (...skipping 421 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 836 | 907 |
| 837 ports::PortRef local_port; | 908 ports::PortRef local_port; |
| 838 { | 909 { |
| 839 base::AutoLock lock(reserved_ports_lock_); | 910 base::AutoLock lock(reserved_ports_lock_); |
| 840 auto it = reserved_ports_.find(token); | 911 auto it = reserved_ports_.find(token); |
| 841 if (it == reserved_ports_.end()) { | 912 if (it == reserved_ports_.end()) { |
| 842 DVLOG(1) << "Ignoring request to connect to port for unknown token " | 913 DVLOG(1) << "Ignoring request to connect to port for unknown token " |
| 843 << token; | 914 << token; |
| 844 return; | 915 return; |
| 845 } | 916 } |
| 846 local_port = it->second; | 917 local_port = it->second.port; |
| 847 } | 918 } |
| 848 | 919 |
| 849 int rv = node_->MergePorts(local_port, from_node, connector_port_name); | 920 int rv = node_->MergePorts(local_port, from_node, connector_port_name); |
| 850 if (rv != ports::OK) | 921 if (rv != ports::OK) |
| 851 DLOG(ERROR) << "MergePorts failed: " << rv; | 922 DLOG(ERROR) << "MergePorts failed: " << rv; |
| 852 } | 923 } |
| 853 | 924 |
| 854 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, | 925 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, |
| 855 const ports::NodeName& name) { | 926 const ports::NodeName& name) { |
| 856 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 927 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| (...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 958 if (peer) | 1029 if (peer) |
| 959 peer->PortsMessage(std::move(message)); | 1030 peer->PortsMessage(std::move(message)); |
| 960 else | 1031 else |
| 961 DLOG(ERROR) << "Dropping relay message for unknown node " << destination; | 1032 DLOG(ERROR) << "Dropping relay message for unknown node " << destination; |
| 962 } | 1033 } |
| 963 #endif | 1034 #endif |
| 964 | 1035 |
| 965 void NodeController::OnChannelError(const ports::NodeName& from_node) { | 1036 void NodeController::OnChannelError(const ports::NodeName& from_node) { |
| 966 if (io_task_runner_->RunsTasksOnCurrentThread()) { | 1037 if (io_task_runner_->RunsTasksOnCurrentThread()) { |
| 967 DropPeer(from_node); | 1038 DropPeer(from_node); |
| 1039 // DropPeer may have caused local port closures, so be sure to process any |
| 1040 // pending local messages. |
| 1041 AcceptIncomingMessages(); |
| 968 } else { | 1042 } else { |
| 969 io_task_runner_->PostTask( | 1043 io_task_runner_->PostTask( |
| 970 FROM_HERE, | 1044 FROM_HERE, |
| 971 base::Bind(&NodeController::DropPeer, base::Unretained(this), | 1045 base::Bind(&NodeController::OnChannelError, base::Unretained(this), |
| 972 from_node)); | 1046 from_node)); |
| 973 } | 1047 } |
| 974 } | 1048 } |
| 975 | 1049 |
| 976 #if defined(OS_MACOSX) && !defined(OS_IOS) | 1050 #if defined(OS_MACOSX) && !defined(OS_IOS) |
| 977 MachPortRelay* NodeController::GetMachPortRelay() { | 1051 MachPortRelay* NodeController::GetMachPortRelay() { |
| 978 { | 1052 { |
| 979 base::AutoLock lock(parent_lock_); | 1053 base::AutoLock lock(parent_lock_); |
| 980 // Return null if we're not the root. | 1054 // Return null if we're not the root. |
| 981 if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName) | 1055 if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName) |
| (...skipping 28 matching lines...) Expand all Loading... |
| 1010 shutdown_callback_flag_.Set(false); | 1084 shutdown_callback_flag_.Set(false); |
| 1011 } | 1085 } |
| 1012 | 1086 |
| 1013 DCHECK(!callback.is_null()); | 1087 DCHECK(!callback.is_null()); |
| 1014 | 1088 |
| 1015 callback.Run(); | 1089 callback.Run(); |
| 1016 } | 1090 } |
| 1017 | 1091 |
| 1018 } // namespace edk | 1092 } // namespace edk |
| 1019 } // namespace mojo | 1093 } // namespace mojo |
| OLD | NEW |