Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/node_controller.h" | 5 #include "mojo/edk/system/node_controller.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <limits> | 8 #include <limits> |
| 9 | 9 |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| (...skipping 544 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 555 // (synchronously) in response to Node's ClosePort, SendMessage, or | 555 // (synchronously) in response to Node's ClosePort, SendMessage, or |
| 556 // AcceptMessage, we flush the queue after calling any of those methods. | 556 // AcceptMessage, we flush the queue after calling any of those methods. |
| 557 base::AutoLock lock(messages_lock_); | 557 base::AutoLock lock(messages_lock_); |
| 558 incoming_messages_.emplace(std::move(message)); | 558 incoming_messages_.emplace(std::move(message)); |
| 559 incoming_messages_flag_.Set(true); | 559 incoming_messages_flag_.Set(true); |
| 560 } else { | 560 } else { |
| 561 SendPeerMessage(node, std::move(message)); | 561 SendPeerMessage(node, std::move(message)); |
| 562 } | 562 } |
| 563 } | 563 } |
| 564 | 564 |
| 565 void NodeController::BroadcastMessage(ports::ScopedMessage message) { | |
| 566 Channel::MessagePtr channel_message = | |
| 567 static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); | |
| 568 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); | |
| 569 if (broker) | |
| 570 broker->Broadcast(std::move(channel_message)); | |
| 571 else | |
| 572 OnBroadcast(std::move(channel_message)); | |
| 573 } | |
| 574 | |
| 565 void NodeController::PortStatusChanged(const ports::PortRef& port) { | 575 void NodeController::PortStatusChanged(const ports::PortRef& port) { |
| 566 scoped_refptr<ports::UserData> user_data; | 576 scoped_refptr<ports::UserData> user_data; |
| 567 node_->GetUserData(port, &user_data); | 577 node_->GetUserData(port, &user_data); |
| 568 | 578 |
| 569 PortObserver* observer = static_cast<PortObserver*>(user_data.get()); | 579 PortObserver* observer = static_cast<PortObserver*>(user_data.get()); |
| 570 if (observer) { | 580 if (observer) { |
| 571 observer->OnPortStatusChanged(); | 581 observer->OnPortStatusChanged(); |
| 572 } else { | 582 } else { |
| 573 DVLOG(2) << "Ignoring status change for " << port.name() << " because it " | 583 DVLOG(2) << "Ignoring status change for " << port.name() << " because it " |
| 574 << "doesn't have an observer."; | 584 << "doesn't have an observer."; |
| (...skipping 302 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 877 return; | 887 return; |
| 878 } | 888 } |
| 879 | 889 |
| 880 scoped_refptr<NodeChannel> channel = | 890 scoped_refptr<NodeChannel> channel = |
| 881 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_); | 891 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_); |
| 882 | 892 |
| 883 DVLOG(1) << "Adding new peer " << name << " via parent introduction."; | 893 DVLOG(1) << "Adding new peer " << name << " via parent introduction."; |
| 884 AddPeer(name, channel, true /* start_channel */); | 894 AddPeer(name, channel, true /* start_channel */); |
| 885 } | 895 } |
| 886 | 896 |
| 897 void NodeController::OnBroadcast(Channel::MessagePtr message) { | |
| 898 { | |
| 899 DCHECK(!message->has_handles()); | |
| 900 base::AutoLock lock(peers_lock_); | |
| 901 for (auto iter : peers_) { | |
|
Anand Mistry (off Chromium)
2016/05/16 04:03:57
auto&
Ken Rockot(use gerrit already)
2016/05/16 04:42:36
Done
| |
| 902 Channel::MessagePtr peer_message( | |
| 903 new Channel::Message(message->payload_size(), 0)); | |
| 904 memcpy(peer_message->mutable_payload(), message->payload(), | |
| 905 message->payload_size()); | |
| 906 iter.second->PortsMessage(std::move(peer_message)); | |
| 907 } | |
| 908 } | |
| 909 } | |
| 910 | |
| 887 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) | 911 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) |
| 888 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, | 912 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, |
| 889 base::ProcessHandle from_process, | 913 base::ProcessHandle from_process, |
| 890 const ports::NodeName& destination, | 914 const ports::NodeName& destination, |
| 891 Channel::MessagePtr message) { | 915 Channel::MessagePtr message) { |
| 892 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 916 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
| 893 | 917 |
| 894 if (GetBrokerChannel()) { | 918 if (GetBrokerChannel()) { |
| 895 // Only the broker should be asked to relay a message. | 919 // Only the broker should be asked to relay a message. |
| 896 LOG(ERROR) << "Non-broker refusing to relay message."; | 920 LOG(ERROR) << "Non-broker refusing to relay message."; |
| (...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 998 shutdown_callback_flag_.Set(false); | 1022 shutdown_callback_flag_.Set(false); |
| 999 } | 1023 } |
| 1000 | 1024 |
| 1001 DCHECK(!callback.is_null()); | 1025 DCHECK(!callback.is_null()); |
| 1002 | 1026 |
| 1003 callback.Run(); | 1027 callback.Run(); |
| 1004 } | 1028 } |
| 1005 | 1029 |
| 1006 } // namespace edk | 1030 } // namespace edk |
| 1007 } // namespace mojo | 1031 } // namespace mojo |
| OLD | NEW |