| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/node_controller.h" | 5 #include "mojo/edk/system/node_controller.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <limits> | 8 #include <limits> |
| 9 | 9 |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 66 DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max())); | 66 DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max())); |
| 67 | 67 |
| 68 // 8k is the maximum number of file descriptors allowed in Chrome. | 68 // 8k is the maximum number of file descriptors allowed in Chrome. |
| 69 UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.PendingChildren", | 69 UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.PendingChildren", |
| 70 static_cast<int32_t>(count), | 70 static_cast<int32_t>(count), |
| 71 0 /* min */, | 71 0 /* min */, |
| 72 8000 /* max */, | 72 8000 /* max */, |
| 73 50 /* bucket count */); | 73 50 /* bucket count */); |
| 74 } | 74 } |
| 75 | 75 |
| 76 bool ParsePortsMessage(Channel::Message* message, |
| 77 void** data, |
| 78 size_t* num_data_bytes, |
| 79 size_t* num_header_bytes, |
| 80 size_t* num_payload_bytes, |
| 81 size_t* num_ports_bytes) { |
| 82 DCHECK(data && num_data_bytes && num_header_bytes && num_payload_bytes && |
| 83 num_ports_bytes); |
| 84 |
| 85 NodeChannel::GetPortsMessageData(message, data, num_data_bytes); |
| 86 if (!*num_data_bytes) |
| 87 return false; |
| 88 |
| 89 if (!ports::Message::Parse(*data, *num_data_bytes, num_header_bytes, |
| 90 num_payload_bytes, num_ports_bytes)) { |
| 91 return false; |
| 92 } |
| 93 |
| 94 return true; |
| 95 } |
| 96 |
| 76 // Used by NodeController to watch for shutdown. Since no IO can happen once | 97 // Used by NodeController to watch for shutdown. Since no IO can happen once |
| 77 // the IO thread is killed, the NodeController can cleanly drop all its peers | 98 // the IO thread is killed, the NodeController can cleanly drop all its peers |
| 78 // at that time. | 99 // at that time. |
| 79 class ThreadDestructionObserver : | 100 class ThreadDestructionObserver : |
| 80 public base::MessageLoop::DestructionObserver { | 101 public base::MessageLoop::DestructionObserver { |
| 81 public: | 102 public: |
| 82 static void Create(scoped_refptr<base::TaskRunner> task_runner, | 103 static void Create(scoped_refptr<base::TaskRunner> task_runner, |
| 83 const base::Closure& callback) { | 104 const base::Closure& callback) { |
| 84 if (task_runner->RunsTasksOnCurrentThread()) { | 105 if (task_runner->RunsTasksOnCurrentThread()) { |
| 85 // Owns itself. | 106 // Owns itself. |
| (...skipping 579 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 665 // (synchronously) in response to Node's ClosePort, SendMessage, or | 686 // (synchronously) in response to Node's ClosePort, SendMessage, or |
| 666 // AcceptMessage, we flush the queue after calling any of those methods. | 687 // AcceptMessage, we flush the queue after calling any of those methods. |
| 667 base::AutoLock lock(messages_lock_); | 688 base::AutoLock lock(messages_lock_); |
| 668 incoming_messages_.emplace(std::move(message)); | 689 incoming_messages_.emplace(std::move(message)); |
| 669 incoming_messages_flag_.Set(true); | 690 incoming_messages_flag_.Set(true); |
| 670 } else { | 691 } else { |
| 671 SendPeerMessage(node, std::move(message)); | 692 SendPeerMessage(node, std::move(message)); |
| 672 } | 693 } |
| 673 } | 694 } |
| 674 | 695 |
| 696 void NodeController::BroadcastMessage(ports::ScopedMessage message) { |
| 697 CHECK_EQ(message->num_ports(), 0u); |
| 698 Channel::MessagePtr channel_message = |
| 699 static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); |
| 700 CHECK(!channel_message->has_handles()); |
| 701 |
| 702 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); |
| 703 if (broker) |
| 704 broker->Broadcast(std::move(channel_message)); |
| 705 else |
| 706 OnBroadcast(name_, std::move(channel_message)); |
| 707 } |
| 708 |
| 675 void NodeController::PortStatusChanged(const ports::PortRef& port) { | 709 void NodeController::PortStatusChanged(const ports::PortRef& port) { |
| 676 scoped_refptr<ports::UserData> user_data; | 710 scoped_refptr<ports::UserData> user_data; |
| 677 node_->GetUserData(port, &user_data); | 711 node_->GetUserData(port, &user_data); |
| 678 | 712 |
| 679 PortObserver* observer = static_cast<PortObserver*>(user_data.get()); | 713 PortObserver* observer = static_cast<PortObserver*>(user_data.get()); |
| 680 if (observer) { | 714 if (observer) { |
| 681 observer->OnPortStatusChanged(); | 715 observer->OnPortStatusChanged(); |
| 682 } else { | 716 } else { |
| 683 DVLOG(2) << "Ignoring status change for " << port.name() << " because it " | 717 DVLOG(2) << "Ignoring status change for " << port.name() << " because it " |
| 684 << "doesn't have an observer."; | 718 << "doesn't have an observer."; |
| (...skipping 214 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 899 #endif | 933 #endif |
| 900 | 934 |
| 901 DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name; | 935 DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name; |
| 902 } | 936 } |
| 903 | 937 |
| 904 void NodeController::OnPortsMessage(const ports::NodeName& from_node, | 938 void NodeController::OnPortsMessage(const ports::NodeName& from_node, |
| 905 Channel::MessagePtr channel_message) { | 939 Channel::MessagePtr channel_message) { |
| 906 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 940 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| 907 | 941 |
| 908 void* data; | 942 void* data; |
| 909 size_t num_data_bytes; | 943 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; |
| 910 NodeChannel::GetPortsMessageData( | 944 if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes, |
| 911 channel_message.get(), &data, &num_data_bytes); | 945 &num_header_bytes, &num_payload_bytes, |
| 912 if (!num_data_bytes) { | 946 &num_ports_bytes)) { |
| 913 DropPeer(from_node); | 947 DropPeer(from_node); |
| 914 return; | 948 return; |
| 915 } | 949 } |
| 916 | |
| 917 size_t num_header_bytes, num_payload_bytes, num_ports_bytes; | |
| 918 if (!ports::Message::Parse(data, | |
| 919 num_data_bytes, | |
| 920 &num_header_bytes, | |
| 921 &num_payload_bytes, | |
| 922 &num_ports_bytes)) { | |
| 923 DropPeer(from_node); | |
| 924 return; | |
| 925 } | |
| 926 | 950 |
| 927 CHECK(channel_message); | 951 CHECK(channel_message); |
| 928 std::unique_ptr<PortsMessage> ports_message( | 952 std::unique_ptr<PortsMessage> ports_message( |
| 929 new PortsMessage(num_header_bytes, | 953 new PortsMessage(num_header_bytes, |
| 930 num_payload_bytes, | 954 num_payload_bytes, |
| 931 num_ports_bytes, | 955 num_ports_bytes, |
| 932 std::move(channel_message))); | 956 std::move(channel_message))); |
| 933 ports_message->set_source_node(from_node); | 957 ports_message->set_source_node(from_node); |
| 934 node_->AcceptMessage(ports::ScopedMessage(ports_message.release())); | 958 node_->AcceptMessage(ports::ScopedMessage(ports_message.release())); |
| 935 AcceptIncomingMessages(); | 959 AcceptIncomingMessages(); |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 999 } | 1023 } |
| 1000 | 1024 |
| 1001 scoped_refptr<NodeChannel> channel = | 1025 scoped_refptr<NodeChannel> channel = |
| 1002 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_, | 1026 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_, |
| 1003 ProcessErrorCallback()); | 1027 ProcessErrorCallback()); |
| 1004 | 1028 |
| 1005 DVLOG(1) << "Adding new peer " << name << " via parent introduction."; | 1029 DVLOG(1) << "Adding new peer " << name << " via parent introduction."; |
| 1006 AddPeer(name, channel, true /* start_channel */); | 1030 AddPeer(name, channel, true /* start_channel */); |
| 1007 } | 1031 } |
| 1008 | 1032 |
| 1033 void NodeController::OnBroadcast(const ports::NodeName& from_node, |
| 1034 Channel::MessagePtr message) { |
| 1035 DCHECK(!message->has_handles()); |
| 1036 |
| 1037 void* data; |
| 1038 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; |
| 1039 if (!ParsePortsMessage(message.get(), &data, &num_data_bytes, |
| 1040 &num_header_bytes, &num_payload_bytes, |
| 1041 &num_ports_bytes)) { |
| 1042 DropPeer(from_node); |
| 1043 return; |
| 1044 } |
| 1045 |
| 1046 // Broadcast messages must not contain ports. |
| 1047 if (num_ports_bytes > 0) { |
| 1048 DropPeer(from_node); |
| 1049 return; |
| 1050 } |
| 1051 |
| 1052 base::AutoLock lock(peers_lock_); |
| 1053 for (auto& iter : peers_) { |
| 1054 // Copy and send the message to each known peer. |
| 1055 Channel::MessagePtr peer_message( |
| 1056 new Channel::Message(message->payload_size(), 0)); |
| 1057 memcpy(peer_message->mutable_payload(), message->payload(), |
| 1058 message->payload_size()); |
| 1059 iter.second->PortsMessage(std::move(peer_message)); |
| 1060 } |
| 1061 } |
| 1062 |
| 1009 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) | 1063 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) |
| 1010 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, | 1064 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, |
| 1011 base::ProcessHandle from_process, | 1065 base::ProcessHandle from_process, |
| 1012 const ports::NodeName& destination, | 1066 const ports::NodeName& destination, |
| 1013 Channel::MessagePtr message) { | 1067 Channel::MessagePtr message) { |
| 1014 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1068 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| 1015 | 1069 |
| 1016 if (GetBrokerChannel()) { | 1070 if (GetBrokerChannel()) { |
| 1017 // Only the broker should be asked to relay a message. | 1071 // Only the broker should be asked to relay a message. |
| 1018 LOG(ERROR) << "Non-broker refusing to relay message."; | 1072 LOG(ERROR) << "Non-broker refusing to relay message."; |
| (...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1136 shutdown_callback_flag_.Set(false); | 1190 shutdown_callback_flag_.Set(false); |
| 1137 } | 1191 } |
| 1138 | 1192 |
| 1139 DCHECK(!callback.is_null()); | 1193 DCHECK(!callback.is_null()); |
| 1140 | 1194 |
| 1141 callback.Run(); | 1195 callback.Run(); |
| 1142 } | 1196 } |
| 1143 | 1197 |
| 1144 } // namespace edk | 1198 } // namespace edk |
| 1145 } // namespace mojo | 1199 } // namespace mojo |
| OLD | NEW |