OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/node_controller.h" | 5 #include "mojo/edk/system/node_controller.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <limits> | 8 #include <limits> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
66 DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max())); | 66 DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max())); |
67 | 67 |
68 // 8k is the maximum number of file descriptors allowed in Chrome. | 68 // 8k is the maximum number of file descriptors allowed in Chrome. |
69 UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.PendingChildren", | 69 UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.PendingChildren", |
70 static_cast<int32_t>(count), | 70 static_cast<int32_t>(count), |
71 0 /* min */, | 71 0 /* min */, |
72 8000 /* max */, | 72 8000 /* max */, |
73 50 /* bucket count */); | 73 50 /* bucket count */); |
74 } | 74 } |
75 | 75 |
| 76 bool ParsePortsMessage(Channel::Message* message, |
| 77 void** data, |
| 78 size_t* num_data_bytes, |
| 79 size_t* num_header_bytes, |
| 80 size_t* num_payload_bytes, |
| 81 size_t* num_ports_bytes) { |
| 82 DCHECK(data && num_data_bytes && num_header_bytes && num_payload_bytes && |
| 83 num_ports_bytes); |
| 84 |
| 85 NodeChannel::GetPortsMessageData(message, data, num_data_bytes); |
| 86 if (!*num_data_bytes) |
| 87 return false; |
| 88 |
| 89 if (!ports::Message::Parse(*data, *num_data_bytes, num_header_bytes, |
| 90 num_payload_bytes, num_ports_bytes)) { |
| 91 return false; |
| 92 } |
| 93 |
| 94 return true; |
| 95 } |
| 96 |
76 // Used by NodeController to watch for shutdown. Since no IO can happen once | 97 // Used by NodeController to watch for shutdown. Since no IO can happen once |
77 // the IO thread is killed, the NodeController can cleanly drop all its peers | 98 // the IO thread is killed, the NodeController can cleanly drop all its peers |
78 // at that time. | 99 // at that time. |
79 class ThreadDestructionObserver : | 100 class ThreadDestructionObserver : |
80 public base::MessageLoop::DestructionObserver { | 101 public base::MessageLoop::DestructionObserver { |
81 public: | 102 public: |
82 static void Create(scoped_refptr<base::TaskRunner> task_runner, | 103 static void Create(scoped_refptr<base::TaskRunner> task_runner, |
83 const base::Closure& callback) { | 104 const base::Closure& callback) { |
84 if (task_runner->RunsTasksOnCurrentThread()) { | 105 if (task_runner->RunsTasksOnCurrentThread()) { |
85 // Owns itself. | 106 // Owns itself. |
(...skipping 579 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
665 // (synchronously) in response to Node's ClosePort, SendMessage, or | 686 // (synchronously) in response to Node's ClosePort, SendMessage, or |
666 // AcceptMessage, we flush the queue after calling any of those methods. | 687 // AcceptMessage, we flush the queue after calling any of those methods. |
667 base::AutoLock lock(messages_lock_); | 688 base::AutoLock lock(messages_lock_); |
668 incoming_messages_.emplace(std::move(message)); | 689 incoming_messages_.emplace(std::move(message)); |
669 incoming_messages_flag_.Set(true); | 690 incoming_messages_flag_.Set(true); |
670 } else { | 691 } else { |
671 SendPeerMessage(node, std::move(message)); | 692 SendPeerMessage(node, std::move(message)); |
672 } | 693 } |
673 } | 694 } |
674 | 695 |
| 696 void NodeController::BroadcastMessage(ports::ScopedMessage message) { |
| 697 CHECK_EQ(message->num_ports(), 0u); |
| 698 Channel::MessagePtr channel_message = |
| 699 static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); |
| 700 CHECK(!channel_message->has_handles()); |
| 701 |
| 702 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); |
| 703 if (broker) |
| 704 broker->Broadcast(std::move(channel_message)); |
| 705 else |
| 706 OnBroadcast(name_, std::move(channel_message)); |
| 707 } |
| 708 |
675 void NodeController::PortStatusChanged(const ports::PortRef& port) { | 709 void NodeController::PortStatusChanged(const ports::PortRef& port) { |
676 scoped_refptr<ports::UserData> user_data; | 710 scoped_refptr<ports::UserData> user_data; |
677 node_->GetUserData(port, &user_data); | 711 node_->GetUserData(port, &user_data); |
678 | 712 |
679 PortObserver* observer = static_cast<PortObserver*>(user_data.get()); | 713 PortObserver* observer = static_cast<PortObserver*>(user_data.get()); |
680 if (observer) { | 714 if (observer) { |
681 observer->OnPortStatusChanged(); | 715 observer->OnPortStatusChanged(); |
682 } else { | 716 } else { |
683 DVLOG(2) << "Ignoring status change for " << port.name() << " because it " | 717 DVLOG(2) << "Ignoring status change for " << port.name() << " because it " |
684 << "doesn't have an observer."; | 718 << "doesn't have an observer."; |
(...skipping 214 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
899 #endif | 933 #endif |
900 | 934 |
901 DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name; | 935 DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name; |
902 } | 936 } |
903 | 937 |
904 void NodeController::OnPortsMessage(const ports::NodeName& from_node, | 938 void NodeController::OnPortsMessage(const ports::NodeName& from_node, |
905 Channel::MessagePtr channel_message) { | 939 Channel::MessagePtr channel_message) { |
906 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 940 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
907 | 941 |
908 void* data; | 942 void* data; |
909 size_t num_data_bytes; | 943 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; |
910 NodeChannel::GetPortsMessageData( | 944 if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes, |
911 channel_message.get(), &data, &num_data_bytes); | 945 &num_header_bytes, &num_payload_bytes, |
912 if (!num_data_bytes) { | 946 &num_ports_bytes)) { |
913 DropPeer(from_node); | 947 DropPeer(from_node); |
914 return; | 948 return; |
915 } | 949 } |
916 | |
917 size_t num_header_bytes, num_payload_bytes, num_ports_bytes; | |
918 if (!ports::Message::Parse(data, | |
919 num_data_bytes, | |
920 &num_header_bytes, | |
921 &num_payload_bytes, | |
922 &num_ports_bytes)) { | |
923 DropPeer(from_node); | |
924 return; | |
925 } | |
926 | 950 |
927 CHECK(channel_message); | 951 CHECK(channel_message); |
928 std::unique_ptr<PortsMessage> ports_message( | 952 std::unique_ptr<PortsMessage> ports_message( |
929 new PortsMessage(num_header_bytes, | 953 new PortsMessage(num_header_bytes, |
930 num_payload_bytes, | 954 num_payload_bytes, |
931 num_ports_bytes, | 955 num_ports_bytes, |
932 std::move(channel_message))); | 956 std::move(channel_message))); |
933 ports_message->set_source_node(from_node); | 957 ports_message->set_source_node(from_node); |
934 node_->AcceptMessage(ports::ScopedMessage(ports_message.release())); | 958 node_->AcceptMessage(ports::ScopedMessage(ports_message.release())); |
935 AcceptIncomingMessages(); | 959 AcceptIncomingMessages(); |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
999 } | 1023 } |
1000 | 1024 |
1001 scoped_refptr<NodeChannel> channel = | 1025 scoped_refptr<NodeChannel> channel = |
1002 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_, | 1026 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_, |
1003 ProcessErrorCallback()); | 1027 ProcessErrorCallback()); |
1004 | 1028 |
1005 DVLOG(1) << "Adding new peer " << name << " via parent introduction."; | 1029 DVLOG(1) << "Adding new peer " << name << " via parent introduction."; |
1006 AddPeer(name, channel, true /* start_channel */); | 1030 AddPeer(name, channel, true /* start_channel */); |
1007 } | 1031 } |
1008 | 1032 |
| 1033 void NodeController::OnBroadcast(const ports::NodeName& from_node, |
| 1034 Channel::MessagePtr message) { |
| 1035 DCHECK(!message->has_handles()); |
| 1036 |
| 1037 void* data; |
| 1038 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; |
| 1039 if (!ParsePortsMessage(message.get(), &data, &num_data_bytes, |
| 1040 &num_header_bytes, &num_payload_bytes, |
| 1041 &num_ports_bytes)) { |
| 1042 DropPeer(from_node); |
| 1043 return; |
| 1044 } |
| 1045 |
| 1046 // Broadcast messages must not contain ports. |
| 1047 if (num_ports_bytes > 0) { |
| 1048 DropPeer(from_node); |
| 1049 return; |
| 1050 } |
| 1051 |
| 1052 base::AutoLock lock(peers_lock_); |
| 1053 for (auto& iter : peers_) { |
| 1054 // Copy and send the message to each known peer. |
| 1055 Channel::MessagePtr peer_message( |
| 1056 new Channel::Message(message->payload_size(), 0)); |
| 1057 memcpy(peer_message->mutable_payload(), message->payload(), |
| 1058 message->payload_size()); |
| 1059 iter.second->PortsMessage(std::move(peer_message)); |
| 1060 } |
| 1061 } |
| 1062 |
1009 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) | 1063 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) |
1010 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, | 1064 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, |
1011 base::ProcessHandle from_process, | 1065 base::ProcessHandle from_process, |
1012 const ports::NodeName& destination, | 1066 const ports::NodeName& destination, |
1013 Channel::MessagePtr message) { | 1067 Channel::MessagePtr message) { |
1014 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1068 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
1015 | 1069 |
1016 if (GetBrokerChannel()) { | 1070 if (GetBrokerChannel()) { |
1017 // Only the broker should be asked to relay a message. | 1071 // Only the broker should be asked to relay a message. |
1018 LOG(ERROR) << "Non-broker refusing to relay message."; | 1072 LOG(ERROR) << "Non-broker refusing to relay message."; |
(...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1136 shutdown_callback_flag_.Set(false); | 1190 shutdown_callback_flag_.Set(false); |
1137 } | 1191 } |
1138 | 1192 |
1139 DCHECK(!callback.is_null()); | 1193 DCHECK(!callback.is_null()); |
1140 | 1194 |
1141 callback.Run(); | 1195 callback.Run(); |
1142 } | 1196 } |
1143 | 1197 |
1144 } // namespace edk | 1198 } // namespace edk |
1145 } // namespace mojo | 1199 } // namespace mojo |
OLD | NEW |