Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(158)

Side by Side Diff: mojo/edk/system/node_controller.cc

Issue 1678333003: Revert of [mojo-edk] Simplify multiprocess pipe bootstrap (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/system/ports/event.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/node_controller.h" 5 #include "mojo/edk/system/node_controller.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <limits> 8 #include <limits>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after
92 delete this; 92 delete this;
93 } 93 }
94 94
95 const base::Closure callback_; 95 const base::Closure callback_;
96 96
97 DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver); 97 DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver);
98 }; 98 };
99 99
100 } // namespace 100 } // namespace
101 101
102 NodeController::PendingPortRequest::PendingPortRequest() {}
103
104 NodeController::PendingPortRequest::~PendingPortRequest() {}
105
106 NodeController::ReservedPort::ReservedPort() {}
107
108 NodeController::ReservedPort::~ReservedPort() {}
109
110 NodeController::PendingRemotePortConnection::PendingRemotePortConnection() {}
111
112 NodeController::PendingRemotePortConnection::~PendingRemotePortConnection() {}
113
102 NodeController::~NodeController() {} 114 NodeController::~NodeController() {}
103 115
104 NodeController::NodeController(Core* core) 116 NodeController::NodeController(Core* core)
105 : core_(core), 117 : core_(core),
106 name_(GetRandomNodeName()), 118 name_(GetRandomNodeName()),
107 node_(new ports::Node(name_, this)) { 119 node_(new ports::Node(name_, this)) {
108 DVLOG(1) << "Initializing node " << name_; 120 DVLOG(1) << "Initializing node " << name_;
109 } 121 }
110 122
111 void NodeController::SetIOTaskRunner( 123 void NodeController::SetIOTaskRunner(
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
163 if (rv != ports::OK) { 175 if (rv != ports::OK) {
164 DCHECK(ports_message); 176 DCHECK(ports_message);
165 message->reset(static_cast<PortsMessage*>(ports_message.release())); 177 message->reset(static_cast<PortsMessage*>(ports_message.release()));
166 } 178 }
167 179
168 AcceptIncomingMessages(); 180 AcceptIncomingMessages();
169 return rv; 181 return rv;
170 } 182 }
171 183
172 void NodeController::ReservePort(const std::string& token, 184 void NodeController::ReservePort(const std::string& token,
173 const ports::PortRef& port) { 185 const ReservePortCallback& callback) {
186 ports::PortRef port;
187 node_->CreateUninitializedPort(&port);
188
174 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " 189 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
175 << token; 190 << token;
176 191
177 base::AutoLock lock(reserved_ports_lock_); 192 base::AutoLock lock(reserved_ports_lock_);
178 auto result = reserved_ports_.insert(std::make_pair(token, port)); 193 ReservedPort reservation;
179 DCHECK(result.second); 194 reservation.local_port = port;
180 } 195 reservation.callback = callback;
181 196 reserved_ports_.insert(std::make_pair(token, reservation));
182 void NodeController::MergePortIntoParent(const std::string& token,
183 const ports::PortRef& port) {
184 scoped_refptr<NodeChannel> parent = GetParentChannel();
185 if (parent) {
186 parent->RequestPortMerge(port.name(), token);
187 return;
188 }
189
190 base::AutoLock lock(pending_port_merges_lock_);
191 pending_port_merges_.push_back(std::make_pair(token, port));
192 } 197 }
193 198
194 scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer( 199 scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
195 size_t num_bytes) { 200 size_t num_bytes) {
196 scoped_refptr<PlatformSharedBuffer> buffer = 201 scoped_refptr<PlatformSharedBuffer> buffer =
197 internal::g_platform_support->CreateSharedBuffer(num_bytes); 202 internal::g_platform_support->CreateSharedBuffer(num_bytes);
198 #if defined(OS_POSIX) 203 #if defined(OS_POSIX)
199 if (!buffer && broker_) { 204 if (!buffer && broker_) {
200 // On POSIX, creating a shared buffer in a sandboxed process will fail, so 205 // On POSIX, creating a shared buffer in a sandboxed process will fail, so
201 // fall back to the broker if there is one. 206 // fall back to the broker if there is one.
202 buffer = broker_->GetSharedBuffer(num_bytes); 207 buffer = broker_->GetSharedBuffer(num_bytes);
203 } 208 }
204 #endif 209 #endif
205 return buffer; 210 return buffer;
206 } 211 }
207 212
213 void NodeController::ConnectToParentPort(const ports::PortRef& local_port,
214 const std::string& token,
215 const base::Closure& callback) {
216 io_task_runner_->PostTask(
217 FROM_HERE,
218 base::Bind(&NodeController::RequestParentPortConnectionOnIOThread,
219 base::Unretained(this), local_port, token, callback));
220 }
221
222 void NodeController::ConnectToRemotePort(
223 const ports::PortRef& local_port,
224 const ports::NodeName& remote_node_name,
225 const ports::PortName& remote_port_name,
226 const base::Closure& callback) {
227 if (remote_node_name == name_) {
228 // It's possible that two different code paths on the node are trying to
229 // bootstrap ports to each other (e.g. in Chrome single-process mode)
230 // without being aware of the fact. In this case we can initialize the port
231 // immediately (which can fail silently if it's already been initialized by
232 // the request on the other side), and invoke |callback|.
233 node_->InitializePort(local_port, name_, remote_port_name);
234 callback.Run();
235 return;
236 }
237
238 PendingRemotePortConnection connection;
239 connection.local_port = local_port;
240 connection.remote_node_name = remote_node_name;
241 connection.remote_port_name = remote_port_name;
242 connection.callback = callback;
243 io_task_runner_->PostTask(
244 FROM_HERE,
245 base::Bind(&NodeController::ConnectToRemotePortOnIOThread,
246 base::Unretained(this), connection));
247 }
248
208 void NodeController::RequestShutdown(const base::Closure& callback) { 249 void NodeController::RequestShutdown(const base::Closure& callback) {
209 { 250 {
210 base::AutoLock lock(shutdown_lock_); 251 base::AutoLock lock(shutdown_lock_);
211 shutdown_callback_ = callback; 252 shutdown_callback_ = callback;
212 } 253 }
213 254
214 AttemptShutdownIfRequested(); 255 AttemptShutdownIfRequested();
215 } 256 }
216 257
217 void NodeController::ConnectToChildOnIOThread( 258 void NodeController::ConnectToChildOnIOThread(
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
256 DCHECK(parent_name_ == ports::kInvalidNodeName); 297 DCHECK(parent_name_ == ports::kInvalidNodeName);
257 298
258 // At this point we don't know the parent's name, so we can't yet insert it 299 // At this point we don't know the parent's name, so we can't yet insert it
259 // into our |peers_| map. That will happen as soon as we receive an 300 // into our |peers_| map. That will happen as soon as we receive an
260 // AcceptChild message from them. 301 // AcceptChild message from them.
261 bootstrap_parent_channel_ = 302 bootstrap_parent_channel_ =
262 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_); 303 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_);
263 bootstrap_parent_channel_->Start(); 304 bootstrap_parent_channel_->Start();
264 } 305 }
265 306
307 void NodeController::RequestParentPortConnectionOnIOThread(
308 const ports::PortRef& local_port,
309 const std::string& token,
310 const base::Closure& callback) {
311 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
312
313 scoped_refptr<NodeChannel> parent = GetParentChannel();
314 if (!parent) {
315 PendingPortRequest request;
316 request.token = token;
317 request.local_port = local_port;
318 request.callback = callback;
319 pending_port_requests_.push_back(request);
320 return;
321 }
322
323 pending_parent_port_connections_.insert(
324 std::make_pair(local_port.name(), callback));
325 parent->RequestPortConnection(local_port.name(), token);
326 }
327
328 void NodeController::ConnectToRemotePortOnIOThread(
329 const PendingRemotePortConnection& connection) {
330 scoped_refptr<NodeChannel> peer = GetPeerChannel(connection.remote_node_name);
331 if (peer) {
332 // It's safe to initialize the port since we already have a channel to its
333 // peer. No need to actually send them a message.
334 int rv = node_->InitializePort(connection.local_port,
335 connection.remote_node_name,
336 connection.remote_port_name);
337 DCHECK_EQ(rv, ports::OK);
338 connection.callback.Run();
339 return;
340 }
341
342 // Save this for later. We'll initialize the port once this peer is added.
343 pending_remote_port_connections_[connection.remote_node_name].push_back(
344 connection);
345 }
346
266 scoped_refptr<NodeChannel> NodeController::GetPeerChannel( 347 scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
267 const ports::NodeName& name) { 348 const ports::NodeName& name) {
268 base::AutoLock lock(peers_lock_); 349 base::AutoLock lock(peers_lock_);
269 auto it = peers_.find(name); 350 auto it = peers_.find(name);
270 if (it == peers_.end()) 351 if (it == peers_.end())
271 return nullptr; 352 return nullptr;
272 return it->second; 353 return it->second;
273 } 354 }
274 355
275 scoped_refptr<NodeChannel> NodeController::GetParentChannel() { 356 scoped_refptr<NodeChannel> NodeController::GetParentChannel() {
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
326 } 407 }
327 408
328 if (start_channel) 409 if (start_channel)
329 channel->Start(); 410 channel->Start();
330 411
331 // Flush any queued message we need to deliver to this node. 412 // Flush any queued message we need to deliver to this node.
332 while (!pending_messages.empty()) { 413 while (!pending_messages.empty()) {
333 channel->PortsMessage(std::move(pending_messages.front())); 414 channel->PortsMessage(std::move(pending_messages.front()));
334 pending_messages.pop(); 415 pending_messages.pop();
335 } 416 }
417
418 // Complete any pending port connections to this peer.
419 auto connections_it = pending_remote_port_connections_.find(name);
420 if (connections_it != pending_remote_port_connections_.end()) {
421 for (const auto& connection : connections_it->second) {
422 int rv = node_->InitializePort(connection.local_port,
423 connection.remote_node_name,
424 connection.remote_port_name);
425 DCHECK_EQ(rv, ports::OK);
426 connection.callback.Run();
427 }
428 pending_remote_port_connections_.erase(connections_it);
429 }
336 } 430 }
337 431
338 void NodeController::DropPeer(const ports::NodeName& name) { 432 void NodeController::DropPeer(const ports::NodeName& name) {
339 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 433 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
340 434
341 { 435 {
342 base::AutoLock lock(peers_lock_); 436 base::AutoLock lock(peers_lock_);
343 auto it = peers_.find(name); 437 auto it = peers_.find(name);
344 438
345 if (it != peers_.end()) { 439 if (it != peers_.end()) {
(...skipping 169 matching lines...) Expand 10 before | Expand all | Expand 10 after
515 parent_name_ = parent_name; 609 parent_name_ = parent_name;
516 parent = bootstrap_parent_channel_; 610 parent = bootstrap_parent_channel_;
517 } 611 }
518 612
519 parent->SetRemoteNodeName(parent_name); 613 parent->SetRemoteNodeName(parent_name);
520 parent->AcceptParent(token, name_); 614 parent->AcceptParent(token, name_);
521 615
522 // NOTE: The child does not actually add its parent as a peer until 616 // NOTE: The child does not actually add its parent as a peer until
523 // receiving an AcceptBrokerClient message from the broker. The parent 617 // receiving an AcceptBrokerClient message from the broker. The parent
524 // will request that said message be sent upon receiving AcceptParent. 618 // will request that said message be sent upon receiving AcceptParent.
525
526 DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
527 } 619 }
528 620
529 void NodeController::OnAcceptParent(const ports::NodeName& from_node, 621 void NodeController::OnAcceptParent(const ports::NodeName& from_node,
530 const ports::NodeName& token, 622 const ports::NodeName& token,
531 const ports::NodeName& child_name) { 623 const ports::NodeName& child_name) {
532 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 624 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
533 625
534 auto it = pending_children_.find(from_node); 626 auto it = pending_children_.find(from_node);
535 if (it == pending_children_.end() || token != from_node) { 627 if (it == pending_children_.end() || token != from_node) {
536 DLOG(ERROR) << "Received unexpected AcceptParent message from " 628 DLOG(ERROR) << "Received unexpected AcceptParent message from "
(...skipping 127 matching lines...) Expand 10 before | Expand all | Expand 10 after
664 scoped_refptr<NodeChannel> broker; 756 scoped_refptr<NodeChannel> broker;
665 if (broker_name == parent_name) { 757 if (broker_name == parent_name) {
666 DCHECK(!broker_channel.is_valid()); 758 DCHECK(!broker_channel.is_valid());
667 broker = parent; 759 broker = parent;
668 } else { 760 } else {
669 DCHECK(broker_channel.is_valid()); 761 DCHECK(broker_channel.is_valid());
670 broker = NodeChannel::Create(this, std::move(broker_channel), 762 broker = NodeChannel::Create(this, std::move(broker_channel),
671 io_task_runner_); 763 io_task_runner_);
672 AddPeer(broker_name, broker, true /* start_channel */); 764 AddPeer(broker_name, broker, true /* start_channel */);
673 } 765 }
674
675 AddPeer(parent_name, parent, false /* start_channel */); 766 AddPeer(parent_name, parent, false /* start_channel */);
676 767
677 { 768 // Resolve any pending port connections to the parent.
678 // Complete any port merge requests we have waiting for the parent. 769 for (const auto& request : pending_port_requests_) {
679 base::AutoLock lock(pending_port_merges_lock_); 770 pending_parent_port_connections_.insert(
680 for (const auto& request : pending_port_merges_) 771 std::make_pair(request.local_port.name(), request.callback));
681 parent->RequestPortMerge(request.second.name(), request.first); 772 parent->RequestPortConnection(request.local_port.name(), request.token);
682 pending_port_merges_.clear();
683 } 773 }
774 pending_port_requests_.clear();
684 775
685 // Feed the broker any pending children of our own. 776 // Feed the broker any pending children of our own.
686 while (!pending_broker_clients.empty()) { 777 while (!pending_broker_clients.empty()) {
687 const ports::NodeName& child_name = pending_broker_clients.front(); 778 const ports::NodeName& child_name = pending_broker_clients.front();
688 auto it = pending_children_.find(child_name); 779 auto it = pending_children_.find(child_name);
689 DCHECK(it != pending_children_.end()); 780 DCHECK(it != pending_children_.end());
690 broker->AddBrokerClient(child_name, it->second->CopyRemoteProcessHandle()); 781 broker->AddBrokerClient(child_name, it->second->CopyRemoteProcessHandle());
691 pending_broker_clients.pop(); 782 pending_broker_clients.pop();
692 } 783 }
693 784
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
726 new PortsMessage(num_header_bytes, 817 new PortsMessage(num_header_bytes,
727 num_payload_bytes, 818 num_payload_bytes,
728 num_ports_bytes, 819 num_ports_bytes,
729 std::move(channel_message))); 820 std::move(channel_message)));
730 821
731 node_->AcceptMessage(std::move(message)); 822 node_->AcceptMessage(std::move(message));
732 AcceptIncomingMessages(); 823 AcceptIncomingMessages();
733 AttemptShutdownIfRequested(); 824 AttemptShutdownIfRequested();
734 } 825 }
735 826
736 void NodeController::OnRequestPortMerge( 827 void NodeController::OnRequestPortConnection(
737 const ports::NodeName& from_node, 828 const ports::NodeName& from_node,
738 const ports::PortName& connector_port_name, 829 const ports::PortName& connector_port_name,
739 const std::string& token) { 830 const std::string& token) {
740 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 831 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
741 832
742 DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token " 833 DVLOG(2) << "Node " << name_ << " received RequestPortConnection for token "
743 << token << " and port " << connector_port_name << "@" << from_node; 834 << token << " and port " << connector_port_name << "@" << from_node;
744 835
836 ReservePortCallback callback;
745 ports::PortRef local_port; 837 ports::PortRef local_port;
746 { 838 {
747 base::AutoLock lock(reserved_ports_lock_); 839 base::AutoLock lock(reserved_ports_lock_);
748 auto it = reserved_ports_.find(token); 840 auto it = reserved_ports_.find(token);
749 if (it == reserved_ports_.end()) { 841 if (it == reserved_ports_.end()) {
750 DVLOG(1) << "Ignoring request to connect to port for unknown token " 842 DVLOG(1) << "Ignoring request to connect to port for unknown token "
751 << token; 843 << token;
752 return; 844 return;
753 } 845 }
754 local_port = it->second; 846 local_port = it->second.local_port;
847 callback = it->second.callback;
848 reserved_ports_.erase(it);
755 } 849 }
756 850
757 int rv = node_->MergePorts(local_port, from_node, connector_port_name); 851 DCHECK(!callback.is_null());
758 if (rv != ports::OK) 852
759 DLOG(ERROR) << "MergePorts failed: " << rv; 853 scoped_refptr<NodeChannel> peer = GetPeerChannel(from_node);
854 if (!peer) {
855 DVLOG(1) << "Ignoring request to connect to port from unknown node "
856 << from_node;
857 return;
858 }
859
860 // This reserved port should not have been initialized yet.
861 CHECK_EQ(ports::OK, node_->InitializePort(local_port, from_node,
862 connector_port_name));
863
864 peer->ConnectToPort(local_port.name(), connector_port_name);
865 callback.Run(local_port);
866 }
867
868 void NodeController::OnConnectToPort(
869 const ports::NodeName& from_node,
870 const ports::PortName& connector_port_name,
871 const ports::PortName& connectee_port_name) {
872 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
873
874 DVLOG(2) << "Node " << name_ << " received ConnectToPort for local port "
875 << connectee_port_name << " to port " << connector_port_name << "@"
876 << from_node;
877
878 ports::PortRef connectee_port;
879 int rv = node_->GetPort(connectee_port_name, &connectee_port);
880 if (rv != ports::OK) {
881 DLOG(ERROR) << "Ignoring ConnectToPort for unknown port "
882 << connectee_port_name;
883 return;
884 }
885
886 // It's OK if this port has already been initialized. This message is only
887 // sent by the remote peer to ensure the port is ready before it starts
888 // us sending messages to it.
889 ports::PortStatus port_status;
890 rv = node_->GetStatus(connectee_port, &port_status);
891 if (rv == ports::OK) {
892 DVLOG(1) << "Ignoring ConnectToPort for already-initialized port "
893 << connectee_port_name;
894 return;
895 }
896
897 CHECK_EQ(ports::OK, node_->InitializePort(connectee_port, from_node,
898 connector_port_name));
899
900 auto it = pending_parent_port_connections_.find(connectee_port_name);
901 DCHECK(it != pending_parent_port_connections_.end());
902 it->second.Run();
903 pending_parent_port_connections_.erase(it);
760 } 904 }
761 905
762 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, 906 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
763 const ports::NodeName& name) { 907 const ports::NodeName& name) {
764 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 908 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
765 909
766 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node); 910 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node);
767 if (from_node == name || name == ports::kInvalidNodeName || !requestor) { 911 if (from_node == name || name == ports::kInvalidNodeName || !requestor) {
768 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from " 912 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from "
769 << from_node; 913 << from_node;
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
871 shutdown_callback_.Reset(); 1015 shutdown_callback_.Reset();
872 } 1016 }
873 1017
874 DCHECK(!callback.is_null()); 1018 DCHECK(!callback.is_null());
875 1019
876 callback.Run(); 1020 callback.Run();
877 } 1021 }
878 1022
879 } // namespace edk 1023 } // namespace edk
880 } // namespace mojo 1024 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/system/ports/event.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698