Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(89)

Side by Side Diff: mojo/edk/system/node_controller.cc

Issue 2019973002: [mojo-edk] Bind a child token to child launches and port reservations. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebase Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/test/multiprocess_test_helper.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/node_controller.h" 5 #include "mojo/edk/system/node_controller.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <limits> 8 #include <limits>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 113 matching lines...) Expand 10 before | Expand all | Expand 10 after
124 124
125 void NodeController::SetIOTaskRunner( 125 void NodeController::SetIOTaskRunner(
126 scoped_refptr<base::TaskRunner> task_runner) { 126 scoped_refptr<base::TaskRunner> task_runner) {
127 io_task_runner_ = task_runner; 127 io_task_runner_ = task_runner;
128 ThreadDestructionObserver::Create( 128 ThreadDestructionObserver::Create(
129 io_task_runner_, 129 io_task_runner_,
130 base::Bind(&NodeController::DropAllPeers, base::Unretained(this))); 130 base::Bind(&NodeController::DropAllPeers, base::Unretained(this)));
131 } 131 }
132 132
133 void NodeController::ConnectToChild(base::ProcessHandle process_handle, 133 void NodeController::ConnectToChild(base::ProcessHandle process_handle,
134 ScopedPlatformHandle platform_handle) { 134 ScopedPlatformHandle platform_handle,
135 const std::string& child_token) {
136 // Generate the temporary remote node name here so that it can be associated
137 // with the embedder's child_token. If an error occurs in the child process
138 // after it is launched, but before any reserved ports are connected, this can
139 // be used to clean up any dangling ports.
140 ports::NodeName node_name;
141 GenerateRandomName(&node_name);
142
143 {
144 base::AutoLock lock(reserved_ports_lock_);
145 bool inserted = pending_child_tokens_.insert(
146 std::make_pair(node_name, child_token)).second;
147 DCHECK(inserted);
148 }
149
135 io_task_runner_->PostTask( 150 io_task_runner_->PostTask(
136 FROM_HERE, 151 FROM_HERE,
137 base::Bind(&NodeController::ConnectToChildOnIOThread, 152 base::Bind(&NodeController::ConnectToChildOnIOThread,
138 base::Unretained(this), 153 base::Unretained(this),
139 process_handle, 154 process_handle,
140 base::Passed(&platform_handle))); 155 base::Passed(&platform_handle),
156 node_name));
157 }
158
159 void NodeController::CloseChildPorts(const std::string& child_token) {
160 std::vector<ports::PortRef> ports_to_close;
161 {
162 std::vector<std::string> port_tokens;
163 base::AutoLock lock(reserved_ports_lock_);
164 for (const auto& port : reserved_ports_) {
165 if (port.second.child_token == child_token) {
166 DVLOG(1) << "Closing reserved port " << port.second.port.name();
167 ports_to_close.push_back(port.second.port);
168 port_tokens.push_back(port.first);
169 }
170 }
171
172 for (const auto& token : port_tokens)
173 reserved_ports_.erase(token);
174 }
175
176 for (const auto& port : ports_to_close)
177 node_->ClosePort(port);
178
179 // Ensure local port closure messages are processed.
180 AcceptIncomingMessages();
141 } 181 }
142 182
143 void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) { 183 void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) {
144 // TODO(amistry): Consider the need for a broker on Windows. 184 // TODO(amistry): Consider the need for a broker on Windows.
145 #if defined(OS_POSIX) && !defined(OS_MACOSX) 185 #if defined(OS_POSIX) && !defined(OS_MACOSX)
146 // On posix, use the bootstrap channel for the broker and receive the node's 186 // On posix, use the bootstrap channel for the broker and receive the node's
147 // channel synchronously as the first message from the broker. 187 // channel synchronously as the first message from the broker.
148 base::ElapsedTimer timer; 188 base::ElapsedTimer timer;
149 broker_.reset(new Broker(std::move(platform_handle))); 189 broker_.reset(new Broker(std::move(platform_handle)));
150 platform_handle = broker_->GetParentPlatformHandle(); 190 platform_handle = broker_->GetParentPlatformHandle();
(...skipping 25 matching lines...) Expand all
176 int NodeController::SendMessage(const ports::PortRef& port, 216 int NodeController::SendMessage(const ports::PortRef& port,
177 std::unique_ptr<PortsMessage> message) { 217 std::unique_ptr<PortsMessage> message) {
178 ports::ScopedMessage ports_message(message.release()); 218 ports::ScopedMessage ports_message(message.release());
179 int rv = node_->SendMessage(port, std::move(ports_message)); 219 int rv = node_->SendMessage(port, std::move(ports_message));
180 220
181 AcceptIncomingMessages(); 221 AcceptIncomingMessages();
182 return rv; 222 return rv;
183 } 223 }
184 224
185 void NodeController::ReservePort(const std::string& token, 225 void NodeController::ReservePort(const std::string& token,
186 const ports::PortRef& port) { 226 const ports::PortRef& port,
227 const std::string& child_token) {
187 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " 228 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
188 << token; 229 << token;
189 230
190 base::AutoLock lock(reserved_ports_lock_); 231 base::AutoLock lock(reserved_ports_lock_);
191 auto result = reserved_ports_.insert(std::make_pair(token, port)); 232 auto result = reserved_ports_.insert(
233 std::make_pair(token, ReservedPort{port, child_token}));
192 DCHECK(result.second); 234 DCHECK(result.second);
193 } 235 }
194 236
195 void NodeController::MergePortIntoParent(const std::string& token, 237 void NodeController::MergePortIntoParent(const std::string& token,
196 const ports::PortRef& port) { 238 const ports::PortRef& port) {
197 bool was_merged = false; 239 bool was_merged = false;
198 { 240 {
199 // This request may be coming from within the process that reserved the 241 // This request may be coming from within the process that reserved the
200 // "parent" side (e.g. for Chrome single-process mode), so if this token is 242 // "parent" side (e.g. for Chrome single-process mode), so if this token is
201 // reserved locally, merge locally instead. 243 // reserved locally, merge locally instead.
202 base::AutoLock lock(reserved_ports_lock_); 244 base::AutoLock lock(reserved_ports_lock_);
203 auto it = reserved_ports_.find(token); 245 auto it = reserved_ports_.find(token);
204 if (it != reserved_ports_.end()) { 246 if (it != reserved_ports_.end()) {
205 node_->MergePorts(port, name_, it->second.name()); 247 node_->MergePorts(port, name_, it->second.port.name());
206 reserved_ports_.erase(it); 248 reserved_ports_.erase(it);
207 was_merged = true; 249 was_merged = true;
208 } 250 }
209 } 251 }
210 if (was_merged) { 252 if (was_merged) {
211 AcceptIncomingMessages(); 253 AcceptIncomingMessages();
212 return; 254 return;
213 } 255 }
214 256
215 scoped_refptr<NodeChannel> parent; 257 scoped_refptr<NodeChannel> parent;
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
254 base::AutoLock lock(shutdown_lock_); 296 base::AutoLock lock(shutdown_lock_);
255 shutdown_callback_ = callback; 297 shutdown_callback_ = callback;
256 shutdown_callback_flag_.Set(true); 298 shutdown_callback_flag_.Set(true);
257 } 299 }
258 300
259 AttemptShutdownIfRequested(); 301 AttemptShutdownIfRequested();
260 } 302 }
261 303
262 void NodeController::ConnectToChildOnIOThread( 304 void NodeController::ConnectToChildOnIOThread(
263 base::ProcessHandle process_handle, 305 base::ProcessHandle process_handle,
264 ScopedPlatformHandle platform_handle) { 306 ScopedPlatformHandle platform_handle,
307 ports::NodeName token) {
265 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 308 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
266 309
267 #if defined(OS_POSIX) && !defined(OS_MACOSX) 310 #if defined(OS_POSIX) && !defined(OS_MACOSX)
268 PlatformChannelPair node_channel; 311 PlatformChannelPair node_channel;
269 // BrokerHost owns itself. 312 // BrokerHost owns itself.
270 BrokerHost* broker_host = new BrokerHost(std::move(platform_handle)); 313 BrokerHost* broker_host = new BrokerHost(std::move(platform_handle));
271 broker_host->SendChannel(node_channel.PassClientHandle()); 314 broker_host->SendChannel(node_channel.PassClientHandle());
272 scoped_refptr<NodeChannel> channel = NodeChannel::Create( 315 scoped_refptr<NodeChannel> channel = NodeChannel::Create(
273 this, node_channel.PassServerHandle(), io_task_runner_); 316 this, node_channel.PassServerHandle(), io_task_runner_);
274 #else 317 #else
275 scoped_refptr<NodeChannel> channel = 318 scoped_refptr<NodeChannel> channel =
276 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_); 319 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_);
277 #endif 320 #endif
278 321
279 // We set up the child channel with a temporary name so it can be identified 322 // We set up the child channel with a temporary name so it can be identified
280 // as a pending child if it writes any messages to the channel. We may start 323 // as a pending child if it writes any messages to the channel. We may start
281 // receiving messages from it (though we shouldn't) as soon as Start() is 324 // receiving messages from it (though we shouldn't) as soon as Start() is
282 // called below. 325 // called below.
283 ports::NodeName token;
284 GenerateRandomName(&token);
285 326
286 pending_children_.insert(std::make_pair(token, channel)); 327 pending_children_.insert(std::make_pair(token, channel));
287 RecordPendingChildCount(pending_children_.size()); 328 RecordPendingChildCount(pending_children_.size());
288 329
289 channel->SetRemoteNodeName(token); 330 channel->SetRemoteNodeName(token);
290 channel->SetRemoteProcessHandle(process_handle); 331 channel->SetRemoteProcessHandle(process_handle);
291 channel->Start(); 332 channel->Start();
292 333
293 channel->AcceptChild(name_, token); 334 channel->AcceptChild(name_, token);
294 } 335 }
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after
395 DVLOG(1) << "Dropped peer " << peer; 436 DVLOG(1) << "Dropped peer " << peer;
396 } 437 }
397 438
398 pending_peer_messages_.erase(name); 439 pending_peer_messages_.erase(name);
399 pending_children_.erase(name); 440 pending_children_.erase(name);
400 441
401 RecordPeerCount(peers_.size()); 442 RecordPeerCount(peers_.size());
402 RecordPendingChildCount(pending_children_.size()); 443 RecordPendingChildCount(pending_children_.size());
403 } 444 }
404 445
446 std::vector<ports::PortRef> ports_to_close;
447 {
448 // Clean up any reserved ports.
449 base::AutoLock lock(reserved_ports_lock_);
450 auto it = pending_child_tokens_.find(name);
451 if (it != pending_child_tokens_.end()) {
452 const std::string& child_token = it->second;
453
454 std::vector<std::string> port_tokens;
455 for (const auto& port : reserved_ports_) {
456 if (port.second.child_token == child_token) {
457 DVLOG(1) << "Closing reserved port: " << port.second.port.name();
458 ports_to_close.push_back(port.second.port);
459 port_tokens.push_back(port.first);
460 }
461 }
462
463 // We have to erase reserved ports in a two-step manner because the usual
464 // manner of using the returned iterator from map::erase isn't technically
465 // valid in C++11 (although it is in C++14).
466 for (const auto& token : port_tokens)
467 reserved_ports_.erase(token);
468
469 pending_child_tokens_.erase(it);
470 }
471 }
472
473 for (const auto& port : ports_to_close)
474 node_->ClosePort(port);
475
405 node_->LostConnectionToNode(name); 476 node_->LostConnectionToNode(name);
406 } 477 }
407 478
408 void NodeController::SendPeerMessage(const ports::NodeName& name, 479 void NodeController::SendPeerMessage(const ports::NodeName& name,
409 ports::ScopedMessage message) { 480 ports::ScopedMessage message) {
410 Channel::MessagePtr channel_message = 481 Channel::MessagePtr channel_message =
411 static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); 482 static_cast<PortsMessage*>(message.get())->TakeChannelMessage();
412 483
413 scoped_refptr<NodeChannel> peer = GetPeerChannel(name); 484 scoped_refptr<NodeChannel> peer = GetPeerChannel(name);
414 #if defined(OS_WIN) 485 #if defined(OS_WIN)
(...skipping 421 matching lines...) Expand 10 before | Expand all | Expand 10 after
836 907
837 ports::PortRef local_port; 908 ports::PortRef local_port;
838 { 909 {
839 base::AutoLock lock(reserved_ports_lock_); 910 base::AutoLock lock(reserved_ports_lock_);
840 auto it = reserved_ports_.find(token); 911 auto it = reserved_ports_.find(token);
841 if (it == reserved_ports_.end()) { 912 if (it == reserved_ports_.end()) {
842 DVLOG(1) << "Ignoring request to connect to port for unknown token " 913 DVLOG(1) << "Ignoring request to connect to port for unknown token "
843 << token; 914 << token;
844 return; 915 return;
845 } 916 }
846 local_port = it->second; 917 local_port = it->second.port;
847 } 918 }
848 919
849 int rv = node_->MergePorts(local_port, from_node, connector_port_name); 920 int rv = node_->MergePorts(local_port, from_node, connector_port_name);
850 if (rv != ports::OK) 921 if (rv != ports::OK)
851 DLOG(ERROR) << "MergePorts failed: " << rv; 922 DLOG(ERROR) << "MergePorts failed: " << rv;
852 } 923 }
853 924
854 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, 925 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
855 const ports::NodeName& name) { 926 const ports::NodeName& name) {
856 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 927 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
958 if (peer) 1029 if (peer)
959 peer->PortsMessage(std::move(message)); 1030 peer->PortsMessage(std::move(message));
960 else 1031 else
961 DLOG(ERROR) << "Dropping relay message for unknown node " << destination; 1032 DLOG(ERROR) << "Dropping relay message for unknown node " << destination;
962 } 1033 }
963 #endif 1034 #endif
964 1035
965 void NodeController::OnChannelError(const ports::NodeName& from_node) { 1036 void NodeController::OnChannelError(const ports::NodeName& from_node) {
966 if (io_task_runner_->RunsTasksOnCurrentThread()) { 1037 if (io_task_runner_->RunsTasksOnCurrentThread()) {
967 DropPeer(from_node); 1038 DropPeer(from_node);
1039 // DropPeer may have caused local port closures, so be sure to process any
1040 // pending local messages.
1041 AcceptIncomingMessages();
968 } else { 1042 } else {
969 io_task_runner_->PostTask( 1043 io_task_runner_->PostTask(
970 FROM_HERE, 1044 FROM_HERE,
971 base::Bind(&NodeController::DropPeer, base::Unretained(this), 1045 base::Bind(&NodeController::OnChannelError, base::Unretained(this),
972 from_node)); 1046 from_node));
973 } 1047 }
974 } 1048 }
975 1049
976 #if defined(OS_MACOSX) && !defined(OS_IOS) 1050 #if defined(OS_MACOSX) && !defined(OS_IOS)
977 MachPortRelay* NodeController::GetMachPortRelay() { 1051 MachPortRelay* NodeController::GetMachPortRelay() {
978 { 1052 {
979 base::AutoLock lock(parent_lock_); 1053 base::AutoLock lock(parent_lock_);
980 // Return null if we're not the root. 1054 // Return null if we're not the root.
981 if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName) 1055 if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName)
(...skipping 28 matching lines...) Expand all
1010 shutdown_callback_flag_.Set(false); 1084 shutdown_callback_flag_.Set(false);
1011 } 1085 }
1012 1086
1013 DCHECK(!callback.is_null()); 1087 DCHECK(!callback.is_null());
1014 1088
1015 callback.Run(); 1089 callback.Run();
1016 } 1090 }
1017 1091
1018 } // namespace edk 1092 } // namespace edk
1019 } // namespace mojo 1093 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/test/multiprocess_test_helper.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698