OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/node_controller.h" | 5 #include "mojo/edk/system/node_controller.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <limits> | 8 #include <limits> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
92 delete this; | 92 delete this; |
93 } | 93 } |
94 | 94 |
95 const base::Closure callback_; | 95 const base::Closure callback_; |
96 | 96 |
97 DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver); | 97 DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver); |
98 }; | 98 }; |
99 | 99 |
100 } // namespace | 100 } // namespace |
101 | 101 |
102 NodeController::PendingPortRequest::PendingPortRequest() {} | |
103 | |
104 NodeController::PendingPortRequest::~PendingPortRequest() {} | |
105 | |
106 NodeController::ReservedPort::ReservedPort() {} | |
107 | |
108 NodeController::ReservedPort::~ReservedPort() {} | |
109 | |
110 NodeController::PendingRemotePortConnection::PendingRemotePortConnection() {} | |
111 | |
112 NodeController::PendingRemotePortConnection::~PendingRemotePortConnection() {} | |
113 | |
114 NodeController::~NodeController() {} | 102 NodeController::~NodeController() {} |
115 | 103 |
116 NodeController::NodeController(Core* core) | 104 NodeController::NodeController(Core* core) |
117 : core_(core), | 105 : core_(core), |
118 name_(GetRandomNodeName()), | 106 name_(GetRandomNodeName()), |
119 node_(new ports::Node(name_, this)) { | 107 node_(new ports::Node(name_, this)) { |
120 DVLOG(1) << "Initializing node " << name_; | 108 DVLOG(1) << "Initializing node " << name_; |
121 } | 109 } |
122 | 110 |
123 void NodeController::SetIOTaskRunner( | 111 void NodeController::SetIOTaskRunner( |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
175 if (rv != ports::OK) { | 163 if (rv != ports::OK) { |
176 DCHECK(ports_message); | 164 DCHECK(ports_message); |
177 message->reset(static_cast<PortsMessage*>(ports_message.release())); | 165 message->reset(static_cast<PortsMessage*>(ports_message.release())); |
178 } | 166 } |
179 | 167 |
180 AcceptIncomingMessages(); | 168 AcceptIncomingMessages(); |
181 return rv; | 169 return rv; |
182 } | 170 } |
183 | 171 |
184 void NodeController::ReservePort(const std::string& token, | 172 void NodeController::ReservePort(const std::string& token, |
185 const ReservePortCallback& callback) { | 173 const ports::PortRef& port) { |
186 ports::PortRef port; | |
187 node_->CreateUninitializedPort(&port); | |
188 | |
189 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " | 174 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " |
190 << token; | 175 << token; |
191 | 176 |
192 base::AutoLock lock(reserved_ports_lock_); | 177 base::AutoLock lock(reserved_ports_lock_); |
193 ReservedPort reservation; | 178 auto result = reserved_ports_.insert(std::make_pair(token, port)); |
194 reservation.local_port = port; | 179 DCHECK(result.second); |
195 reservation.callback = callback; | 180 } |
196 reserved_ports_.insert(std::make_pair(token, reservation)); | 181 |
| 182 void NodeController::MergePortIntoParent(const std::string& token, |
| 183 const ports::PortRef& port) { |
| 184 scoped_refptr<NodeChannel> parent = GetParentChannel(); |
| 185 if (parent) { |
| 186 parent->RequestPortMerge(port.name(), token); |
| 187 return; |
| 188 } |
| 189 |
| 190 base::AutoLock lock(pending_port_merges_lock_); |
| 191 pending_port_merges_.push_back(std::make_pair(token, port)); |
197 } | 192 } |
198 | 193 |
199 scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer( | 194 scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer( |
200 size_t num_bytes) { | 195 size_t num_bytes) { |
201 scoped_refptr<PlatformSharedBuffer> buffer = | 196 scoped_refptr<PlatformSharedBuffer> buffer = |
202 internal::g_platform_support->CreateSharedBuffer(num_bytes); | 197 internal::g_platform_support->CreateSharedBuffer(num_bytes); |
203 #if defined(OS_POSIX) | 198 #if defined(OS_POSIX) |
204 if (!buffer && broker_) { | 199 if (!buffer && broker_) { |
205 // On POSIX, creating a shared buffer in a sandboxed process will fail, so | 200 // On POSIX, creating a shared buffer in a sandboxed process will fail, so |
206 // fall back to the broker if there is one. | 201 // fall back to the broker if there is one. |
207 buffer = broker_->GetSharedBuffer(num_bytes); | 202 buffer = broker_->GetSharedBuffer(num_bytes); |
208 } | 203 } |
209 #endif | 204 #endif |
210 return buffer; | 205 return buffer; |
211 } | 206 } |
212 | 207 |
213 void NodeController::ConnectToParentPort(const ports::PortRef& local_port, | |
214 const std::string& token, | |
215 const base::Closure& callback) { | |
216 io_task_runner_->PostTask( | |
217 FROM_HERE, | |
218 base::Bind(&NodeController::RequestParentPortConnectionOnIOThread, | |
219 base::Unretained(this), local_port, token, callback)); | |
220 } | |
221 | |
222 void NodeController::ConnectToRemotePort( | |
223 const ports::PortRef& local_port, | |
224 const ports::NodeName& remote_node_name, | |
225 const ports::PortName& remote_port_name, | |
226 const base::Closure& callback) { | |
227 if (remote_node_name == name_) { | |
228 // It's possible that two different code paths on the node are trying to | |
229 // bootstrap ports to each other (e.g. in Chrome single-process mode) | |
230 // without being aware of the fact. In this case we can initialize the port | |
231 // immediately (which can fail silently if it's already been initialized by | |
232 // the request on the other side), and invoke |callback|. | |
233 node_->InitializePort(local_port, name_, remote_port_name); | |
234 callback.Run(); | |
235 return; | |
236 } | |
237 | |
238 PendingRemotePortConnection connection; | |
239 connection.local_port = local_port; | |
240 connection.remote_node_name = remote_node_name; | |
241 connection.remote_port_name = remote_port_name; | |
242 connection.callback = callback; | |
243 io_task_runner_->PostTask( | |
244 FROM_HERE, | |
245 base::Bind(&NodeController::ConnectToRemotePortOnIOThread, | |
246 base::Unretained(this), connection)); | |
247 } | |
248 | |
249 void NodeController::RequestShutdown(const base::Closure& callback) { | 208 void NodeController::RequestShutdown(const base::Closure& callback) { |
250 { | 209 { |
251 base::AutoLock lock(shutdown_lock_); | 210 base::AutoLock lock(shutdown_lock_); |
252 shutdown_callback_ = callback; | 211 shutdown_callback_ = callback; |
253 } | 212 } |
254 | 213 |
255 AttemptShutdownIfRequested(); | 214 AttemptShutdownIfRequested(); |
256 } | 215 } |
257 | 216 |
258 void NodeController::ConnectToChildOnIOThread( | 217 void NodeController::ConnectToChildOnIOThread( |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
297 DCHECK(parent_name_ == ports::kInvalidNodeName); | 256 DCHECK(parent_name_ == ports::kInvalidNodeName); |
298 | 257 |
299 // At this point we don't know the parent's name, so we can't yet insert it | 258 // At this point we don't know the parent's name, so we can't yet insert it |
300 // into our |peers_| map. That will happen as soon as we receive an | 259 // into our |peers_| map. That will happen as soon as we receive an |
301 // AcceptChild message from them. | 260 // AcceptChild message from them. |
302 bootstrap_parent_channel_ = | 261 bootstrap_parent_channel_ = |
303 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_); | 262 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_); |
304 bootstrap_parent_channel_->Start(); | 263 bootstrap_parent_channel_->Start(); |
305 } | 264 } |
306 | 265 |
307 void NodeController::RequestParentPortConnectionOnIOThread( | |
308 const ports::PortRef& local_port, | |
309 const std::string& token, | |
310 const base::Closure& callback) { | |
311 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | |
312 | |
313 scoped_refptr<NodeChannel> parent = GetParentChannel(); | |
314 if (!parent) { | |
315 PendingPortRequest request; | |
316 request.token = token; | |
317 request.local_port = local_port; | |
318 request.callback = callback; | |
319 pending_port_requests_.push_back(request); | |
320 return; | |
321 } | |
322 | |
323 pending_parent_port_connections_.insert( | |
324 std::make_pair(local_port.name(), callback)); | |
325 parent->RequestPortConnection(local_port.name(), token); | |
326 } | |
327 | |
328 void NodeController::ConnectToRemotePortOnIOThread( | |
329 const PendingRemotePortConnection& connection) { | |
330 scoped_refptr<NodeChannel> peer = GetPeerChannel(connection.remote_node_name); | |
331 if (peer) { | |
332 // It's safe to initialize the port since we already have a channel to its | |
333 // peer. No need to actually send them a message. | |
334 int rv = node_->InitializePort(connection.local_port, | |
335 connection.remote_node_name, | |
336 connection.remote_port_name); | |
337 DCHECK_EQ(rv, ports::OK); | |
338 connection.callback.Run(); | |
339 return; | |
340 } | |
341 | |
342 // Save this for later. We'll initialize the port once this peer is added. | |
343 pending_remote_port_connections_[connection.remote_node_name].push_back( | |
344 connection); | |
345 } | |
346 | |
347 scoped_refptr<NodeChannel> NodeController::GetPeerChannel( | 266 scoped_refptr<NodeChannel> NodeController::GetPeerChannel( |
348 const ports::NodeName& name) { | 267 const ports::NodeName& name) { |
349 base::AutoLock lock(peers_lock_); | 268 base::AutoLock lock(peers_lock_); |
350 auto it = peers_.find(name); | 269 auto it = peers_.find(name); |
351 if (it == peers_.end()) | 270 if (it == peers_.end()) |
352 return nullptr; | 271 return nullptr; |
353 return it->second; | 272 return it->second; |
354 } | 273 } |
355 | 274 |
356 scoped_refptr<NodeChannel> NodeController::GetParentChannel() { | 275 scoped_refptr<NodeChannel> NodeController::GetParentChannel() { |
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
407 } | 326 } |
408 | 327 |
409 if (start_channel) | 328 if (start_channel) |
410 channel->Start(); | 329 channel->Start(); |
411 | 330 |
412 // Flush any queued message we need to deliver to this node. | 331 // Flush any queued message we need to deliver to this node. |
413 while (!pending_messages.empty()) { | 332 while (!pending_messages.empty()) { |
414 channel->PortsMessage(std::move(pending_messages.front())); | 333 channel->PortsMessage(std::move(pending_messages.front())); |
415 pending_messages.pop(); | 334 pending_messages.pop(); |
416 } | 335 } |
417 | |
418 // Complete any pending port connections to this peer. | |
419 auto connections_it = pending_remote_port_connections_.find(name); | |
420 if (connections_it != pending_remote_port_connections_.end()) { | |
421 for (const auto& connection : connections_it->second) { | |
422 int rv = node_->InitializePort(connection.local_port, | |
423 connection.remote_node_name, | |
424 connection.remote_port_name); | |
425 DCHECK_EQ(rv, ports::OK); | |
426 connection.callback.Run(); | |
427 } | |
428 pending_remote_port_connections_.erase(connections_it); | |
429 } | |
430 } | 336 } |
431 | 337 |
432 void NodeController::DropPeer(const ports::NodeName& name) { | 338 void NodeController::DropPeer(const ports::NodeName& name) { |
433 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 339 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
434 | 340 |
435 { | 341 { |
436 base::AutoLock lock(peers_lock_); | 342 base::AutoLock lock(peers_lock_); |
437 auto it = peers_.find(name); | 343 auto it = peers_.find(name); |
438 | 344 |
439 if (it != peers_.end()) { | 345 if (it != peers_.end()) { |
(...skipping 169 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
609 parent_name_ = parent_name; | 515 parent_name_ = parent_name; |
610 parent = bootstrap_parent_channel_; | 516 parent = bootstrap_parent_channel_; |
611 } | 517 } |
612 | 518 |
613 parent->SetRemoteNodeName(parent_name); | 519 parent->SetRemoteNodeName(parent_name); |
614 parent->AcceptParent(token, name_); | 520 parent->AcceptParent(token, name_); |
615 | 521 |
616 // NOTE: The child does not actually add its parent as a peer until | 522 // NOTE: The child does not actually add its parent as a peer until |
617 // receiving an AcceptBrokerClient message from the broker. The parent | 523 // receiving an AcceptBrokerClient message from the broker. The parent |
618 // will request that said message be sent upon receiving AcceptParent. | 524 // will request that said message be sent upon receiving AcceptParent. |
| 525 |
| 526 DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name; |
619 } | 527 } |
620 | 528 |
621 void NodeController::OnAcceptParent(const ports::NodeName& from_node, | 529 void NodeController::OnAcceptParent(const ports::NodeName& from_node, |
622 const ports::NodeName& token, | 530 const ports::NodeName& token, |
623 const ports::NodeName& child_name) { | 531 const ports::NodeName& child_name) { |
624 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 532 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
625 | 533 |
626 auto it = pending_children_.find(from_node); | 534 auto it = pending_children_.find(from_node); |
627 if (it == pending_children_.end() || token != from_node) { | 535 if (it == pending_children_.end() || token != from_node) { |
628 DLOG(ERROR) << "Received unexpected AcceptParent message from " | 536 DLOG(ERROR) << "Received unexpected AcceptParent message from " |
(...skipping 127 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
756 scoped_refptr<NodeChannel> broker; | 664 scoped_refptr<NodeChannel> broker; |
757 if (broker_name == parent_name) { | 665 if (broker_name == parent_name) { |
758 DCHECK(!broker_channel.is_valid()); | 666 DCHECK(!broker_channel.is_valid()); |
759 broker = parent; | 667 broker = parent; |
760 } else { | 668 } else { |
761 DCHECK(broker_channel.is_valid()); | 669 DCHECK(broker_channel.is_valid()); |
762 broker = NodeChannel::Create(this, std::move(broker_channel), | 670 broker = NodeChannel::Create(this, std::move(broker_channel), |
763 io_task_runner_); | 671 io_task_runner_); |
764 AddPeer(broker_name, broker, true /* start_channel */); | 672 AddPeer(broker_name, broker, true /* start_channel */); |
765 } | 673 } |
| 674 |
766 AddPeer(parent_name, parent, false /* start_channel */); | 675 AddPeer(parent_name, parent, false /* start_channel */); |
767 | 676 |
768 // Resolve any pending port connections to the parent. | 677 { |
769 for (const auto& request : pending_port_requests_) { | 678 // Complete any port merge requests we have waiting for the parent. |
770 pending_parent_port_connections_.insert( | 679 base::AutoLock lock(pending_port_merges_lock_); |
771 std::make_pair(request.local_port.name(), request.callback)); | 680 for (const auto& request : pending_port_merges_) |
772 parent->RequestPortConnection(request.local_port.name(), request.token); | 681 parent->RequestPortMerge(request.second.name(), request.first); |
| 682 pending_port_merges_.clear(); |
773 } | 683 } |
774 pending_port_requests_.clear(); | |
775 | 684 |
776 // Feed the broker any pending children of our own. | 685 // Feed the broker any pending children of our own. |
777 while (!pending_broker_clients.empty()) { | 686 while (!pending_broker_clients.empty()) { |
778 const ports::NodeName& child_name = pending_broker_clients.front(); | 687 const ports::NodeName& child_name = pending_broker_clients.front(); |
779 auto it = pending_children_.find(child_name); | 688 auto it = pending_children_.find(child_name); |
780 DCHECK(it != pending_children_.end()); | 689 DCHECK(it != pending_children_.end()); |
781 broker->AddBrokerClient(child_name, it->second->CopyRemoteProcessHandle()); | 690 broker->AddBrokerClient(child_name, it->second->CopyRemoteProcessHandle()); |
782 pending_broker_clients.pop(); | 691 pending_broker_clients.pop(); |
783 } | 692 } |
784 | 693 |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
817 new PortsMessage(num_header_bytes, | 726 new PortsMessage(num_header_bytes, |
818 num_payload_bytes, | 727 num_payload_bytes, |
819 num_ports_bytes, | 728 num_ports_bytes, |
820 std::move(channel_message))); | 729 std::move(channel_message))); |
821 | 730 |
822 node_->AcceptMessage(std::move(message)); | 731 node_->AcceptMessage(std::move(message)); |
823 AcceptIncomingMessages(); | 732 AcceptIncomingMessages(); |
824 AttemptShutdownIfRequested(); | 733 AttemptShutdownIfRequested(); |
825 } | 734 } |
826 | 735 |
827 void NodeController::OnRequestPortConnection( | 736 void NodeController::OnRequestPortMerge( |
828 const ports::NodeName& from_node, | 737 const ports::NodeName& from_node, |
829 const ports::PortName& connector_port_name, | 738 const ports::PortName& connector_port_name, |
830 const std::string& token) { | 739 const std::string& token) { |
831 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 740 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
832 | 741 |
833 DVLOG(2) << "Node " << name_ << " received RequestPortConnection for token " | 742 DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token " |
834 << token << " and port " << connector_port_name << "@" << from_node; | 743 << token << " and port " << connector_port_name << "@" << from_node; |
835 | 744 |
836 ReservePortCallback callback; | |
837 ports::PortRef local_port; | 745 ports::PortRef local_port; |
838 { | 746 { |
839 base::AutoLock lock(reserved_ports_lock_); | 747 base::AutoLock lock(reserved_ports_lock_); |
840 auto it = reserved_ports_.find(token); | 748 auto it = reserved_ports_.find(token); |
841 if (it == reserved_ports_.end()) { | 749 if (it == reserved_ports_.end()) { |
842 DVLOG(1) << "Ignoring request to connect to port for unknown token " | 750 DVLOG(1) << "Ignoring request to connect to port for unknown token " |
843 << token; | 751 << token; |
844 return; | 752 return; |
845 } | 753 } |
846 local_port = it->second.local_port; | 754 local_port = it->second; |
847 callback = it->second.callback; | |
848 reserved_ports_.erase(it); | |
849 } | 755 } |
850 | 756 |
851 DCHECK(!callback.is_null()); | 757 int rv = node_->MergePorts(local_port, from_node, connector_port_name); |
852 | 758 if (rv != ports::OK) |
853 scoped_refptr<NodeChannel> peer = GetPeerChannel(from_node); | 759 DLOG(ERROR) << "MergePorts failed: " << rv; |
854 if (!peer) { | |
855 DVLOG(1) << "Ignoring request to connect to port from unknown node " | |
856 << from_node; | |
857 return; | |
858 } | |
859 | |
860 // This reserved port should not have been initialized yet. | |
861 CHECK_EQ(ports::OK, node_->InitializePort(local_port, from_node, | |
862 connector_port_name)); | |
863 | |
864 peer->ConnectToPort(local_port.name(), connector_port_name); | |
865 callback.Run(local_port); | |
866 } | |
867 | |
868 void NodeController::OnConnectToPort( | |
869 const ports::NodeName& from_node, | |
870 const ports::PortName& connector_port_name, | |
871 const ports::PortName& connectee_port_name) { | |
872 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | |
873 | |
874 DVLOG(2) << "Node " << name_ << " received ConnectToPort for local port " | |
875 << connectee_port_name << " to port " << connector_port_name << "@" | |
876 << from_node; | |
877 | |
878 ports::PortRef connectee_port; | |
879 int rv = node_->GetPort(connectee_port_name, &connectee_port); | |
880 if (rv != ports::OK) { | |
881 DLOG(ERROR) << "Ignoring ConnectToPort for unknown port " | |
882 << connectee_port_name; | |
883 return; | |
884 } | |
885 | |
886 // It's OK if this port has already been initialized. This message is only | |
887 // sent by the remote peer to ensure the port is ready before it starts | |
888 // us sending messages to it. | |
889 ports::PortStatus port_status; | |
890 rv = node_->GetStatus(connectee_port, &port_status); | |
891 if (rv == ports::OK) { | |
892 DVLOG(1) << "Ignoring ConnectToPort for already-initialized port " | |
893 << connectee_port_name; | |
894 return; | |
895 } | |
896 | |
897 CHECK_EQ(ports::OK, node_->InitializePort(connectee_port, from_node, | |
898 connector_port_name)); | |
899 | |
900 auto it = pending_parent_port_connections_.find(connectee_port_name); | |
901 DCHECK(it != pending_parent_port_connections_.end()); | |
902 it->second.Run(); | |
903 pending_parent_port_connections_.erase(it); | |
904 } | 760 } |
905 | 761 |
906 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, | 762 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, |
907 const ports::NodeName& name) { | 763 const ports::NodeName& name) { |
908 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 764 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
909 | 765 |
910 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node); | 766 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node); |
911 if (from_node == name || name == ports::kInvalidNodeName || !requestor) { | 767 if (from_node == name || name == ports::kInvalidNodeName || !requestor) { |
912 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from " | 768 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from " |
913 << from_node; | 769 << from_node; |
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1015 shutdown_callback_.Reset(); | 871 shutdown_callback_.Reset(); |
1016 } | 872 } |
1017 | 873 |
1018 DCHECK(!callback.is_null()); | 874 DCHECK(!callback.is_null()); |
1019 | 875 |
1020 callback.Run(); | 876 callback.Run(); |
1021 } | 877 } |
1022 | 878 |
1023 } // namespace edk | 879 } // namespace edk |
1024 } // namespace mojo | 880 } // namespace mojo |
OLD | NEW |