OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/node_controller.h" | 5 #include "mojo/edk/system/node_controller.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <limits> | 8 #include <limits> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
(...skipping 544 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
555 // (synchronously) in response to Node's ClosePort, SendMessage, or | 555 // (synchronously) in response to Node's ClosePort, SendMessage, or |
556 // AcceptMessage, we flush the queue after calling any of those methods. | 556 // AcceptMessage, we flush the queue after calling any of those methods. |
557 base::AutoLock lock(messages_lock_); | 557 base::AutoLock lock(messages_lock_); |
558 incoming_messages_.emplace(std::move(message)); | 558 incoming_messages_.emplace(std::move(message)); |
559 incoming_messages_flag_.Set(true); | 559 incoming_messages_flag_.Set(true); |
560 } else { | 560 } else { |
561 SendPeerMessage(node, std::move(message)); | 561 SendPeerMessage(node, std::move(message)); |
562 } | 562 } |
563 } | 563 } |
564 | 564 |
565 void NodeController::BroadcastMessage(ports::ScopedMessage message) { | |
566 Channel::MessagePtr channel_message = | |
567 static_cast<PortsMessage*>(message.get())->TakeChannelMessage(); | |
568 scoped_refptr<NodeChannel> broker = GetBrokerChannel(); | |
569 if (broker) | |
570 broker->Broadcast(std::move(channel_message)); | |
571 else | |
572 OnBroadcast(std::move(channel_message)); | |
573 } | |
574 | |
565 void NodeController::PortStatusChanged(const ports::PortRef& port) { | 575 void NodeController::PortStatusChanged(const ports::PortRef& port) { |
566 scoped_refptr<ports::UserData> user_data; | 576 scoped_refptr<ports::UserData> user_data; |
567 node_->GetUserData(port, &user_data); | 577 node_->GetUserData(port, &user_data); |
568 | 578 |
569 PortObserver* observer = static_cast<PortObserver*>(user_data.get()); | 579 PortObserver* observer = static_cast<PortObserver*>(user_data.get()); |
570 if (observer) { | 580 if (observer) { |
571 observer->OnPortStatusChanged(); | 581 observer->OnPortStatusChanged(); |
572 } else { | 582 } else { |
573 DVLOG(2) << "Ignoring status change for " << port.name() << " because it " | 583 DVLOG(2) << "Ignoring status change for " << port.name() << " because it " |
574 << "doesn't have an observer."; | 584 << "doesn't have an observer."; |
(...skipping 302 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
877 return; | 887 return; |
878 } | 888 } |
879 | 889 |
880 scoped_refptr<NodeChannel> channel = | 890 scoped_refptr<NodeChannel> channel = |
881 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_); | 891 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_); |
882 | 892 |
883 DVLOG(1) << "Adding new peer " << name << " via parent introduction."; | 893 DVLOG(1) << "Adding new peer " << name << " via parent introduction."; |
884 AddPeer(name, channel, true /* start_channel */); | 894 AddPeer(name, channel, true /* start_channel */); |
885 } | 895 } |
886 | 896 |
897 void NodeController::OnBroadcast(Channel::MessagePtr message) { | |
898 { | |
899 DCHECK(!message->has_handles()); | |
900 base::AutoLock lock(peers_lock_); | |
901 for (auto iter : peers_) { | |
Anand Mistry (off Chromium)
2016/05/16 04:03:57
auto&
Ken Rockot(use gerrit already)
2016/05/16 04:42:36
Done
| |
902 Channel::MessagePtr peer_message( | |
903 new Channel::Message(message->payload_size(), 0)); | |
904 memcpy(peer_message->mutable_payload(), message->payload(), | |
905 message->payload_size()); | |
906 iter.second->PortsMessage(std::move(peer_message)); | |
907 } | |
908 } | |
909 } | |
910 | |
887 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) | 911 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) |
888 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, | 912 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, |
889 base::ProcessHandle from_process, | 913 base::ProcessHandle from_process, |
890 const ports::NodeName& destination, | 914 const ports::NodeName& destination, |
891 Channel::MessagePtr message) { | 915 Channel::MessagePtr message) { |
892 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 916 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); |
893 | 917 |
894 if (GetBrokerChannel()) { | 918 if (GetBrokerChannel()) { |
895 // Only the broker should be asked to relay a message. | 919 // Only the broker should be asked to relay a message. |
896 LOG(ERROR) << "Non-broker refusing to relay message."; | 920 LOG(ERROR) << "Non-broker refusing to relay message."; |
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
998 shutdown_callback_flag_.Set(false); | 1022 shutdown_callback_flag_.Set(false); |
999 } | 1023 } |
1000 | 1024 |
1001 DCHECK(!callback.is_null()); | 1025 DCHECK(!callback.is_null()); |
1002 | 1026 |
1003 callback.Run(); | 1027 callback.Run(); |
1004 } | 1028 } |
1005 | 1029 |
1006 } // namespace edk | 1030 } // namespace edk |
1007 } // namespace mojo | 1031 } // namespace mojo |
OLD | NEW |