Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(175)

Side by Side Diff: mojo/edk/system/node_controller.cc

Issue 2043713004: Mojo: Add NotifyBadMessage API (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: no bindings Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/system/ports_message.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/node_controller.h" 5 #include "mojo/edk/system/node_controller.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <limits> 8 #include <limits>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
132 #endif 132 #endif
133 133
134 void NodeController::SetIOTaskRunner( 134 void NodeController::SetIOTaskRunner(
135 scoped_refptr<base::TaskRunner> task_runner) { 135 scoped_refptr<base::TaskRunner> task_runner) {
136 io_task_runner_ = task_runner; 136 io_task_runner_ = task_runner;
137 ThreadDestructionObserver::Create( 137 ThreadDestructionObserver::Create(
138 io_task_runner_, 138 io_task_runner_,
139 base::Bind(&NodeController::DropAllPeers, base::Unretained(this))); 139 base::Bind(&NodeController::DropAllPeers, base::Unretained(this)));
140 } 140 }
141 141
142 void NodeController::ConnectToChild(base::ProcessHandle process_handle, 142 void NodeController::ConnectToChild(
143 ScopedPlatformHandle platform_handle, 143 base::ProcessHandle process_handle,
144 const std::string& child_token) { 144 ScopedPlatformHandle platform_handle,
145 const std::string& child_token,
146 const ProcessErrorCallback& process_error_callback) {
145 // Generate the temporary remote node name here so that it can be associated 147 // Generate the temporary remote node name here so that it can be associated
146 // with the embedder's child_token. If an error occurs in the child process 148 // with the embedder's child_token. If an error occurs in the child process
147 // after it is launched, but before any reserved ports are connected, this can 149 // after it is launched, but before any reserved ports are connected, this can
148 // be used to clean up any dangling ports. 150 // be used to clean up any dangling ports.
149 ports::NodeName node_name; 151 ports::NodeName node_name;
150 GenerateRandomName(&node_name); 152 GenerateRandomName(&node_name);
151 153
152 { 154 {
153 base::AutoLock lock(reserved_ports_lock_); 155 base::AutoLock lock(reserved_ports_lock_);
154 bool inserted = pending_child_tokens_.insert( 156 bool inserted = pending_child_tokens_.insert(
155 std::make_pair(node_name, child_token)).second; 157 std::make_pair(node_name, child_token)).second;
156 DCHECK(inserted); 158 DCHECK(inserted);
157 } 159 }
158 160
159 io_task_runner_->PostTask( 161 io_task_runner_->PostTask(
160 FROM_HERE, 162 FROM_HERE,
161 base::Bind(&NodeController::ConnectToChildOnIOThread, 163 base::Bind(&NodeController::ConnectToChildOnIOThread,
162 base::Unretained(this), 164 base::Unretained(this),
163 process_handle, 165 process_handle,
164 base::Passed(&platform_handle), 166 base::Passed(&platform_handle),
165 node_name)); 167 node_name,
168 process_error_callback));
166 } 169 }
167 170
168 void NodeController::CloseChildPorts(const std::string& child_token) { 171 void NodeController::CloseChildPorts(const std::string& child_token) {
169 std::vector<ports::PortRef> ports_to_close; 172 std::vector<ports::PortRef> ports_to_close;
170 { 173 {
171 std::vector<std::string> port_tokens; 174 std::vector<std::string> port_tokens;
172 base::AutoLock lock(reserved_ports_lock_); 175 base::AutoLock lock(reserved_ports_lock_);
173 for (const auto& port : reserved_ports_) { 176 for (const auto& port : reserved_ports_) {
174 if (port.second.child_token == child_token) { 177 if (port.second.child_token == child_token) {
175 DVLOG(1) << "Closing reserved port " << port.second.port.name(); 178 DVLOG(1) << "Closing reserved port " << port.second.port.name();
(...skipping 127 matching lines...) Expand 10 before | Expand all | Expand 10 after
303 void NodeController::RequestShutdown(const base::Closure& callback) { 306 void NodeController::RequestShutdown(const base::Closure& callback) {
304 { 307 {
305 base::AutoLock lock(shutdown_lock_); 308 base::AutoLock lock(shutdown_lock_);
306 shutdown_callback_ = callback; 309 shutdown_callback_ = callback;
307 shutdown_callback_flag_.Set(true); 310 shutdown_callback_flag_.Set(true);
308 } 311 }
309 312
310 AttemptShutdownIfRequested(); 313 AttemptShutdownIfRequested();
311 } 314 }
312 315
316 void NodeController::NotifyBadMessageFrom(const ports::NodeName& source_node,
317 const std::string& error) {
318 scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node);
319 if (peer)
320 peer->NotifyBadMessage(error);
321 }
322
313 void NodeController::ConnectToChildOnIOThread( 323 void NodeController::ConnectToChildOnIOThread(
314 base::ProcessHandle process_handle, 324 base::ProcessHandle process_handle,
315 ScopedPlatformHandle platform_handle, 325 ScopedPlatformHandle platform_handle,
316 ports::NodeName token) { 326 ports::NodeName token,
327 const ProcessErrorCallback& process_error_callback) {
317 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 328 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
318 329
319 #if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL) 330 #if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL)
320 PlatformChannelPair node_channel; 331 PlatformChannelPair node_channel;
321 // BrokerHost owns itself. 332 // BrokerHost owns itself.
322 BrokerHost* broker_host = new BrokerHost(std::move(platform_handle)); 333 BrokerHost* broker_host = new BrokerHost(std::move(platform_handle));
323 broker_host->SendChannel(node_channel.PassClientHandle()); 334 broker_host->SendChannel(node_channel.PassClientHandle());
324 scoped_refptr<NodeChannel> channel = NodeChannel::Create( 335 scoped_refptr<NodeChannel> channel = NodeChannel::Create(
325 this, node_channel.PassServerHandle(), io_task_runner_); 336 this, node_channel.PassServerHandle(), io_task_runner_,
337 process_error_callback);
326 #else 338 #else
327 scoped_refptr<NodeChannel> channel = 339 scoped_refptr<NodeChannel> channel =
328 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_); 340 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_,
341 process_error_callback);
329 #endif 342 #endif
330 343
331 // We set up the child channel with a temporary name so it can be identified 344 // We set up the child channel with a temporary name so it can be identified
332 // as a pending child if it writes any messages to the channel. We may start 345 // as a pending child if it writes any messages to the channel. We may start
333 // receiving messages from it (though we shouldn't) as soon as Start() is 346 // receiving messages from it (though we shouldn't) as soon as Start() is
334 // called below. 347 // called below.
335 348
336 pending_children_.insert(std::make_pair(token, channel)); 349 pending_children_.insert(std::make_pair(token, channel));
337 RecordPendingChildCount(pending_children_.size()); 350 RecordPendingChildCount(pending_children_.size());
338 351
339 channel->SetRemoteNodeName(token); 352 channel->SetRemoteNodeName(token);
340 channel->SetRemoteProcessHandle(process_handle); 353 channel->SetRemoteProcessHandle(process_handle);
341 channel->Start(); 354 channel->Start();
342 355
343 channel->AcceptChild(name_, token); 356 channel->AcceptChild(name_, token);
344 } 357 }
345 358
346 void NodeController::ConnectToParentOnIOThread( 359 void NodeController::ConnectToParentOnIOThread(
347 ScopedPlatformHandle platform_handle) { 360 ScopedPlatformHandle platform_handle) {
348 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 361 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
349 362
350 { 363 {
351 base::AutoLock lock(parent_lock_); 364 base::AutoLock lock(parent_lock_);
352 DCHECK(parent_name_ == ports::kInvalidNodeName); 365 DCHECK(parent_name_ == ports::kInvalidNodeName);
353 366
354 // At this point we don't know the parent's name, so we can't yet insert it 367 // At this point we don't know the parent's name, so we can't yet insert it
355 // into our |peers_| map. That will happen as soon as we receive an 368 // into our |peers_| map. That will happen as soon as we receive an
356 // AcceptChild message from them. 369 // AcceptChild message from them.
357 bootstrap_parent_channel_ = 370 bootstrap_parent_channel_ =
358 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_); 371 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_,
372 ProcessErrorCallback());
359 } 373 }
360 bootstrap_parent_channel_->Start(); 374 bootstrap_parent_channel_->Start();
361 } 375 }
362 376
363 scoped_refptr<NodeChannel> NodeController::GetPeerChannel( 377 scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
364 const ports::NodeName& name) { 378 const ports::NodeName& name) {
365 base::AutoLock lock(peers_lock_); 379 base::AutoLock lock(peers_lock_);
366 auto it = peers_.find(name); 380 auto it = peers_.find(name);
367 if (it == peers_.end()) 381 if (it == peers_.end())
368 return nullptr; 382 return nullptr;
(...skipping 380 matching lines...) Expand 10 before | Expand all | Expand 10 after
749 } 763 }
750 764
751 if (GetPeerChannel(client_name)) { 765 if (GetPeerChannel(client_name)) {
752 DLOG(ERROR) << "Ignoring AddBrokerClient for known client."; 766 DLOG(ERROR) << "Ignoring AddBrokerClient for known client.";
753 DropPeer(from_node); 767 DropPeer(from_node);
754 return; 768 return;
755 } 769 }
756 770
757 PlatformChannelPair broker_channel; 771 PlatformChannelPair broker_channel;
758 scoped_refptr<NodeChannel> client = NodeChannel::Create( 772 scoped_refptr<NodeChannel> client = NodeChannel::Create(
759 this, broker_channel.PassServerHandle(), io_task_runner_); 773 this, broker_channel.PassServerHandle(), io_task_runner_,
774 ProcessErrorCallback());
760 775
761 #if defined(OS_WIN) 776 #if defined(OS_WIN)
762 // The broker must have a working handle to the client process in order to 777 // The broker must have a working handle to the client process in order to
763 // properly copy other handles to and from the client. 778 // properly copy other handles to and from the client.
764 if (!scoped_process_handle.is_valid()) { 779 if (!scoped_process_handle.is_valid()) {
765 DLOG(ERROR) << "Broker rejecting client with invalid process handle."; 780 DLOG(ERROR) << "Broker rejecting client with invalid process handle.";
766 return; 781 return;
767 } 782 }
768 client->SetRemoteProcessHandle(scoped_process_handle.release().handle); 783 client->SetRemoteProcessHandle(scoped_process_handle.release().handle);
769 #else 784 #else
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
826 841
827 // It's now possible to add both the broker and the parent as peers. 842 // It's now possible to add both the broker and the parent as peers.
828 // Note that the broker and parent may be the same node. 843 // Note that the broker and parent may be the same node.
829 scoped_refptr<NodeChannel> broker; 844 scoped_refptr<NodeChannel> broker;
830 if (broker_name == parent_name) { 845 if (broker_name == parent_name) {
831 DCHECK(!broker_channel.is_valid()); 846 DCHECK(!broker_channel.is_valid());
832 broker = parent; 847 broker = parent;
833 } else { 848 } else {
834 DCHECK(broker_channel.is_valid()); 849 DCHECK(broker_channel.is_valid());
835 broker = NodeChannel::Create(this, std::move(broker_channel), 850 broker = NodeChannel::Create(this, std::move(broker_channel),
836 io_task_runner_); 851 io_task_runner_, ProcessErrorCallback());
837 AddPeer(broker_name, broker, true /* start_channel */); 852 AddPeer(broker_name, broker, true /* start_channel */);
838 } 853 }
839 854
840 AddPeer(parent_name, parent, false /* start_channel */); 855 AddPeer(parent_name, parent, false /* start_channel */);
841 856
842 { 857 {
843 // Complete any port merge requests we have waiting for the parent. 858 // Complete any port merge requests we have waiting for the parent.
844 base::AutoLock lock(pending_port_merges_lock_); 859 base::AutoLock lock(pending_port_merges_lock_);
845 for (const auto& request : pending_port_merges_) 860 for (const auto& request : pending_port_merges_)
846 parent->RequestPortMerge(request.second.name(), request.first); 861 parent->RequestPortMerge(request.second.name(), request.first);
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
888 if (!ports::Message::Parse(data, 903 if (!ports::Message::Parse(data,
889 num_data_bytes, 904 num_data_bytes,
890 &num_header_bytes, 905 &num_header_bytes,
891 &num_payload_bytes, 906 &num_payload_bytes,
892 &num_ports_bytes)) { 907 &num_ports_bytes)) {
893 DropPeer(from_node); 908 DropPeer(from_node);
894 return; 909 return;
895 } 910 }
896 911
897 CHECK(channel_message); 912 CHECK(channel_message);
898 ports::ScopedMessage message( 913 std::unique_ptr<PortsMessage> ports_message(
899 new PortsMessage(num_header_bytes, 914 new PortsMessage(num_header_bytes,
900 num_payload_bytes, 915 num_payload_bytes,
901 num_ports_bytes, 916 num_ports_bytes,
902 std::move(channel_message))); 917 std::move(channel_message)));
903 918 ports_message->set_source_node(from_node);
904 node_->AcceptMessage(std::move(message)); 919 node_->AcceptMessage(ports::ScopedMessage(ports_message.release()));
905 AcceptIncomingMessages(); 920 AcceptIncomingMessages();
906 } 921 }
907 922
908 void NodeController::OnRequestPortMerge( 923 void NodeController::OnRequestPortMerge(
909 const ports::NodeName& from_node, 924 const ports::NodeName& from_node,
910 const ports::PortName& connector_port_name, 925 const ports::PortName& connector_port_name,
911 const std::string& token) { 926 const std::string& token) {
912 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 927 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
913 928
914 DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token " 929 DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token "
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
962 if (!channel_handle.is_valid()) { 977 if (!channel_handle.is_valid()) {
963 node_->LostConnectionToNode(name); 978 node_->LostConnectionToNode(name);
964 979
965 DLOG(ERROR) << "Could not be introduced to peer " << name; 980 DLOG(ERROR) << "Could not be introduced to peer " << name;
966 base::AutoLock lock(peers_lock_); 981 base::AutoLock lock(peers_lock_);
967 pending_peer_messages_.erase(name); 982 pending_peer_messages_.erase(name);
968 return; 983 return;
969 } 984 }
970 985
971 scoped_refptr<NodeChannel> channel = 986 scoped_refptr<NodeChannel> channel =
972 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_); 987 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_,
988 ProcessErrorCallback());
973 989
974 DVLOG(1) << "Adding new peer " << name << " via parent introduction."; 990 DVLOG(1) << "Adding new peer " << name << " via parent introduction.";
975 AddPeer(name, channel, true /* start_channel */); 991 AddPeer(name, channel, true /* start_channel */);
976 } 992 }
977 993
978 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 994 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
979 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, 995 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node,
980 base::ProcessHandle from_process, 996 base::ProcessHandle from_process,
981 const ports::NodeName& destination, 997 const ports::NodeName& destination,
982 Channel::MessagePtr message) { 998 Channel::MessagePtr message) {
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
1029 #endif // defined(OS_WIN) 1045 #endif // defined(OS_WIN)
1030 1046
1031 if (destination == name_) { 1047 if (destination == name_) {
1032 // Great, we can deliver this message locally. 1048 // Great, we can deliver this message locally.
1033 OnPortsMessage(from_node, std::move(message)); 1049 OnPortsMessage(from_node, std::move(message));
1034 return; 1050 return;
1035 } 1051 }
1036 1052
1037 scoped_refptr<NodeChannel> peer = GetPeerChannel(destination); 1053 scoped_refptr<NodeChannel> peer = GetPeerChannel(destination);
1038 if (peer) 1054 if (peer)
1039 peer->PortsMessage(std::move(message)); 1055 peer->PortsMessageFromRelay(from_node, std::move(message));
1040 else 1056 else
1041 DLOG(ERROR) << "Dropping relay message for unknown node " << destination; 1057 DLOG(ERROR) << "Dropping relay message for unknown node " << destination;
1042 } 1058 }
1059
1060 void NodeController::OnPortsMessageFromRelay(const ports::NodeName& from_node,
1061 const ports::NodeName& source_node,
1062 Channel::MessagePtr message) {
1063 if (GetPeerChannel(from_node) != GetBrokerChannel()) {
1064 LOG(ERROR) << "Refusing relayed message from non-broker node.";
1065 DropPeer(from_node);
1066 return;
1067 }
1068
1069 OnPortsMessage(source_node, std::move(message));
1070 }
1043 #endif 1071 #endif
1044 1072
1045 void NodeController::OnChannelError(const ports::NodeName& from_node) { 1073 void NodeController::OnChannelError(const ports::NodeName& from_node) {
1046 if (io_task_runner_->RunsTasksOnCurrentThread()) { 1074 if (io_task_runner_->RunsTasksOnCurrentThread()) {
1047 DropPeer(from_node); 1075 DropPeer(from_node);
1048 // DropPeer may have caused local port closures, so be sure to process any 1076 // DropPeer may have caused local port closures, so be sure to process any
1049 // pending local messages. 1077 // pending local messages.
1050 AcceptIncomingMessages(); 1078 AcceptIncomingMessages();
1051 } else { 1079 } else {
1052 io_task_runner_->PostTask( 1080 io_task_runner_->PostTask(
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
1093 shutdown_callback_flag_.Set(false); 1121 shutdown_callback_flag_.Set(false);
1094 } 1122 }
1095 1123
1096 DCHECK(!callback.is_null()); 1124 DCHECK(!callback.is_null());
1097 1125
1098 callback.Run(); 1126 callback.Run();
1099 } 1127 }
1100 1128
1101 } // namespace edk 1129 } // namespace edk
1102 } // namespace mojo 1130 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/system/ports_message.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698