Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(39)

Side by Side Diff: mojo/edk/system/node_controller.cc

Issue 2019973002: [mojo-edk] Bind a child token to child launches and port reservations. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: arc fix Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/node_controller.h" 5 #include "mojo/edk/system/node_controller.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <limits> 8 #include <limits>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 113 matching lines...) Expand 10 before | Expand all | Expand 10 after
124 124
125 void NodeController::SetIOTaskRunner( 125 void NodeController::SetIOTaskRunner(
126 scoped_refptr<base::TaskRunner> task_runner) { 126 scoped_refptr<base::TaskRunner> task_runner) {
127 io_task_runner_ = task_runner; 127 io_task_runner_ = task_runner;
128 ThreadDestructionObserver::Create( 128 ThreadDestructionObserver::Create(
129 io_task_runner_, 129 io_task_runner_,
130 base::Bind(&NodeController::DropAllPeers, base::Unretained(this))); 130 base::Bind(&NodeController::DropAllPeers, base::Unretained(this)));
131 } 131 }
132 132
133 void NodeController::ConnectToChild(base::ProcessHandle process_handle, 133 void NodeController::ConnectToChild(base::ProcessHandle process_handle,
134 ScopedPlatformHandle platform_handle) { 134 ScopedPlatformHandle platform_handle,
135 const std::string& child_token) {
136 // Generate the temporary remote node name here so that it can be associated
137 // with the embedder's child_token. If an error occurs in the child process
138 // after it is launched, but before any reserved ports are connected, this can
139 // be used to clean up any dangling ports.
140 ports::NodeName node_name;
141 GenerateRandomName(&node_name);
142
143 {
144 base::AutoLock lock(reserved_ports_lock_);
145 bool inserted = pending_child_tokens_.insert(
146 std::make_pair(node_name, child_token)).second;
147 DCHECK(inserted);
148 }
149
135 io_task_runner_->PostTask( 150 io_task_runner_->PostTask(
136 FROM_HERE, 151 FROM_HERE,
137 base::Bind(&NodeController::ConnectToChildOnIOThread, 152 base::Bind(&NodeController::ConnectToChildOnIOThread,
138 base::Unretained(this), 153 base::Unretained(this),
139 process_handle, 154 process_handle,
140 base::Passed(&platform_handle))); 155 base::Passed(&platform_handle),
156 node_name));
157 }
158
159 void NodeController::CloseChildPorts(const std::string& child_token) {
160 std::vector<ports::PortRef> ports_to_close;
161 {
162 std::vector<std::string> port_tokens;
163 base::AutoLock lock(reserved_ports_lock_);
164 for (const auto& port : reserved_ports_) {
165 if (port.second.second == child_token) {
166 DVLOG(1) << "Closing reserved port " << port.second.first.name();
167 ports_to_close.push_back(port.second.first);
168 port_tokens.push_back(port.first);
169 }
170 }
171
172 for (const auto& token : port_tokens)
173 reserved_ports_.erase(token);
174 }
175
176 for (const auto& port : ports_to_close)
177 node_->ClosePort(port);
178
179 // Ensure local port closure messages are processes.
Ken Rockot(use gerrit already) 2016/06/01 15:40:48 nit: processed*
Anand Mistry (off Chromium) 2016/06/02 04:35:36 Done.
180 AcceptIncomingMessages();
141 } 181 }
142 182
143 void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) { 183 void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) {
144 // TODO(amistry): Consider the need for a broker on Windows. 184 // TODO(amistry): Consider the need for a broker on Windows.
145 #if defined(OS_POSIX) && !defined(OS_MACOSX) 185 #if defined(OS_POSIX) && !defined(OS_MACOSX)
146 // On posix, use the bootstrap channel for the broker and receive the node's 186 // On posix, use the bootstrap channel for the broker and receive the node's
147 // channel synchronously as the first message from the broker. 187 // channel synchronously as the first message from the broker.
148 base::ElapsedTimer timer; 188 base::ElapsedTimer timer;
149 broker_.reset(new Broker(std::move(platform_handle))); 189 broker_.reset(new Broker(std::move(platform_handle)));
150 platform_handle = broker_->GetParentPlatformHandle(); 190 platform_handle = broker_->GetParentPlatformHandle();
(...skipping 25 matching lines...) Expand all
176 int NodeController::SendMessage(const ports::PortRef& port, 216 int NodeController::SendMessage(const ports::PortRef& port,
177 std::unique_ptr<PortsMessage> message) { 217 std::unique_ptr<PortsMessage> message) {
178 ports::ScopedMessage ports_message(message.release()); 218 ports::ScopedMessage ports_message(message.release());
179 int rv = node_->SendMessage(port, std::move(ports_message)); 219 int rv = node_->SendMessage(port, std::move(ports_message));
180 220
181 AcceptIncomingMessages(); 221 AcceptIncomingMessages();
182 return rv; 222 return rv;
183 } 223 }
184 224
185 void NodeController::ReservePort(const std::string& token, 225 void NodeController::ReservePort(const std::string& token,
186 const ports::PortRef& port) { 226 const ports::PortRef& port,
227 const std::string& child_token) {
187 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " 228 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
188 << token; 229 << token;
189 230
190 base::AutoLock lock(reserved_ports_lock_); 231 base::AutoLock lock(reserved_ports_lock_);
191 auto result = reserved_ports_.insert(std::make_pair(token, port)); 232 auto result = reserved_ports_.insert(
233 std::make_pair(token, std::make_pair(port, child_token)));
192 DCHECK(result.second); 234 DCHECK(result.second);
193 } 235 }
194 236
195 void NodeController::MergePortIntoParent(const std::string& token, 237 void NodeController::MergePortIntoParent(const std::string& token,
196 const ports::PortRef& port) { 238 const ports::PortRef& port) {
197 bool was_merged = false; 239 bool was_merged = false;
198 { 240 {
199 // This request may be coming from within the process that reserved the 241 // This request may be coming from within the process that reserved the
200 // "parent" side (e.g. for Chrome single-process mode), so if this token is 242 // "parent" side (e.g. for Chrome single-process mode), so if this token is
201 // reserved locally, merge locally instead. 243 // reserved locally, merge locally instead.
202 base::AutoLock lock(reserved_ports_lock_); 244 base::AutoLock lock(reserved_ports_lock_);
203 auto it = reserved_ports_.find(token); 245 auto it = reserved_ports_.find(token);
204 if (it != reserved_ports_.end()) { 246 if (it != reserved_ports_.end()) {
205 node_->MergePorts(port, name_, it->second.name()); 247 std::string child_token = std::move(it->second.second);
248 node_->MergePorts(port, name_, it->second.first.name());
206 reserved_ports_.erase(it); 249 reserved_ports_.erase(it);
207 was_merged = true; 250 was_merged = true;
208 } 251 }
209 } 252 }
210 if (was_merged) { 253 if (was_merged) {
211 AcceptIncomingMessages(); 254 AcceptIncomingMessages();
212 return; 255 return;
213 } 256 }
214 257
215 scoped_refptr<NodeChannel> parent; 258 scoped_refptr<NodeChannel> parent;
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
254 base::AutoLock lock(shutdown_lock_); 297 base::AutoLock lock(shutdown_lock_);
255 shutdown_callback_ = callback; 298 shutdown_callback_ = callback;
256 shutdown_callback_flag_.Set(true); 299 shutdown_callback_flag_.Set(true);
257 } 300 }
258 301
259 AttemptShutdownIfRequested(); 302 AttemptShutdownIfRequested();
260 } 303 }
261 304
262 void NodeController::ConnectToChildOnIOThread( 305 void NodeController::ConnectToChildOnIOThread(
263 base::ProcessHandle process_handle, 306 base::ProcessHandle process_handle,
264 ScopedPlatformHandle platform_handle) { 307 ScopedPlatformHandle platform_handle,
308 ports::NodeName token) {
265 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 309 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
266 310
267 #if defined(OS_POSIX) && !defined(OS_MACOSX) 311 #if defined(OS_POSIX) && !defined(OS_MACOSX)
268 PlatformChannelPair node_channel; 312 PlatformChannelPair node_channel;
269 // BrokerHost owns itself. 313 // BrokerHost owns itself.
270 BrokerHost* broker_host = new BrokerHost(std::move(platform_handle)); 314 BrokerHost* broker_host = new BrokerHost(std::move(platform_handle));
271 broker_host->SendChannel(node_channel.PassClientHandle()); 315 broker_host->SendChannel(node_channel.PassClientHandle());
272 scoped_refptr<NodeChannel> channel = NodeChannel::Create( 316 scoped_refptr<NodeChannel> channel = NodeChannel::Create(
273 this, node_channel.PassServerHandle(), io_task_runner_); 317 this, node_channel.PassServerHandle(), io_task_runner_);
274 #else 318 #else
275 scoped_refptr<NodeChannel> channel = 319 scoped_refptr<NodeChannel> channel =
276 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_); 320 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_);
277 #endif 321 #endif
278 322
279 // We set up the child channel with a temporary name so it can be identified 323 // We set up the child channel with a temporary name so it can be identified
280 // as a pending child if it writes any messages to the channel. We may start 324 // as a pending child if it writes any messages to the channel. We may start
281 // receiving messages from it (though we shouldn't) as soon as Start() is 325 // receiving messages from it (though we shouldn't) as soon as Start() is
282 // called below. 326 // called below.
283 ports::NodeName token;
284 GenerateRandomName(&token);
285 327
286 pending_children_.insert(std::make_pair(token, channel)); 328 pending_children_.insert(std::make_pair(token, channel));
287 RecordPendingChildCount(pending_children_.size()); 329 RecordPendingChildCount(pending_children_.size());
288 330
289 channel->SetRemoteNodeName(token); 331 channel->SetRemoteNodeName(token);
290 channel->SetRemoteProcessHandle(process_handle); 332 channel->SetRemoteProcessHandle(process_handle);
291 channel->Start(); 333 channel->Start();
292 334
293 channel->AcceptChild(name_, token); 335 channel->AcceptChild(name_, token);
294 } 336 }
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after
395 DVLOG(1) << "Dropped peer " << peer; 437 DVLOG(1) << "Dropped peer " << peer;
396 } 438 }
397 439
398 pending_peer_messages_.erase(name); 440 pending_peer_messages_.erase(name);
399 pending_children_.erase(name); 441 pending_children_.erase(name);
400 442
401 RecordPeerCount(peers_.size()); 443 RecordPeerCount(peers_.size());
402 RecordPendingChildCount(pending_children_.size()); 444 RecordPendingChildCount(pending_children_.size());
403 } 445 }
404 446
447 std::vector<ports::PortRef> ports_to_close;
448 {
449 // Clean up any reserved ports.
450 base::AutoLock lock(reserved_ports_lock_);
451 auto it = pending_child_tokens_.find(name);
452 if (it != pending_child_tokens_.end()) {
453 const std::string& child_token = it->second;
454
455 std::vector<std::string> port_tokens;
456 for (const auto& port : reserved_ports_) {
457 if (port.second.second == child_token) {
458 DVLOG(1) << "Closing reserved port: " << port.second.first.name();
459 ports_to_close.push_back(port.second.first);
460 port_tokens.push_back(port.first);
461 }
462 }
463
464 // We have to erase reserved ports in a two-step manner because the usual
465 // manner of using the returned iterator from map::erase isn't technically
466 // valid in C++11 (although it is in C++14).
467 for (const auto& token : port_tokens)
468 reserved_ports_.erase(token);
469
470 pending_child_tokens_.erase(it);
471 }
472 }
473
474 for (const auto& port : ports_to_close)
475 node_->ClosePort(port);
476
405 node_->LostConnectionToNode(name); 477 node_->LostConnectionToNode(name);
406 } 478 }
407 479
408 void NodeController::SendPeerMessage(const ports::NodeName& name, 480 void NodeController::SendPeerMessage(const ports::NodeName& name,
409 ports::ScopedMessage message) { 481 ports::ScopedMessage message) {
410 Channel::MessagePtr channel_message = 482 Channel::MessagePtr channel_message =
411 static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); 483 static_cast<PortsMessage*>(message.get())->TakeChannelMessage();
412 484
413 scoped_refptr<NodeChannel> peer = GetPeerChannel(name); 485 scoped_refptr<NodeChannel> peer = GetPeerChannel(name);
414 #if defined(OS_WIN) 486 #if defined(OS_WIN)
(...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after
525 base::AutoLock lock(peers_lock_); 597 base::AutoLock lock(peers_lock_);
526 for (const auto& peer : peers_) 598 for (const auto& peer : peers_)
527 all_peers.push_back(peer.second); 599 all_peers.push_back(peer.second);
528 for (const auto& peer : pending_children_) 600 for (const auto& peer : pending_children_)
529 all_peers.push_back(peer.second); 601 all_peers.push_back(peer.second);
530 peers_.clear(); 602 peers_.clear();
531 pending_children_.clear(); 603 pending_children_.clear();
532 pending_peer_messages_.clear(); 604 pending_peer_messages_.clear();
533 } 605 }
534 606
607 std::vector<ports::PortRef> all_reserved_ports;
608 {
609 base::AutoLock lock(reserved_ports_lock_);
610 pending_child_tokens_.clear();
611 for (const auto& port : reserved_ports_)
612 all_reserved_ports.push_back(port.second.first);
613 reserved_ports_.clear();
614 }
615
616 for (const auto& port : all_reserved_ports)
617 node_->ClosePort(port);
618
535 for (const auto& peer : all_peers) 619 for (const auto& peer : all_peers)
536 peer->ShutDown(); 620 peer->ShutDown();
537 621
538 if (destroy_on_io_thread_shutdown_) 622 if (destroy_on_io_thread_shutdown_)
539 delete this; 623 delete this;
540 } 624 }
Ken Rockot(use gerrit already) 2016/06/01 15:40:48 Huh. It probably doesn't make much difference in p
Anand Mistry (off Chromium) 2016/06/02 04:35:36 There'a problem with running AcceptIncomingMessage
541 625
542 void NodeController::GenerateRandomPortName(ports::PortName* port_name) { 626 void NodeController::GenerateRandomPortName(ports::PortName* port_name) {
543 GenerateRandomName(port_name); 627 GenerateRandomName(port_name);
544 } 628 }
545 629
546 void NodeController::AllocMessage(size_t num_header_bytes, 630 void NodeController::AllocMessage(size_t num_header_bytes,
547 ports::ScopedMessage* message) { 631 ports::ScopedMessage* message) {
548 message->reset(new PortsMessage(num_header_bytes, 0, 0, nullptr)); 632 message->reset(new PortsMessage(num_header_bytes, 0, 0, nullptr));
549 } 633 }
550 634
(...skipping 285 matching lines...) Expand 10 before | Expand all | Expand 10 after
836 920
837 ports::PortRef local_port; 921 ports::PortRef local_port;
838 { 922 {
839 base::AutoLock lock(reserved_ports_lock_); 923 base::AutoLock lock(reserved_ports_lock_);
840 auto it = reserved_ports_.find(token); 924 auto it = reserved_ports_.find(token);
841 if (it == reserved_ports_.end()) { 925 if (it == reserved_ports_.end()) {
842 DVLOG(1) << "Ignoring request to connect to port for unknown token " 926 DVLOG(1) << "Ignoring request to connect to port for unknown token "
843 << token; 927 << token;
844 return; 928 return;
845 } 929 }
846 local_port = it->second; 930 local_port = it->second.first;
847 } 931 }
848 932
849 int rv = node_->MergePorts(local_port, from_node, connector_port_name); 933 int rv = node_->MergePorts(local_port, from_node, connector_port_name);
850 if (rv != ports::OK) 934 if (rv != ports::OK)
851 DLOG(ERROR) << "MergePorts failed: " << rv; 935 DLOG(ERROR) << "MergePorts failed: " << rv;
852 } 936 }
853 937
854 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, 938 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
855 const ports::NodeName& name) { 939 const ports::NodeName& name) {
856 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 940 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
958 if (peer) 1042 if (peer)
959 peer->PortsMessage(std::move(message)); 1043 peer->PortsMessage(std::move(message));
960 else 1044 else
961 DLOG(ERROR) << "Dropping relay message for unknown node " << destination; 1045 DLOG(ERROR) << "Dropping relay message for unknown node " << destination;
962 } 1046 }
963 #endif 1047 #endif
964 1048
965 void NodeController::OnChannelError(const ports::NodeName& from_node) { 1049 void NodeController::OnChannelError(const ports::NodeName& from_node) {
966 if (io_task_runner_->RunsTasksOnCurrentThread()) { 1050 if (io_task_runner_->RunsTasksOnCurrentThread()) {
967 DropPeer(from_node); 1051 DropPeer(from_node);
1052 // DropPeer may have caused local port closures, so be sure to process any
1053 // pending local messages.
1054 AcceptIncomingMessages();
968 } else { 1055 } else {
969 io_task_runner_->PostTask( 1056 io_task_runner_->PostTask(
970 FROM_HERE, 1057 FROM_HERE,
971 base::Bind(&NodeController::DropPeer, base::Unretained(this), 1058 base::Bind(&NodeController::OnChannelError, base::Unretained(this),
972 from_node)); 1059 from_node));
973 } 1060 }
974 } 1061 }
975 1062
976 #if defined(OS_MACOSX) && !defined(OS_IOS) 1063 #if defined(OS_MACOSX) && !defined(OS_IOS)
977 MachPortRelay* NodeController::GetMachPortRelay() { 1064 MachPortRelay* NodeController::GetMachPortRelay() {
978 { 1065 {
979 base::AutoLock lock(parent_lock_); 1066 base::AutoLock lock(parent_lock_);
980 // Return null if we're not the root. 1067 // Return null if we're not the root.
981 if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName) 1068 if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName)
(...skipping 28 matching lines...) Expand all
1010 shutdown_callback_flag_.Set(false); 1097 shutdown_callback_flag_.Set(false);
1011 } 1098 }
1012 1099
1013 DCHECK(!callback.is_null()); 1100 DCHECK(!callback.is_null());
1014 1101
1015 callback.Run(); 1102 callback.Run();
1016 } 1103 }
1017 1104
1018 } // namespace edk 1105 } // namespace edk
1019 } // namespace mojo 1106 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698