Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(101)

Side by Side Diff: mojo/edk/system/node_controller.cc

Issue 1675603002: [mojo-edk] Simplify multiprocess pipe bootstrap (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: fix some callers to work with sync APIs Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/system/ports/event.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/node_controller.h" 5 #include "mojo/edk/system/node_controller.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <limits> 8 #include <limits>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after
92 delete this; 92 delete this;
93 } 93 }
94 94
95 const base::Closure callback_; 95 const base::Closure callback_;
96 96
97 DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver); 97 DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver);
98 }; 98 };
99 99
100 } // namespace 100 } // namespace
101 101
102 NodeController::PendingPortRequest::PendingPortRequest() {}
103
104 NodeController::PendingPortRequest::~PendingPortRequest() {}
105
106 NodeController::ReservedPort::ReservedPort() {}
107
108 NodeController::ReservedPort::~ReservedPort() {}
109
110 NodeController::PendingRemotePortConnection::PendingRemotePortConnection() {}
111
112 NodeController::PendingRemotePortConnection::~PendingRemotePortConnection() {}
113
114 NodeController::~NodeController() {} 102 NodeController::~NodeController() {}
115 103
116 NodeController::NodeController(Core* core) 104 NodeController::NodeController(Core* core)
117 : core_(core), 105 : core_(core),
118 name_(GetRandomNodeName()), 106 name_(GetRandomNodeName()),
119 node_(new ports::Node(name_, this)) { 107 node_(new ports::Node(name_, this)) {
120 DVLOG(1) << "Initializing node " << name_; 108 DVLOG(1) << "Initializing node " << name_;
121 } 109 }
122 110
123 void NodeController::SetIOTaskRunner( 111 void NodeController::SetIOTaskRunner(
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
175 if (rv != ports::OK) { 163 if (rv != ports::OK) {
176 DCHECK(ports_message); 164 DCHECK(ports_message);
177 message->reset(static_cast<PortsMessage*>(ports_message.release())); 165 message->reset(static_cast<PortsMessage*>(ports_message.release()));
178 } 166 }
179 167
180 AcceptIncomingMessages(); 168 AcceptIncomingMessages();
181 return rv; 169 return rv;
182 } 170 }
183 171
184 void NodeController::ReservePort(const std::string& token, 172 void NodeController::ReservePort(const std::string& token,
185 const ReservePortCallback& callback) { 173 const ports::PortRef& port) {
186 ports::PortRef port;
187 node_->CreateUninitializedPort(&port);
188
189 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token " 174 DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
190 << token; 175 << token;
191 176
192 base::AutoLock lock(reserved_ports_lock_); 177 base::AutoLock lock(reserved_ports_lock_);
193 ReservedPort reservation; 178 auto result = reserved_ports_.insert(std::make_pair(token, port));
194 reservation.local_port = port; 179 DCHECK(result.second);
195 reservation.callback = callback; 180 }
196 reserved_ports_.insert(std::make_pair(token, reservation)); 181
182 void NodeController::MergePortIntoParent(const std::string& token,
183 const ports::PortRef& port) {
184 scoped_refptr<NodeChannel> parent = GetParentChannel();
185 if (parent) {
186 parent->RequestPortMerge(port.name(), token);
187 return;
188 }
189
190 base::AutoLock lock(pending_port_merges_lock_);
191 pending_port_merges_.push_back(std::make_pair(token, port));
197 } 192 }
198 193
199 scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer( 194 scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
200 size_t num_bytes) { 195 size_t num_bytes) {
201 scoped_refptr<PlatformSharedBuffer> buffer = 196 scoped_refptr<PlatformSharedBuffer> buffer =
202 internal::g_platform_support->CreateSharedBuffer(num_bytes); 197 internal::g_platform_support->CreateSharedBuffer(num_bytes);
203 #if defined(OS_POSIX) 198 #if defined(OS_POSIX)
204 if (!buffer && broker_) { 199 if (!buffer && broker_) {
205 // On POSIX, creating a shared buffer in a sandboxed process will fail, so 200 // On POSIX, creating a shared buffer in a sandboxed process will fail, so
206 // fall back to the broker if there is one. 201 // fall back to the broker if there is one.
207 buffer = broker_->GetSharedBuffer(num_bytes); 202 buffer = broker_->GetSharedBuffer(num_bytes);
208 } 203 }
209 #endif 204 #endif
210 return buffer; 205 return buffer;
211 } 206 }
212 207
213 void NodeController::ConnectToParentPort(const ports::PortRef& local_port,
214 const std::string& token,
215 const base::Closure& callback) {
216 io_task_runner_->PostTask(
217 FROM_HERE,
218 base::Bind(&NodeController::RequestParentPortConnectionOnIOThread,
219 base::Unretained(this), local_port, token, callback));
220 }
221
222 void NodeController::ConnectToRemotePort(
223 const ports::PortRef& local_port,
224 const ports::NodeName& remote_node_name,
225 const ports::PortName& remote_port_name,
226 const base::Closure& callback) {
227 if (remote_node_name == name_) {
228 // It's possible that two different code paths on the node are trying to
229 // bootstrap ports to each other (e.g. in Chrome single-process mode)
230 // without being aware of the fact. In this case we can initialize the port
231 // immediately (which can fail silently if it's already been initialized by
232 // the request on the other side), and invoke |callback|.
233 node_->InitializePort(local_port, name_, remote_port_name);
234 callback.Run();
235 return;
236 }
237
238 PendingRemotePortConnection connection;
239 connection.local_port = local_port;
240 connection.remote_node_name = remote_node_name;
241 connection.remote_port_name = remote_port_name;
242 connection.callback = callback;
243 io_task_runner_->PostTask(
244 FROM_HERE,
245 base::Bind(&NodeController::ConnectToRemotePortOnIOThread,
246 base::Unretained(this), connection));
247 }
248
249 void NodeController::RequestShutdown(const base::Closure& callback) { 208 void NodeController::RequestShutdown(const base::Closure& callback) {
250 { 209 {
251 base::AutoLock lock(shutdown_lock_); 210 base::AutoLock lock(shutdown_lock_);
252 shutdown_callback_ = callback; 211 shutdown_callback_ = callback;
253 } 212 }
254 213
255 AttemptShutdownIfRequested(); 214 AttemptShutdownIfRequested();
256 } 215 }
257 216
258 void NodeController::ConnectToChildOnIOThread( 217 void NodeController::ConnectToChildOnIOThread(
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
297 DCHECK(parent_name_ == ports::kInvalidNodeName); 256 DCHECK(parent_name_ == ports::kInvalidNodeName);
298 257
299 // At this point we don't know the parent's name, so we can't yet insert it 258 // At this point we don't know the parent's name, so we can't yet insert it
300 // into our |peers_| map. That will happen as soon as we receive an 259 // into our |peers_| map. That will happen as soon as we receive an
301 // AcceptChild message from them. 260 // AcceptChild message from them.
302 bootstrap_parent_channel_ = 261 bootstrap_parent_channel_ =
303 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_); 262 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_);
304 bootstrap_parent_channel_->Start(); 263 bootstrap_parent_channel_->Start();
305 } 264 }
306 265
307 void NodeController::RequestParentPortConnectionOnIOThread(
308 const ports::PortRef& local_port,
309 const std::string& token,
310 const base::Closure& callback) {
311 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
312
313 scoped_refptr<NodeChannel> parent = GetParentChannel();
314 if (!parent) {
315 PendingPortRequest request;
316 request.token = token;
317 request.local_port = local_port;
318 request.callback = callback;
319 pending_port_requests_.push_back(request);
320 return;
321 }
322
323 pending_parent_port_connections_.insert(
324 std::make_pair(local_port.name(), callback));
325 parent->RequestPortConnection(local_port.name(), token);
326 }
327
328 void NodeController::ConnectToRemotePortOnIOThread(
329 const PendingRemotePortConnection& connection) {
330 scoped_refptr<NodeChannel> peer = GetPeerChannel(connection.remote_node_name);
331 if (peer) {
332 // It's safe to initialize the port since we already have a channel to its
333 // peer. No need to actually send them a message.
334 int rv = node_->InitializePort(connection.local_port,
335 connection.remote_node_name,
336 connection.remote_port_name);
337 DCHECK_EQ(rv, ports::OK);
338 connection.callback.Run();
339 return;
340 }
341
342 // Save this for later. We'll initialize the port once this peer is added.
343 pending_remote_port_connections_[connection.remote_node_name].push_back(
344 connection);
345 }
346
347 scoped_refptr<NodeChannel> NodeController::GetPeerChannel( 266 scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
348 const ports::NodeName& name) { 267 const ports::NodeName& name) {
349 base::AutoLock lock(peers_lock_); 268 base::AutoLock lock(peers_lock_);
350 auto it = peers_.find(name); 269 auto it = peers_.find(name);
351 if (it == peers_.end()) 270 if (it == peers_.end())
352 return nullptr; 271 return nullptr;
353 return it->second; 272 return it->second;
354 } 273 }
355 274
356 scoped_refptr<NodeChannel> NodeController::GetParentChannel() { 275 scoped_refptr<NodeChannel> NodeController::GetParentChannel() {
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
407 } 326 }
408 327
409 if (start_channel) 328 if (start_channel)
410 channel->Start(); 329 channel->Start();
411 330
412 // Flush any queued message we need to deliver to this node. 331 // Flush any queued message we need to deliver to this node.
413 while (!pending_messages.empty()) { 332 while (!pending_messages.empty()) {
414 channel->PortsMessage(std::move(pending_messages.front())); 333 channel->PortsMessage(std::move(pending_messages.front()));
415 pending_messages.pop(); 334 pending_messages.pop();
416 } 335 }
417
418 // Complete any pending port connections to this peer.
419 auto connections_it = pending_remote_port_connections_.find(name);
420 if (connections_it != pending_remote_port_connections_.end()) {
421 for (const auto& connection : connections_it->second) {
422 int rv = node_->InitializePort(connection.local_port,
423 connection.remote_node_name,
424 connection.remote_port_name);
425 DCHECK_EQ(rv, ports::OK);
426 connection.callback.Run();
427 }
428 pending_remote_port_connections_.erase(connections_it);
429 }
430 } 336 }
431 337
432 void NodeController::DropPeer(const ports::NodeName& name) { 338 void NodeController::DropPeer(const ports::NodeName& name) {
433 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 339 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
434 340
435 { 341 {
436 base::AutoLock lock(peers_lock_); 342 base::AutoLock lock(peers_lock_);
437 auto it = peers_.find(name); 343 auto it = peers_.find(name);
438 344
439 if (it != peers_.end()) { 345 if (it != peers_.end()) {
(...skipping 169 matching lines...) Expand 10 before | Expand all | Expand 10 after
609 parent_name_ = parent_name; 515 parent_name_ = parent_name;
610 parent = bootstrap_parent_channel_; 516 parent = bootstrap_parent_channel_;
611 } 517 }
612 518
613 parent->SetRemoteNodeName(parent_name); 519 parent->SetRemoteNodeName(parent_name);
614 parent->AcceptParent(token, name_); 520 parent->AcceptParent(token, name_);
615 521
616 // NOTE: The child does not actually add its parent as a peer until 522 // NOTE: The child does not actually add its parent as a peer until
617 // receiving an AcceptBrokerClient message from the broker. The parent 523 // receiving an AcceptBrokerClient message from the broker. The parent
618 // will request that said message be sent upon receiving AcceptParent. 524 // will request that said message be sent upon receiving AcceptParent.
525
526 DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
619 } 527 }
620 528
621 void NodeController::OnAcceptParent(const ports::NodeName& from_node, 529 void NodeController::OnAcceptParent(const ports::NodeName& from_node,
622 const ports::NodeName& token, 530 const ports::NodeName& token,
623 const ports::NodeName& child_name) { 531 const ports::NodeName& child_name) {
624 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 532 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
625 533
626 auto it = pending_children_.find(from_node); 534 auto it = pending_children_.find(from_node);
627 if (it == pending_children_.end() || token != from_node) { 535 if (it == pending_children_.end() || token != from_node) {
628 DLOG(ERROR) << "Received unexpected AcceptParent message from " 536 DLOG(ERROR) << "Received unexpected AcceptParent message from "
(...skipping 127 matching lines...) Expand 10 before | Expand all | Expand 10 after
756 scoped_refptr<NodeChannel> broker; 664 scoped_refptr<NodeChannel> broker;
757 if (broker_name == parent_name) { 665 if (broker_name == parent_name) {
758 DCHECK(!broker_channel.is_valid()); 666 DCHECK(!broker_channel.is_valid());
759 broker = parent; 667 broker = parent;
760 } else { 668 } else {
761 DCHECK(broker_channel.is_valid()); 669 DCHECK(broker_channel.is_valid());
762 broker = NodeChannel::Create(this, std::move(broker_channel), 670 broker = NodeChannel::Create(this, std::move(broker_channel),
763 io_task_runner_); 671 io_task_runner_);
764 AddPeer(broker_name, broker, true /* start_channel */); 672 AddPeer(broker_name, broker, true /* start_channel */);
765 } 673 }
674
766 AddPeer(parent_name, parent, false /* start_channel */); 675 AddPeer(parent_name, parent, false /* start_channel */);
767 676
768 // Resolve any pending port connections to the parent. 677 {
769 for (const auto& request : pending_port_requests_) { 678 // Complete any port merge requests we have waiting for the parent.
770 pending_parent_port_connections_.insert( 679 base::AutoLock lock(pending_port_merges_lock_);
771 std::make_pair(request.local_port.name(), request.callback)); 680 for (const auto& request : pending_port_merges_)
772 parent->RequestPortConnection(request.local_port.name(), request.token); 681 parent->RequestPortMerge(request.second.name(), request.first);
682 pending_port_merges_.clear();
773 } 683 }
774 pending_port_requests_.clear();
775 684
776 // Feed the broker any pending children of our own. 685 // Feed the broker any pending children of our own.
777 while (!pending_broker_clients.empty()) { 686 while (!pending_broker_clients.empty()) {
778 const ports::NodeName& child_name = pending_broker_clients.front(); 687 const ports::NodeName& child_name = pending_broker_clients.front();
779 auto it = pending_children_.find(child_name); 688 auto it = pending_children_.find(child_name);
780 DCHECK(it != pending_children_.end()); 689 DCHECK(it != pending_children_.end());
781 broker->AddBrokerClient(child_name, it->second->CopyRemoteProcessHandle()); 690 broker->AddBrokerClient(child_name, it->second->CopyRemoteProcessHandle());
782 pending_broker_clients.pop(); 691 pending_broker_clients.pop();
783 } 692 }
784 693
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
817 new PortsMessage(num_header_bytes, 726 new PortsMessage(num_header_bytes,
818 num_payload_bytes, 727 num_payload_bytes,
819 num_ports_bytes, 728 num_ports_bytes,
820 std::move(channel_message))); 729 std::move(channel_message)));
821 730
822 node_->AcceptMessage(std::move(message)); 731 node_->AcceptMessage(std::move(message));
823 AcceptIncomingMessages(); 732 AcceptIncomingMessages();
824 AttemptShutdownIfRequested(); 733 AttemptShutdownIfRequested();
825 } 734 }
826 735
827 void NodeController::OnRequestPortConnection( 736 void NodeController::OnRequestPortMerge(
828 const ports::NodeName& from_node, 737 const ports::NodeName& from_node,
829 const ports::PortName& connector_port_name, 738 const ports::PortName& connector_port_name,
830 const std::string& token) { 739 const std::string& token) {
831 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 740 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
832 741
833 DVLOG(2) << "Node " << name_ << " received RequestPortConnection for token " 742 DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token "
834 << token << " and port " << connector_port_name << "@" << from_node; 743 << token << " and port " << connector_port_name << "@" << from_node;
835 744
836 ReservePortCallback callback;
837 ports::PortRef local_port; 745 ports::PortRef local_port;
838 { 746 {
839 base::AutoLock lock(reserved_ports_lock_); 747 base::AutoLock lock(reserved_ports_lock_);
840 auto it = reserved_ports_.find(token); 748 auto it = reserved_ports_.find(token);
841 if (it == reserved_ports_.end()) { 749 if (it == reserved_ports_.end()) {
842 DVLOG(1) << "Ignoring request to connect to port for unknown token " 750 DVLOG(1) << "Ignoring request to connect to port for unknown token "
843 << token; 751 << token;
844 return; 752 return;
845 } 753 }
846 local_port = it->second.local_port; 754 local_port = it->second;
847 callback = it->second.callback;
848 reserved_ports_.erase(it);
849 } 755 }
850 756
851 DCHECK(!callback.is_null()); 757 int rv = node_->MergePorts(local_port, from_node, connector_port_name);
852 758 if (rv != ports::OK)
853 scoped_refptr<NodeChannel> peer = GetPeerChannel(from_node); 759 DLOG(ERROR) << "MergePorts failed: " << rv;
854 if (!peer) {
855 DVLOG(1) << "Ignoring request to connect to port from unknown node "
856 << from_node;
857 return;
858 }
859
860 // This reserved port should not have been initialized yet.
861 CHECK_EQ(ports::OK, node_->InitializePort(local_port, from_node,
862 connector_port_name));
863
864 peer->ConnectToPort(local_port.name(), connector_port_name);
865 callback.Run(local_port);
866 }
867
868 void NodeController::OnConnectToPort(
869 const ports::NodeName& from_node,
870 const ports::PortName& connector_port_name,
871 const ports::PortName& connectee_port_name) {
872 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
873
874 DVLOG(2) << "Node " << name_ << " received ConnectToPort for local port "
875 << connectee_port_name << " to port " << connector_port_name << "@"
876 << from_node;
877
878 ports::PortRef connectee_port;
879 int rv = node_->GetPort(connectee_port_name, &connectee_port);
880 if (rv != ports::OK) {
881 DLOG(ERROR) << "Ignoring ConnectToPort for unknown port "
882 << connectee_port_name;
883 return;
884 }
885
886 // It's OK if this port has already been initialized. This message is only
887 // sent by the remote peer to ensure the port is ready before it starts
888 // us sending messages to it.
889 ports::PortStatus port_status;
890 rv = node_->GetStatus(connectee_port, &port_status);
891 if (rv == ports::OK) {
892 DVLOG(1) << "Ignoring ConnectToPort for already-initialized port "
893 << connectee_port_name;
894 return;
895 }
896
897 CHECK_EQ(ports::OK, node_->InitializePort(connectee_port, from_node,
898 connector_port_name));
899
900 auto it = pending_parent_port_connections_.find(connectee_port_name);
901 DCHECK(it != pending_parent_port_connections_.end());
902 it->second.Run();
903 pending_parent_port_connections_.erase(it);
904 } 760 }
905 761
906 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, 762 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
907 const ports::NodeName& name) { 763 const ports::NodeName& name) {
908 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 764 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
909 765
910 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node); 766 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node);
911 if (from_node == name || name == ports::kInvalidNodeName || !requestor) { 767 if (from_node == name || name == ports::kInvalidNodeName || !requestor) {
912 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from " 768 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from "
913 << from_node; 769 << from_node;
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
1015 shutdown_callback_.Reset(); 871 shutdown_callback_.Reset();
1016 } 872 }
1017 873
1018 DCHECK(!callback.is_null()); 874 DCHECK(!callback.is_null());
1019 875
1020 callback.Run(); 876 callback.Run();
1021 } 877 }
1022 878
1023 } // namespace edk 879 } // namespace edk
1024 } // namespace mojo 880 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/system/ports/event.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698