Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(90)

Side by Side Diff: mojo/edk/system/node_controller.cc

Issue 2135113002: [mojo-edk] Close pending merges if the parent channel dies. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Revert stuff for merge. Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/node_controller.h" 5 #include "mojo/edk/system/node_controller.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <limits> 8 #include <limits>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 292 matching lines...) Expand 10 before | Expand all | Expand 10 after
303 reserved_ports_.erase(it); 303 reserved_ports_.erase(it);
304 was_merged = true; 304 was_merged = true;
305 } 305 }
306 } 306 }
307 if (was_merged) { 307 if (was_merged) {
308 AcceptIncomingMessages(); 308 AcceptIncomingMessages();
309 return; 309 return;
310 } 310 }
311 311
312 scoped_refptr<NodeChannel> parent; 312 scoped_refptr<NodeChannel> parent;
313 bool reject_merge = false;
313 { 314 {
314 // Hold |pending_port_merges_lock_| while getting |parent|. Otherwise, 315 // Hold |pending_port_merges_lock_| while getting |parent|. Otherwise,
315 // there is a race where the parent can be set, and |pending_port_merges_| 316 // there is a race where the parent can be set, and |pending_port_merges_|
316 // be processed between retrieving |parent| and adding the merge to 317 // be processed between retrieving |parent| and adding the merge to
317 // |pending_port_merges_|. 318 // |pending_port_merges_|.
318 base::AutoLock lock(pending_port_merges_lock_); 319 base::AutoLock lock(pending_port_merges_lock_);
319 parent = GetParentChannel(); 320 parent = GetParentChannel();
320 if (!parent) { 321 if (reject_pending_merges_) {
322 reject_merge = true;
323 } else if (!parent) {
321 pending_port_merges_.push_back(std::make_pair(token, port)); 324 pending_port_merges_.push_back(std::make_pair(token, port));
322 return; 325 return;
323 } 326 }
324 } 327 }
328 if (reject_merge) {
329 node_->ClosePort(port);
330 DVLOG(2) << "Rejecting port merge for token " << token
331 << " due to closed parent channel.";
332 AcceptIncomingMessages();
333 return;
334 }
335
325 parent->RequestPortMerge(port.name(), token); 336 parent->RequestPortMerge(port.name(), token);
326 } 337 }
327 338
328 int NodeController::MergeLocalPorts(const ports::PortRef& port0, 339 int NodeController::MergeLocalPorts(const ports::PortRef& port0,
329 const ports::PortRef& port1) { 340 const ports::PortRef& port1) {
330 int rv = node_->MergeLocalPorts(port0, port1); 341 int rv = node_->MergeLocalPorts(port0, port1);
331 AcceptIncomingMessages(); 342 AcceptIncomingMessages();
332 return rv; 343 return rv;
333 } 344 }
334 345
(...skipping 153 matching lines...) Expand 10 before | Expand all | Expand 10 after
488 if (start_channel) 499 if (start_channel)
489 channel->Start(); 500 channel->Start();
490 501
491 // Flush any queued message we need to deliver to this node. 502 // Flush any queued message we need to deliver to this node.
492 while (!pending_messages.empty()) { 503 while (!pending_messages.empty()) {
493 channel->PortsMessage(std::move(pending_messages.front())); 504 channel->PortsMessage(std::move(pending_messages.front()));
494 pending_messages.pop(); 505 pending_messages.pop();
495 } 506 }
496 } 507 }
497 508
498 void NodeController::DropPeer(const ports::NodeName& name) { 509 void NodeController::DropPeer(const ports::NodeName& name,
510 NodeChannel* channel) {
499 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 511 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
500 512
501 { 513 {
502 base::AutoLock lock(peers_lock_); 514 base::AutoLock lock(peers_lock_);
503 auto it = peers_.find(name); 515 auto it = peers_.find(name);
504 516
505 if (it != peers_.end()) { 517 if (it != peers_.end()) {
506 ports::NodeName peer = it->first; 518 ports::NodeName peer = it->first;
507 peers_.erase(it); 519 peers_.erase(it);
508 DVLOG(1) << "Dropped peer " << peer; 520 DVLOG(1) << "Dropped peer " << peer;
(...skipping 26 matching lines...) Expand all
535 // We have to erase reserved ports in a two-step manner because the usual 547 // We have to erase reserved ports in a two-step manner because the usual
536 // manner of using the returned iterator from map::erase isn't technically 548 // manner of using the returned iterator from map::erase isn't technically
537 // valid in C++11 (although it is in C++14). 549 // valid in C++11 (although it is in C++14).
538 for (const auto& token : port_tokens) 550 for (const auto& token : port_tokens)
539 reserved_ports_.erase(token); 551 reserved_ports_.erase(token);
540 552
541 pending_child_tokens_.erase(it); 553 pending_child_tokens_.erase(it);
542 } 554 }
543 } 555 }
544 556
557 bool is_parent;
558 {
559 base::AutoLock lock(parent_lock_);
560 is_parent = (name == parent_name_ || channel == bootstrap_parent_channel_);
561 }
562 // If the error comes from the parent channel, we also need to cancel any
563 // port merge requests, so that errors can be propagated to the message
564 // pipes.
565 if (is_parent) {
566 base::AutoLock lock(pending_port_merges_lock_);
567 reject_pending_merges_ = true;
568
569 for (const auto& port : pending_port_merges_)
570 ports_to_close.push_back(port.second);
571 pending_port_merges_.clear();
572 }
573
545 for (const auto& port : ports_to_close) 574 for (const auto& port : ports_to_close)
546 node_->ClosePort(port); 575 node_->ClosePort(port);
547 576
548 node_->LostConnectionToNode(name); 577 node_->LostConnectionToNode(name);
549 578
550 AcceptIncomingMessages(); 579 AcceptIncomingMessages();
551 } 580 }
552 581
553 void NodeController::SendPeerMessage(const ports::NodeName& name, 582 void NodeController::SendPeerMessage(const ports::NodeName& name,
554 ports::ScopedMessage message) { 583 ports::ScopedMessage message) {
(...skipping 216 matching lines...) Expand 10 before | Expand all | Expand 10 after
771 { 800 {
772 base::AutoLock lock(parent_lock_); 801 base::AutoLock lock(parent_lock_);
773 if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) { 802 if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) {
774 parent_name_ = parent_name; 803 parent_name_ = parent_name;
775 parent = bootstrap_parent_channel_; 804 parent = bootstrap_parent_channel_;
776 } 805 }
777 } 806 }
778 807
779 if (!parent) { 808 if (!parent) {
780 DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node; 809 DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node;
781 DropPeer(from_node); 810 DropPeer(from_node, nullptr);
782 return; 811 return;
783 } 812 }
784 813
785 parent->SetRemoteNodeName(parent_name); 814 parent->SetRemoteNodeName(parent_name);
786 parent->AcceptParent(token, name_); 815 parent->AcceptParent(token, name_);
787 816
788 // NOTE: The child does not actually add its parent as a peer until 817 // NOTE: The child does not actually add its parent as a peer until
789 // receiving an AcceptBrokerClient message from the broker. The parent 818 // receiving an AcceptBrokerClient message from the broker. The parent
790 // will request that said message be sent upon receiving AcceptParent. 819 // will request that said message be sent upon receiving AcceptParent.
791 820
792 DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name; 821 DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
793 } 822 }
794 823
795 void NodeController::OnAcceptParent(const ports::NodeName& from_node, 824 void NodeController::OnAcceptParent(const ports::NodeName& from_node,
796 const ports::NodeName& token, 825 const ports::NodeName& token,
797 const ports::NodeName& child_name) { 826 const ports::NodeName& child_name) {
798 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 827 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
799 828
800 auto it = pending_children_.find(from_node); 829 auto it = pending_children_.find(from_node);
801 if (it == pending_children_.end() || token != from_node) { 830 if (it == pending_children_.end() || token != from_node) {
802 DLOG(ERROR) << "Received unexpected AcceptParent message from " 831 DLOG(ERROR) << "Received unexpected AcceptParent message from "
803 << from_node; 832 << from_node;
804 DropPeer(from_node); 833 DropPeer(from_node, nullptr);
805 return; 834 return;
806 } 835 }
807 836
808 scoped_refptr<NodeChannel> channel = it->second; 837 scoped_refptr<NodeChannel> channel = it->second;
809 pending_children_.erase(it); 838 pending_children_.erase(it);
810 839
811 DCHECK(channel); 840 DCHECK(channel);
812 841
813 DVLOG(1) << "Parent " << name_ << " accepted child " << child_name; 842 DVLOG(1) << "Parent " << name_ << " accepted child " << child_name;
814 843
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
851 ScopedPlatformHandle(PlatformHandle(process_handle)); 880 ScopedPlatformHandle(PlatformHandle(process_handle));
852 #endif 881 #endif
853 scoped_refptr<NodeChannel> sender = GetPeerChannel(from_node); 882 scoped_refptr<NodeChannel> sender = GetPeerChannel(from_node);
854 if (!sender) { 883 if (!sender) {
855 DLOG(ERROR) << "Ignoring AddBrokerClient from unknown sender."; 884 DLOG(ERROR) << "Ignoring AddBrokerClient from unknown sender.";
856 return; 885 return;
857 } 886 }
858 887
859 if (GetPeerChannel(client_name)) { 888 if (GetPeerChannel(client_name)) {
860 DLOG(ERROR) << "Ignoring AddBrokerClient for known client."; 889 DLOG(ERROR) << "Ignoring AddBrokerClient for known client.";
861 DropPeer(from_node); 890 DropPeer(from_node, nullptr);
862 return; 891 return;
863 } 892 }
864 893
865 PlatformChannelPair broker_channel; 894 PlatformChannelPair broker_channel;
866 scoped_refptr<NodeChannel> client = NodeChannel::Create( 895 scoped_refptr<NodeChannel> client = NodeChannel::Create(
867 this, broker_channel.PassServerHandle(), io_task_runner_, 896 this, broker_channel.PassServerHandle(), io_task_runner_,
868 ProcessErrorCallback()); 897 ProcessErrorCallback());
869 898
870 #if defined(OS_WIN) 899 #if defined(OS_WIN)
871 // The broker must have a working handle to the client process in order to 900 // The broker must have a working handle to the client process in order to
(...skipping 110 matching lines...) Expand 10 before | Expand all | Expand 10 after
982 1011
983 void NodeController::OnPortsMessage(const ports::NodeName& from_node, 1012 void NodeController::OnPortsMessage(const ports::NodeName& from_node,
984 Channel::MessagePtr channel_message) { 1013 Channel::MessagePtr channel_message) {
985 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1014 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
986 1015
987 void* data; 1016 void* data;
988 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; 1017 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
989 if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes, 1018 if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes,
990 &num_header_bytes, &num_payload_bytes, 1019 &num_header_bytes, &num_payload_bytes,
991 &num_ports_bytes)) { 1020 &num_ports_bytes)) {
992 DropPeer(from_node); 1021 DropPeer(from_node, nullptr);
993 return; 1022 return;
994 } 1023 }
995 1024
996 CHECK(channel_message); 1025 CHECK(channel_message);
997 std::unique_ptr<PortsMessage> ports_message( 1026 std::unique_ptr<PortsMessage> ports_message(
998 new PortsMessage(num_header_bytes, 1027 new PortsMessage(num_header_bytes,
999 num_payload_bytes, 1028 num_payload_bytes,
1000 num_ports_bytes, 1029 num_ports_bytes,
1001 std::move(channel_message))); 1030 std::move(channel_message)));
1002 ports_message->set_source_node(from_node); 1031 ports_message->set_source_node(from_node);
(...skipping 30 matching lines...) Expand all
1033 } 1062 }
1034 1063
1035 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, 1064 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
1036 const ports::NodeName& name) { 1065 const ports::NodeName& name) {
1037 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1066 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1038 1067
1039 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node); 1068 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node);
1040 if (from_node == name || name == ports::kInvalidNodeName || !requestor) { 1069 if (from_node == name || name == ports::kInvalidNodeName || !requestor) {
1041 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from " 1070 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from "
1042 << from_node; 1071 << from_node;
1043 DropPeer(from_node); 1072 DropPeer(from_node, nullptr);
1044 return; 1073 return;
1045 } 1074 }
1046 1075
1047 scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name); 1076 scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name);
1048 if (!new_friend) { 1077 if (!new_friend) {
1049 // We don't know who they're talking about! 1078 // We don't know who they're talking about!
1050 requestor->Introduce(name, ScopedPlatformHandle()); 1079 requestor->Introduce(name, ScopedPlatformHandle());
1051 } else { 1080 } else {
1052 PlatformChannelPair new_channel; 1081 PlatformChannelPair new_channel;
1053 requestor->Introduce(name, new_channel.PassServerHandle()); 1082 requestor->Introduce(name, new_channel.PassServerHandle());
(...skipping 25 matching lines...) Expand all
1079 1108
1080 void NodeController::OnBroadcast(const ports::NodeName& from_node, 1109 void NodeController::OnBroadcast(const ports::NodeName& from_node,
1081 Channel::MessagePtr message) { 1110 Channel::MessagePtr message) {
1082 DCHECK(!message->has_handles()); 1111 DCHECK(!message->has_handles());
1083 1112
1084 void* data; 1113 void* data;
1085 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; 1114 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
1086 if (!ParsePortsMessage(message.get(), &data, &num_data_bytes, 1115 if (!ParsePortsMessage(message.get(), &data, &num_data_bytes,
1087 &num_header_bytes, &num_payload_bytes, 1116 &num_header_bytes, &num_payload_bytes,
1088 &num_ports_bytes)) { 1117 &num_ports_bytes)) {
1089 DropPeer(from_node); 1118 DropPeer(from_node, nullptr);
1090 return; 1119 return;
1091 } 1120 }
1092 1121
1093 // Broadcast messages must not contain ports. 1122 // Broadcast messages must not contain ports.
1094 if (num_ports_bytes > 0) { 1123 if (num_ports_bytes > 0) {
1095 DropPeer(from_node); 1124 DropPeer(from_node, nullptr);
1096 return; 1125 return;
1097 } 1126 }
1098 1127
1099 base::AutoLock lock(peers_lock_); 1128 base::AutoLock lock(peers_lock_);
1100 for (auto& iter : peers_) { 1129 for (auto& iter : peers_) {
1101 // Copy and send the message to each known peer. 1130 // Copy and send the message to each known peer.
1102 Channel::MessagePtr peer_message( 1131 Channel::MessagePtr peer_message(
1103 new Channel::Message(message->payload_size(), 0)); 1132 new Channel::Message(message->payload_size(), 0));
1104 memcpy(peer_message->mutable_payload(), message->payload(), 1133 memcpy(peer_message->mutable_payload(), message->payload(),
1105 message->payload_size()); 1134 message->payload_size());
1106 iter.second->PortsMessage(std::move(peer_message)); 1135 iter.second->PortsMessage(std::move(peer_message));
1107 } 1136 }
1108 } 1137 }
1109 1138
1110 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 1139 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
1111 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, 1140 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node,
1112 base::ProcessHandle from_process, 1141 base::ProcessHandle from_process,
1113 const ports::NodeName& destination, 1142 const ports::NodeName& destination,
1114 Channel::MessagePtr message) { 1143 Channel::MessagePtr message) {
1115 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1144 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1116 1145
1117 if (GetBrokerChannel()) { 1146 if (GetBrokerChannel()) {
1118 // Only the broker should be asked to relay a message. 1147 // Only the broker should be asked to relay a message.
1119 LOG(ERROR) << "Non-broker refusing to relay message."; 1148 LOG(ERROR) << "Non-broker refusing to relay message.";
1120 DropPeer(from_node); 1149 DropPeer(from_node, nullptr);
1121 return; 1150 return;
1122 } 1151 }
1123 1152
1124 // The parent should always know which process this came from. 1153 // The parent should always know which process this came from.
1125 DCHECK(from_process != base::kNullProcessHandle); 1154 DCHECK(from_process != base::kNullProcessHandle);
1126 1155
1127 #if defined(OS_WIN) 1156 #if defined(OS_WIN)
1128 // Rewrite the handles to this (the parent) process. If the message is 1157 // Rewrite the handles to this (the parent) process. If the message is
1129 // destined for another child process, the handles will be rewritten to that 1158 // destined for another child process, the handles will be rewritten to that
1130 // process before going out (see NodeChannel::WriteChannelMessage). 1159 // process before going out (see NodeChannel::WriteChannelMessage).
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
1171 peer->PortsMessageFromRelay(from_node, std::move(message)); 1200 peer->PortsMessageFromRelay(from_node, std::move(message));
1172 else 1201 else
1173 DLOG(ERROR) << "Dropping relay message for unknown node " << destination; 1202 DLOG(ERROR) << "Dropping relay message for unknown node " << destination;
1174 } 1203 }
1175 1204
1176 void NodeController::OnPortsMessageFromRelay(const ports::NodeName& from_node, 1205 void NodeController::OnPortsMessageFromRelay(const ports::NodeName& from_node,
1177 const ports::NodeName& source_node, 1206 const ports::NodeName& source_node,
1178 Channel::MessagePtr message) { 1207 Channel::MessagePtr message) {
1179 if (GetPeerChannel(from_node) != GetBrokerChannel()) { 1208 if (GetPeerChannel(from_node) != GetBrokerChannel()) {
1180 LOG(ERROR) << "Refusing relayed message from non-broker node."; 1209 LOG(ERROR) << "Refusing relayed message from non-broker node.";
1181 DropPeer(from_node); 1210 DropPeer(from_node, nullptr);
1182 return; 1211 return;
1183 } 1212 }
1184 1213
1185 OnPortsMessage(source_node, std::move(message)); 1214 OnPortsMessage(source_node, std::move(message));
1186 } 1215 }
1187 #endif 1216 #endif
1188 1217
1189 void NodeController::OnChannelError(const ports::NodeName& from_node) { 1218 void NodeController::OnChannelError(const ports::NodeName& from_node,
1219 NodeChannel* channel) {
1190 if (io_task_runner_->RunsTasksOnCurrentThread()) { 1220 if (io_task_runner_->RunsTasksOnCurrentThread()) {
1191 DropPeer(from_node); 1221 DropPeer(from_node, channel);
1192 // DropPeer may have caused local port closures, so be sure to process any 1222 // DropPeer may have caused local port closures, so be sure to process any
1193 // pending local messages. 1223 // pending local messages.
1194 AcceptIncomingMessages(); 1224 AcceptIncomingMessages();
1195 } else { 1225 } else {
1196 io_task_runner_->PostTask( 1226 io_task_runner_->PostTask(
1197 FROM_HERE, 1227 FROM_HERE,
1198 base::Bind(&NodeController::OnChannelError, base::Unretained(this), 1228 base::Bind(&NodeController::OnChannelError, base::Unretained(this),
1199 from_node)); 1229 from_node, channel));
1200 } 1230 }
1201 } 1231 }
1202 1232
1203 #if defined(OS_MACOSX) && !defined(OS_IOS) 1233 #if defined(OS_MACOSX) && !defined(OS_IOS)
1204 MachPortRelay* NodeController::GetMachPortRelay() { 1234 MachPortRelay* NodeController::GetMachPortRelay() {
1205 { 1235 {
1206 base::AutoLock lock(parent_lock_); 1236 base::AutoLock lock(parent_lock_);
1207 // Return null if we're not the root. 1237 // Return null if we're not the root.
1208 if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName) 1238 if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName)
1209 return nullptr; 1239 return nullptr;
(...skipping 27 matching lines...) Expand all
1237 shutdown_callback_flag_.Set(false); 1267 shutdown_callback_flag_.Set(false);
1238 } 1268 }
1239 1269
1240 DCHECK(!callback.is_null()); 1270 DCHECK(!callback.is_null());
1241 1271
1242 callback.Run(); 1272 callback.Run();
1243 } 1273 }
1244 1274
1245 } // namespace edk 1275 } // namespace edk
1246 } // namespace mojo 1276 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698