Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(387)

Side by Side Diff: mojo/edk/system/node_controller.cc

Issue 1975073002: [mojo-edk] Broadcast surprise port disruptions (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@reenable-clean-shutdown
Patch Set: . Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/system/ports/node.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/node_controller.h" 5 #include "mojo/edk/system/node_controller.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <limits> 8 #include <limits>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
66 DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max())); 66 DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
67 67
68 // 8k is the maximum number of file descriptors allowed in Chrome. 68 // 8k is the maximum number of file descriptors allowed in Chrome.
69 UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.PendingChildren", 69 UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.PendingChildren",
70 static_cast<int32_t>(count), 70 static_cast<int32_t>(count),
71 0 /* min */, 71 0 /* min */,
72 8000 /* max */, 72 8000 /* max */,
73 50 /* bucket count */); 73 50 /* bucket count */);
74 } 74 }
75 75
76 bool ParsePortsMessage(Channel::Message* message,
77 void** data,
78 size_t* num_data_bytes,
79 size_t* num_header_bytes,
80 size_t* num_payload_bytes,
81 size_t* num_ports_bytes) {
82 DCHECK(data && num_data_bytes && num_header_bytes && num_payload_bytes &&
83 num_ports_bytes);
84
85 NodeChannel::GetPortsMessageData(message, data, num_data_bytes);
86 if (!*num_data_bytes)
87 return false;
88
89 if (!ports::Message::Parse(*data, *num_data_bytes, num_header_bytes,
90 num_payload_bytes, num_ports_bytes)) {
91 return false;
92 }
93
94 return true;
95 }
96
76 // Used by NodeController to watch for shutdown. Since no IO can happen once 97 // Used by NodeController to watch for shutdown. Since no IO can happen once
77 // the IO thread is killed, the NodeController can cleanly drop all its peers 98 // the IO thread is killed, the NodeController can cleanly drop all its peers
78 // at that time. 99 // at that time.
79 class ThreadDestructionObserver : 100 class ThreadDestructionObserver :
80 public base::MessageLoop::DestructionObserver { 101 public base::MessageLoop::DestructionObserver {
81 public: 102 public:
82 static void Create(scoped_refptr<base::TaskRunner> task_runner, 103 static void Create(scoped_refptr<base::TaskRunner> task_runner,
83 const base::Closure& callback) { 104 const base::Closure& callback) {
84 if (task_runner->RunsTasksOnCurrentThread()) { 105 if (task_runner->RunsTasksOnCurrentThread()) {
85 // Owns itself. 106 // Owns itself.
(...skipping 579 matching lines...) Expand 10 before | Expand all | Expand 10 after
665 // (synchronously) in response to Node's ClosePort, SendMessage, or 686 // (synchronously) in response to Node's ClosePort, SendMessage, or
666 // AcceptMessage, we flush the queue after calling any of those methods. 687 // AcceptMessage, we flush the queue after calling any of those methods.
667 base::AutoLock lock(messages_lock_); 688 base::AutoLock lock(messages_lock_);
668 incoming_messages_.emplace(std::move(message)); 689 incoming_messages_.emplace(std::move(message));
669 incoming_messages_flag_.Set(true); 690 incoming_messages_flag_.Set(true);
670 } else { 691 } else {
671 SendPeerMessage(node, std::move(message)); 692 SendPeerMessage(node, std::move(message));
672 } 693 }
673 } 694 }
674 695
696 void NodeController::BroadcastMessage(ports::ScopedMessage message) {
697 CHECK_EQ(message->num_ports(), 0u);
698 Channel::MessagePtr channel_message =
699 static_cast<PortsMessage*>(message.get())->TakeChannelMessage();
700 CHECK(!channel_message->has_handles());
701
702 scoped_refptr<NodeChannel> broker = GetBrokerChannel();
703 if (broker)
704 broker->Broadcast(std::move(channel_message));
705 else
706 OnBroadcast(name_, std::move(channel_message));
707 }
708
675 void NodeController::PortStatusChanged(const ports::PortRef& port) { 709 void NodeController::PortStatusChanged(const ports::PortRef& port) {
676 scoped_refptr<ports::UserData> user_data; 710 scoped_refptr<ports::UserData> user_data;
677 node_->GetUserData(port, &user_data); 711 node_->GetUserData(port, &user_data);
678 712
679 PortObserver* observer = static_cast<PortObserver*>(user_data.get()); 713 PortObserver* observer = static_cast<PortObserver*>(user_data.get());
680 if (observer) { 714 if (observer) {
681 observer->OnPortStatusChanged(); 715 observer->OnPortStatusChanged();
682 } else { 716 } else {
683 DVLOG(2) << "Ignoring status change for " << port.name() << " because it " 717 DVLOG(2) << "Ignoring status change for " << port.name() << " because it "
684 << "doesn't have an observer."; 718 << "doesn't have an observer.";
(...skipping 214 matching lines...) Expand 10 before | Expand all | Expand 10 after
899 #endif 933 #endif
900 934
901 DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name; 935 DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name;
902 } 936 }
903 937
904 void NodeController::OnPortsMessage(const ports::NodeName& from_node, 938 void NodeController::OnPortsMessage(const ports::NodeName& from_node,
905 Channel::MessagePtr channel_message) { 939 Channel::MessagePtr channel_message) {
906 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 940 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
907 941
908 void* data; 942 void* data;
909 size_t num_data_bytes; 943 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
910 NodeChannel::GetPortsMessageData( 944 if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes,
911 channel_message.get(), &data, &num_data_bytes); 945 &num_header_bytes, &num_payload_bytes,
912 if (!num_data_bytes) { 946 &num_ports_bytes)) {
913 DropPeer(from_node); 947 DropPeer(from_node);
914 return; 948 return;
915 } 949 }
916
917 size_t num_header_bytes, num_payload_bytes, num_ports_bytes;
918 if (!ports::Message::Parse(data,
919 num_data_bytes,
920 &num_header_bytes,
921 &num_payload_bytes,
922 &num_ports_bytes)) {
923 DropPeer(from_node);
924 return;
925 }
926 950
927 CHECK(channel_message); 951 CHECK(channel_message);
928 std::unique_ptr<PortsMessage> ports_message( 952 std::unique_ptr<PortsMessage> ports_message(
929 new PortsMessage(num_header_bytes, 953 new PortsMessage(num_header_bytes,
930 num_payload_bytes, 954 num_payload_bytes,
931 num_ports_bytes, 955 num_ports_bytes,
932 std::move(channel_message))); 956 std::move(channel_message)));
933 ports_message->set_source_node(from_node); 957 ports_message->set_source_node(from_node);
934 node_->AcceptMessage(ports::ScopedMessage(ports_message.release())); 958 node_->AcceptMessage(ports::ScopedMessage(ports_message.release()));
935 AcceptIncomingMessages(); 959 AcceptIncomingMessages();
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
999 } 1023 }
1000 1024
1001 scoped_refptr<NodeChannel> channel = 1025 scoped_refptr<NodeChannel> channel =
1002 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_, 1026 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_,
1003 ProcessErrorCallback()); 1027 ProcessErrorCallback());
1004 1028
1005 DVLOG(1) << "Adding new peer " << name << " via parent introduction."; 1029 DVLOG(1) << "Adding new peer " << name << " via parent introduction.";
1006 AddPeer(name, channel, true /* start_channel */); 1030 AddPeer(name, channel, true /* start_channel */);
1007 } 1031 }
1008 1032
1033 void NodeController::OnBroadcast(const ports::NodeName& from_node,
1034 Channel::MessagePtr message) {
1035 DCHECK(!message->has_handles());
1036
1037 void* data;
1038 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
1039 if (!ParsePortsMessage(message.get(), &data, &num_data_bytes,
1040 &num_header_bytes, &num_payload_bytes,
1041 &num_ports_bytes)) {
1042 DropPeer(from_node);
1043 return;
1044 }
1045
1046 // Broadcast messages must not contain ports.
1047 if (num_ports_bytes > 0) {
1048 DropPeer(from_node);
1049 return;
1050 }
1051
1052 base::AutoLock lock(peers_lock_);
1053 for (auto& iter : peers_) {
1054 // Copy and send the message to each known peer.
1055 Channel::MessagePtr peer_message(
1056 new Channel::Message(message->payload_size(), 0));
1057 memcpy(peer_message->mutable_payload(), message->payload(),
1058 message->payload_size());
1059 iter.second->PortsMessage(std::move(peer_message));
1060 }
1061 }
1062
1009 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 1063 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
1010 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, 1064 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node,
1011 base::ProcessHandle from_process, 1065 base::ProcessHandle from_process,
1012 const ports::NodeName& destination, 1066 const ports::NodeName& destination,
1013 Channel::MessagePtr message) { 1067 Channel::MessagePtr message) {
1014 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 1068 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1015 1069
1016 if (GetBrokerChannel()) { 1070 if (GetBrokerChannel()) {
1017 // Only the broker should be asked to relay a message. 1071 // Only the broker should be asked to relay a message.
1018 LOG(ERROR) << "Non-broker refusing to relay message."; 1072 LOG(ERROR) << "Non-broker refusing to relay message.";
(...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after
1136 shutdown_callback_flag_.Set(false); 1190 shutdown_callback_flag_.Set(false);
1137 } 1191 }
1138 1192
1139 DCHECK(!callback.is_null()); 1193 DCHECK(!callback.is_null());
1140 1194
1141 callback.Run(); 1195 callback.Run();
1142 } 1196 }
1143 1197
1144 } // namespace edk 1198 } // namespace edk
1145 } // namespace mojo 1199 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/node_controller.h ('k') | mojo/edk/system/ports/node.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698