Index: mojo/edk/system/node_controller.cc |
diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc |
index 6f9a7a0e8d79dcbc0c8c9922d6cdacd076bb53f1..07108b849be825e3bf7935e8e0ba3ce8e7369c39 100644 |
--- a/mojo/edk/system/node_controller.cc |
+++ b/mojo/edk/system/node_controller.cc |
@@ -73,6 +73,27 @@ void RecordPendingChildCount(size_t count) { |
50 /* bucket count */); |
} |
+bool ParsePortsMessage(Channel::Message* message, |
+ void** data, |
+ size_t* num_data_bytes, |
+ size_t* num_header_bytes, |
+ size_t* num_payload_bytes, |
+ size_t* num_ports_bytes) { |
+ DCHECK(data && num_data_bytes && num_header_bytes && num_payload_bytes && |
+ num_ports_bytes); |
+ |
+ NodeChannel::GetPortsMessageData(message, data, num_data_bytes); |
+ if (!*num_data_bytes) |
+ return false; |
+ |
+ if (!ports::Message::Parse(*data, *num_data_bytes, num_header_bytes, |
+ num_payload_bytes, num_ports_bytes)) { |
+ return false; |
+ } |
+ |
+ return true; |
+} |
+ |
// Used by NodeController to watch for shutdown. Since no IO can happen once |
// the IO thread is killed, the NodeController can cleanly drop all its peers |
// at that time. |
@@ -672,6 +693,19 @@ void NodeController::ForwardMessage(const ports::NodeName& node, |
} |
} |
+void NodeController::BroadcastMessage(ports::ScopedMessage message) { |
+ CHECK_EQ(message->num_ports(), 0u); |
+ Channel::MessagePtr channel_message = |
+ static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); |
+ CHECK(!channel_message->has_handles()); |
+ |
+ scoped_refptr<NodeChannel> broker = GetBrokerChannel(); |
+ if (broker) |
+ broker->Broadcast(std::move(channel_message)); |
+ else |
+ OnBroadcast(name_, std::move(channel_message)); |
+} |
+ |
void NodeController::PortStatusChanged(const ports::PortRef& port) { |
scoped_refptr<ports::UserData> user_data; |
node_->GetUserData(port, &user_data); |
@@ -906,20 +940,10 @@ void NodeController::OnPortsMessage(const ports::NodeName& from_node, |
DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
void* data; |
- size_t num_data_bytes; |
- NodeChannel::GetPortsMessageData( |
- channel_message.get(), &data, &num_data_bytes); |
- if (!num_data_bytes) { |
- DropPeer(from_node); |
- return; |
- } |
- |
- size_t num_header_bytes, num_payload_bytes, num_ports_bytes; |
- if (!ports::Message::Parse(data, |
- num_data_bytes, |
- &num_header_bytes, |
- &num_payload_bytes, |
- &num_ports_bytes)) { |
+ size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; |
+ if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes, |
+ &num_header_bytes, &num_payload_bytes, |
+ &num_ports_bytes)) { |
DropPeer(from_node); |
return; |
} |
@@ -1006,6 +1030,36 @@ void NodeController::OnIntroduce(const ports::NodeName& from_node, |
AddPeer(name, channel, true /* start_channel */); |
} |
+void NodeController::OnBroadcast(const ports::NodeName& from_node, |
+ Channel::MessagePtr message) { |
+ DCHECK(!message->has_handles()); |
+ |
+ void* data; |
+ size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; |
+ if (!ParsePortsMessage(message.get(), &data, &num_data_bytes, |
+ &num_header_bytes, &num_payload_bytes, |
+ &num_ports_bytes)) { |
+ DropPeer(from_node); |
+ return; |
+ } |
+ |
+ // Broadcast messages must not contain ports. |
+ if (num_ports_bytes > 0) { |
+ DropPeer(from_node); |
+ return; |
+ } |
+ |
+ base::AutoLock lock(peers_lock_); |
+ for (auto& iter : peers_) { |
+ // Copy and send the message to each known peer. |
+ Channel::MessagePtr peer_message( |
+ new Channel::Message(message->payload_size(), 0)); |
+ memcpy(peer_message->mutable_payload(), message->payload(), |
+ message->payload_size()); |
+ iter.second->PortsMessage(std::move(peer_message)); |
+ } |
+} |
+ |
#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) |
void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, |
base::ProcessHandle from_process, |