| Index: mojo/edk/system/node_controller.cc
|
| diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc
|
| index 6f9a7a0e8d79dcbc0c8c9922d6cdacd076bb53f1..07108b849be825e3bf7935e8e0ba3ce8e7369c39 100644
|
| --- a/mojo/edk/system/node_controller.cc
|
| +++ b/mojo/edk/system/node_controller.cc
|
| @@ -73,6 +73,27 @@ void RecordPendingChildCount(size_t count) {
|
| 50 /* bucket count */);
|
| }
|
|
|
| +bool ParsePortsMessage(Channel::Message* message,
|
| + void** data,
|
| + size_t* num_data_bytes,
|
| + size_t* num_header_bytes,
|
| + size_t* num_payload_bytes,
|
| + size_t* num_ports_bytes) {
|
| + DCHECK(data && num_data_bytes && num_header_bytes && num_payload_bytes &&
|
| + num_ports_bytes);
|
| +
|
| + NodeChannel::GetPortsMessageData(message, data, num_data_bytes);
|
| + if (!*num_data_bytes)
|
| + return false;
|
| +
|
| + if (!ports::Message::Parse(*data, *num_data_bytes, num_header_bytes,
|
| + num_payload_bytes, num_ports_bytes)) {
|
| + return false;
|
| + }
|
| +
|
| + return true;
|
| +}
|
| +
|
| // Used by NodeController to watch for shutdown. Since no IO can happen once
|
| // the IO thread is killed, the NodeController can cleanly drop all its peers
|
| // at that time.
|
| @@ -672,6 +693,19 @@ void NodeController::ForwardMessage(const ports::NodeName& node,
|
| }
|
| }
|
|
|
| +void NodeController::BroadcastMessage(ports::ScopedMessage message) {
|
| + CHECK_EQ(message->num_ports(), 0u);
|
| + Channel::MessagePtr channel_message =
|
| + static_cast<PortsMessage*>(message.get())->TakeChannelMessage();
|
| + CHECK(!channel_message->has_handles());
|
| +
|
| + scoped_refptr<NodeChannel> broker = GetBrokerChannel();
|
| + if (broker)
|
| + broker->Broadcast(std::move(channel_message));
|
| + else
|
| + OnBroadcast(name_, std::move(channel_message));
|
| +}
|
| +
|
| void NodeController::PortStatusChanged(const ports::PortRef& port) {
|
| scoped_refptr<ports::UserData> user_data;
|
| node_->GetUserData(port, &user_data);
|
| @@ -906,20 +940,10 @@ void NodeController::OnPortsMessage(const ports::NodeName& from_node,
|
| DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
|
|
| void* data;
|
| - size_t num_data_bytes;
|
| - NodeChannel::GetPortsMessageData(
|
| - channel_message.get(), &data, &num_data_bytes);
|
| - if (!num_data_bytes) {
|
| - DropPeer(from_node);
|
| - return;
|
| - }
|
| -
|
| - size_t num_header_bytes, num_payload_bytes, num_ports_bytes;
|
| - if (!ports::Message::Parse(data,
|
| - num_data_bytes,
|
| - &num_header_bytes,
|
| - &num_payload_bytes,
|
| - &num_ports_bytes)) {
|
| + size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
|
| + if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes,
|
| + &num_header_bytes, &num_payload_bytes,
|
| + &num_ports_bytes)) {
|
| DropPeer(from_node);
|
| return;
|
| }
|
| @@ -1006,6 +1030,36 @@ void NodeController::OnIntroduce(const ports::NodeName& from_node,
|
| AddPeer(name, channel, true /* start_channel */);
|
| }
|
|
|
| +void NodeController::OnBroadcast(const ports::NodeName& from_node,
|
| + Channel::MessagePtr message) {
|
| + DCHECK(!message->has_handles());
|
| +
|
| + void* data;
|
| + size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
|
| + if (!ParsePortsMessage(message.get(), &data, &num_data_bytes,
|
| + &num_header_bytes, &num_payload_bytes,
|
| + &num_ports_bytes)) {
|
| + DropPeer(from_node);
|
| + return;
|
| + }
|
| +
|
| + // Broadcast messages must not contain ports.
|
| + if (num_ports_bytes > 0) {
|
| + DropPeer(from_node);
|
| + return;
|
| + }
|
| +
|
| + base::AutoLock lock(peers_lock_);
|
| + for (auto& iter : peers_) {
|
| + // Copy and send the message to each known peer.
|
| + Channel::MessagePtr peer_message(
|
| + new Channel::Message(message->payload_size(), 0));
|
| + memcpy(peer_message->mutable_payload(), message->payload(),
|
| + message->payload_size());
|
| + iter.second->PortsMessage(std::move(peer_message));
|
| + }
|
| +}
|
| +
|
| #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
|
| void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node,
|
| base::ProcessHandle from_process,
|
|
|