| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/node_controller.h" | 5 #include "mojo/edk/system/node_controller.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <limits> | 8 #include <limits> |
| 9 | 9 |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| (...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 132 #endif | 132 #endif |
| 133 | 133 |
| 134 void NodeController::SetIOTaskRunner( | 134 void NodeController::SetIOTaskRunner( |
| 135 scoped_refptr<base::TaskRunner> task_runner) { | 135 scoped_refptr<base::TaskRunner> task_runner) { |
| 136 io_task_runner_ = task_runner; | 136 io_task_runner_ = task_runner; |
| 137 ThreadDestructionObserver::Create( | 137 ThreadDestructionObserver::Create( |
| 138 io_task_runner_, | 138 io_task_runner_, |
| 139 base::Bind(&NodeController::DropAllPeers, base::Unretained(this))); | 139 base::Bind(&NodeController::DropAllPeers, base::Unretained(this))); |
| 140 } | 140 } |
| 141 | 141 |
| 142 void NodeController::ConnectToChild(base::ProcessHandle process_handle, | 142 void NodeController::ConnectToChild( |
| 143 ScopedPlatformHandle platform_handle, | 143 base::ProcessHandle process_handle, |
| 144 const std::string& child_token) { | 144 ScopedPlatformHandle platform_handle, |
| 145 const std::string& child_token, |
| 146 const ProcessErrorCallback& process_error_callback) { |
| 145 // Generate the temporary remote node name here so that it can be associated | 147 // Generate the temporary remote node name here so that it can be associated |
| 146 // with the embedder's child_token. If an error occurs in the child process | 148 // with the embedder's child_token. If an error occurs in the child process |
| 147 // after it is launched, but before any reserved ports are connected, this can | 149 // after it is launched, but before any reserved ports are connected, this can |
| 148 // be used to clean up any dangling ports. | 150 // be used to clean up any dangling ports. |
| 149 ports::NodeName node_name; | 151 ports::NodeName node_name; |
| 150 GenerateRandomName(&node_name); | 152 GenerateRandomName(&node_name); |
| 151 | 153 |
| 152 { | 154 { |
| 153 base::AutoLock lock(reserved_ports_lock_); | 155 base::AutoLock lock(reserved_ports_lock_); |
| 154 bool inserted = pending_child_tokens_.insert( | 156 bool inserted = pending_child_tokens_.insert( |
| 155 std::make_pair(node_name, child_token)).second; | 157 std::make_pair(node_name, child_token)).second; |
| 156 DCHECK(inserted); | 158 DCHECK(inserted); |
| 157 } | 159 } |
| 158 | 160 |
| 159 io_task_runner_->PostTask( | 161 io_task_runner_->PostTask( |
| 160 FROM_HERE, | 162 FROM_HERE, |
| 161 base::Bind(&NodeController::ConnectToChildOnIOThread, | 163 base::Bind(&NodeController::ConnectToChildOnIOThread, |
| 162 base::Unretained(this), | 164 base::Unretained(this), |
| 163 process_handle, | 165 process_handle, |
| 164 base::Passed(&platform_handle), | 166 base::Passed(&platform_handle), |
| 165 node_name)); | 167 node_name, |
| 168 process_error_callback)); |
| 166 } | 169 } |
| 167 | 170 |
| 168 void NodeController::CloseChildPorts(const std::string& child_token) { | 171 void NodeController::CloseChildPorts(const std::string& child_token) { |
| 169 std::vector<ports::PortRef> ports_to_close; | 172 std::vector<ports::PortRef> ports_to_close; |
| 170 { | 173 { |
| 171 std::vector<std::string> port_tokens; | 174 std::vector<std::string> port_tokens; |
| 172 base::AutoLock lock(reserved_ports_lock_); | 175 base::AutoLock lock(reserved_ports_lock_); |
| 173 for (const auto& port : reserved_ports_) { | 176 for (const auto& port : reserved_ports_) { |
| 174 if (port.second.child_token == child_token) { | 177 if (port.second.child_token == child_token) { |
| 175 DVLOG(1) << "Closing reserved port " << port.second.port.name(); | 178 DVLOG(1) << "Closing reserved port " << port.second.port.name(); |
| (...skipping 127 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 303 void NodeController::RequestShutdown(const base::Closure& callback) { | 306 void NodeController::RequestShutdown(const base::Closure& callback) { |
| 304 { | 307 { |
| 305 base::AutoLock lock(shutdown_lock_); | 308 base::AutoLock lock(shutdown_lock_); |
| 306 shutdown_callback_ = callback; | 309 shutdown_callback_ = callback; |
| 307 shutdown_callback_flag_.Set(true); | 310 shutdown_callback_flag_.Set(true); |
| 308 } | 311 } |
| 309 | 312 |
| 310 AttemptShutdownIfRequested(); | 313 AttemptShutdownIfRequested(); |
| 311 } | 314 } |
| 312 | 315 |
| 316 void NodeController::NotifyBadMessageFrom(const ports::NodeName& source_node, |
| 317 const std::string& error) { |
| 318 scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node); |
| 319 if (peer) |
| 320 peer->NotifyBadMessage(error); |
| 321 } |
| 322 |
| 313 void NodeController::ConnectToChildOnIOThread( | 323 void NodeController::ConnectToChildOnIOThread( |
| 314 base::ProcessHandle process_handle, | 324 base::ProcessHandle process_handle, |
| 315 ScopedPlatformHandle platform_handle, | 325 ScopedPlatformHandle platform_handle, |
| 316 ports::NodeName token) { | 326 ports::NodeName token, |
| 327 const ProcessErrorCallback& process_error_callback) { |
| 317 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 328 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| 318 | 329 |
| 319 #if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL) | 330 #if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL) |
| 320 PlatformChannelPair node_channel; | 331 PlatformChannelPair node_channel; |
| 321 // BrokerHost owns itself. | 332 // BrokerHost owns itself. |
| 322 BrokerHost* broker_host = new BrokerHost(std::move(platform_handle)); | 333 BrokerHost* broker_host = new BrokerHost(std::move(platform_handle)); |
| 323 broker_host->SendChannel(node_channel.PassClientHandle()); | 334 broker_host->SendChannel(node_channel.PassClientHandle()); |
| 324 scoped_refptr<NodeChannel> channel = NodeChannel::Create( | 335 scoped_refptr<NodeChannel> channel = NodeChannel::Create( |
| 325 this, node_channel.PassServerHandle(), io_task_runner_); | 336 this, node_channel.PassServerHandle(), io_task_runner_, |
| 337 process_error_callback); |
| 326 #else | 338 #else |
| 327 scoped_refptr<NodeChannel> channel = | 339 scoped_refptr<NodeChannel> channel = |
| 328 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_); | 340 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_, |
| 341 process_error_callback); |
| 329 #endif | 342 #endif |
| 330 | 343 |
| 331 // We set up the child channel with a temporary name so it can be identified | 344 // We set up the child channel with a temporary name so it can be identified |
| 332 // as a pending child if it writes any messages to the channel. We may start | 345 // as a pending child if it writes any messages to the channel. We may start |
| 333 // receiving messages from it (though we shouldn't) as soon as Start() is | 346 // receiving messages from it (though we shouldn't) as soon as Start() is |
| 334 // called below. | 347 // called below. |
| 335 | 348 |
| 336 pending_children_.insert(std::make_pair(token, channel)); | 349 pending_children_.insert(std::make_pair(token, channel)); |
| 337 RecordPendingChildCount(pending_children_.size()); | 350 RecordPendingChildCount(pending_children_.size()); |
| 338 | 351 |
| 339 channel->SetRemoteNodeName(token); | 352 channel->SetRemoteNodeName(token); |
| 340 channel->SetRemoteProcessHandle(process_handle); | 353 channel->SetRemoteProcessHandle(process_handle); |
| 341 channel->Start(); | 354 channel->Start(); |
| 342 | 355 |
| 343 channel->AcceptChild(name_, token); | 356 channel->AcceptChild(name_, token); |
| 344 } | 357 } |
| 345 | 358 |
| 346 void NodeController::ConnectToParentOnIOThread( | 359 void NodeController::ConnectToParentOnIOThread( |
| 347 ScopedPlatformHandle platform_handle) { | 360 ScopedPlatformHandle platform_handle) { |
| 348 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 361 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| 349 | 362 |
| 350 { | 363 { |
| 351 base::AutoLock lock(parent_lock_); | 364 base::AutoLock lock(parent_lock_); |
| 352 DCHECK(parent_name_ == ports::kInvalidNodeName); | 365 DCHECK(parent_name_ == ports::kInvalidNodeName); |
| 353 | 366 |
| 354 // At this point we don't know the parent's name, so we can't yet insert it | 367 // At this point we don't know the parent's name, so we can't yet insert it |
| 355 // into our |peers_| map. That will happen as soon as we receive an | 368 // into our |peers_| map. That will happen as soon as we receive an |
| 356 // AcceptChild message from them. | 369 // AcceptChild message from them. |
| 357 bootstrap_parent_channel_ = | 370 bootstrap_parent_channel_ = |
| 358 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_); | 371 NodeChannel::Create(this, std::move(platform_handle), io_task_runner_, |
| 372 ProcessErrorCallback()); |
| 359 } | 373 } |
| 360 bootstrap_parent_channel_->Start(); | 374 bootstrap_parent_channel_->Start(); |
| 361 } | 375 } |
| 362 | 376 |
| 363 scoped_refptr<NodeChannel> NodeController::GetPeerChannel( | 377 scoped_refptr<NodeChannel> NodeController::GetPeerChannel( |
| 364 const ports::NodeName& name) { | 378 const ports::NodeName& name) { |
| 365 base::AutoLock lock(peers_lock_); | 379 base::AutoLock lock(peers_lock_); |
| 366 auto it = peers_.find(name); | 380 auto it = peers_.find(name); |
| 367 if (it == peers_.end()) | 381 if (it == peers_.end()) |
| 368 return nullptr; | 382 return nullptr; |
| (...skipping 380 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 749 } | 763 } |
| 750 | 764 |
| 751 if (GetPeerChannel(client_name)) { | 765 if (GetPeerChannel(client_name)) { |
| 752 DLOG(ERROR) << "Ignoring AddBrokerClient for known client."; | 766 DLOG(ERROR) << "Ignoring AddBrokerClient for known client."; |
| 753 DropPeer(from_node); | 767 DropPeer(from_node); |
| 754 return; | 768 return; |
| 755 } | 769 } |
| 756 | 770 |
| 757 PlatformChannelPair broker_channel; | 771 PlatformChannelPair broker_channel; |
| 758 scoped_refptr<NodeChannel> client = NodeChannel::Create( | 772 scoped_refptr<NodeChannel> client = NodeChannel::Create( |
| 759 this, broker_channel.PassServerHandle(), io_task_runner_); | 773 this, broker_channel.PassServerHandle(), io_task_runner_, |
| 774 ProcessErrorCallback()); |
| 760 | 775 |
| 761 #if defined(OS_WIN) | 776 #if defined(OS_WIN) |
| 762 // The broker must have a working handle to the client process in order to | 777 // The broker must have a working handle to the client process in order to |
| 763 // properly copy other handles to and from the client. | 778 // properly copy other handles to and from the client. |
| 764 if (!scoped_process_handle.is_valid()) { | 779 if (!scoped_process_handle.is_valid()) { |
| 765 DLOG(ERROR) << "Broker rejecting client with invalid process handle."; | 780 DLOG(ERROR) << "Broker rejecting client with invalid process handle."; |
| 766 return; | 781 return; |
| 767 } | 782 } |
| 768 client->SetRemoteProcessHandle(scoped_process_handle.release().handle); | 783 client->SetRemoteProcessHandle(scoped_process_handle.release().handle); |
| 769 #else | 784 #else |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 826 | 841 |
| 827 // It's now possible to add both the broker and the parent as peers. | 842 // It's now possible to add both the broker and the parent as peers. |
| 828 // Note that the broker and parent may be the same node. | 843 // Note that the broker and parent may be the same node. |
| 829 scoped_refptr<NodeChannel> broker; | 844 scoped_refptr<NodeChannel> broker; |
| 830 if (broker_name == parent_name) { | 845 if (broker_name == parent_name) { |
| 831 DCHECK(!broker_channel.is_valid()); | 846 DCHECK(!broker_channel.is_valid()); |
| 832 broker = parent; | 847 broker = parent; |
| 833 } else { | 848 } else { |
| 834 DCHECK(broker_channel.is_valid()); | 849 DCHECK(broker_channel.is_valid()); |
| 835 broker = NodeChannel::Create(this, std::move(broker_channel), | 850 broker = NodeChannel::Create(this, std::move(broker_channel), |
| 836 io_task_runner_); | 851 io_task_runner_, ProcessErrorCallback()); |
| 837 AddPeer(broker_name, broker, true /* start_channel */); | 852 AddPeer(broker_name, broker, true /* start_channel */); |
| 838 } | 853 } |
| 839 | 854 |
| 840 AddPeer(parent_name, parent, false /* start_channel */); | 855 AddPeer(parent_name, parent, false /* start_channel */); |
| 841 | 856 |
| 842 { | 857 { |
| 843 // Complete any port merge requests we have waiting for the parent. | 858 // Complete any port merge requests we have waiting for the parent. |
| 844 base::AutoLock lock(pending_port_merges_lock_); | 859 base::AutoLock lock(pending_port_merges_lock_); |
| 845 for (const auto& request : pending_port_merges_) | 860 for (const auto& request : pending_port_merges_) |
| 846 parent->RequestPortMerge(request.second.name(), request.first); | 861 parent->RequestPortMerge(request.second.name(), request.first); |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 888 if (!ports::Message::Parse(data, | 903 if (!ports::Message::Parse(data, |
| 889 num_data_bytes, | 904 num_data_bytes, |
| 890 &num_header_bytes, | 905 &num_header_bytes, |
| 891 &num_payload_bytes, | 906 &num_payload_bytes, |
| 892 &num_ports_bytes)) { | 907 &num_ports_bytes)) { |
| 893 DropPeer(from_node); | 908 DropPeer(from_node); |
| 894 return; | 909 return; |
| 895 } | 910 } |
| 896 | 911 |
| 897 CHECK(channel_message); | 912 CHECK(channel_message); |
| 898 ports::ScopedMessage message( | 913 std::unique_ptr<PortsMessage> ports_message( |
| 899 new PortsMessage(num_header_bytes, | 914 new PortsMessage(num_header_bytes, |
| 900 num_payload_bytes, | 915 num_payload_bytes, |
| 901 num_ports_bytes, | 916 num_ports_bytes, |
| 902 std::move(channel_message))); | 917 std::move(channel_message))); |
| 903 | 918 ports_message->set_source_node(from_node); |
| 904 node_->AcceptMessage(std::move(message)); | 919 node_->AcceptMessage(ports::ScopedMessage(ports_message.release())); |
| 905 AcceptIncomingMessages(); | 920 AcceptIncomingMessages(); |
| 906 } | 921 } |
| 907 | 922 |
| 908 void NodeController::OnRequestPortMerge( | 923 void NodeController::OnRequestPortMerge( |
| 909 const ports::NodeName& from_node, | 924 const ports::NodeName& from_node, |
| 910 const ports::PortName& connector_port_name, | 925 const ports::PortName& connector_port_name, |
| 911 const std::string& token) { | 926 const std::string& token) { |
| 912 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 927 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| 913 | 928 |
| 914 DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token " | 929 DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token " |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 962 if (!channel_handle.is_valid()) { | 977 if (!channel_handle.is_valid()) { |
| 963 node_->LostConnectionToNode(name); | 978 node_->LostConnectionToNode(name); |
| 964 | 979 |
| 965 DLOG(ERROR) << "Could not be introduced to peer " << name; | 980 DLOG(ERROR) << "Could not be introduced to peer " << name; |
| 966 base::AutoLock lock(peers_lock_); | 981 base::AutoLock lock(peers_lock_); |
| 967 pending_peer_messages_.erase(name); | 982 pending_peer_messages_.erase(name); |
| 968 return; | 983 return; |
| 969 } | 984 } |
| 970 | 985 |
| 971 scoped_refptr<NodeChannel> channel = | 986 scoped_refptr<NodeChannel> channel = |
| 972 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_); | 987 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_, |
| 988 ProcessErrorCallback()); |
| 973 | 989 |
| 974 DVLOG(1) << "Adding new peer " << name << " via parent introduction."; | 990 DVLOG(1) << "Adding new peer " << name << " via parent introduction."; |
| 975 AddPeer(name, channel, true /* start_channel */); | 991 AddPeer(name, channel, true /* start_channel */); |
| 976 } | 992 } |
| 977 | 993 |
| 978 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) | 994 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) |
| 979 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, | 995 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, |
| 980 base::ProcessHandle from_process, | 996 base::ProcessHandle from_process, |
| 981 const ports::NodeName& destination, | 997 const ports::NodeName& destination, |
| 982 Channel::MessagePtr message) { | 998 Channel::MessagePtr message) { |
| (...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1029 #endif // defined(OS_WIN) | 1045 #endif // defined(OS_WIN) |
| 1030 | 1046 |
| 1031 if (destination == name_) { | 1047 if (destination == name_) { |
| 1032 // Great, we can deliver this message locally. | 1048 // Great, we can deliver this message locally. |
| 1033 OnPortsMessage(from_node, std::move(message)); | 1049 OnPortsMessage(from_node, std::move(message)); |
| 1034 return; | 1050 return; |
| 1035 } | 1051 } |
| 1036 | 1052 |
| 1037 scoped_refptr<NodeChannel> peer = GetPeerChannel(destination); | 1053 scoped_refptr<NodeChannel> peer = GetPeerChannel(destination); |
| 1038 if (peer) | 1054 if (peer) |
| 1039 peer->PortsMessage(std::move(message)); | 1055 peer->PortsMessageFromRelay(from_node, std::move(message)); |
| 1040 else | 1056 else |
| 1041 DLOG(ERROR) << "Dropping relay message for unknown node " << destination; | 1057 DLOG(ERROR) << "Dropping relay message for unknown node " << destination; |
| 1042 } | 1058 } |
| 1059 |
| 1060 void NodeController::OnPortsMessageFromRelay(const ports::NodeName& from_node, |
| 1061 const ports::NodeName& source_node, |
| 1062 Channel::MessagePtr message) { |
| 1063 if (GetPeerChannel(from_node) != GetBrokerChannel()) { |
| 1064 LOG(ERROR) << "Refusing relayed message from non-broker node."; |
| 1065 DropPeer(from_node); |
| 1066 return; |
| 1067 } |
| 1068 |
| 1069 OnPortsMessage(source_node, std::move(message)); |
| 1070 } |
| 1043 #endif | 1071 #endif |
| 1044 | 1072 |
| 1045 void NodeController::OnChannelError(const ports::NodeName& from_node) { | 1073 void NodeController::OnChannelError(const ports::NodeName& from_node) { |
| 1046 if (io_task_runner_->RunsTasksOnCurrentThread()) { | 1074 if (io_task_runner_->RunsTasksOnCurrentThread()) { |
| 1047 DropPeer(from_node); | 1075 DropPeer(from_node); |
| 1048 // DropPeer may have caused local port closures, so be sure to process any | 1076 // DropPeer may have caused local port closures, so be sure to process any |
| 1049 // pending local messages. | 1077 // pending local messages. |
| 1050 AcceptIncomingMessages(); | 1078 AcceptIncomingMessages(); |
| 1051 } else { | 1079 } else { |
| 1052 io_task_runner_->PostTask( | 1080 io_task_runner_->PostTask( |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1093 shutdown_callback_flag_.Set(false); | 1121 shutdown_callback_flag_.Set(false); |
| 1094 } | 1122 } |
| 1095 | 1123 |
| 1096 DCHECK(!callback.is_null()); | 1124 DCHECK(!callback.is_null()); |
| 1097 | 1125 |
| 1098 callback.Run(); | 1126 callback.Run(); |
| 1099 } | 1127 } |
| 1100 | 1128 |
| 1101 } // namespace edk | 1129 } // namespace edk |
| 1102 } // namespace mojo | 1130 } // namespace mojo |
| OLD | NEW |