Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(229)

Side by Side Diff: mojo/edk/system/node_controller.cc

Issue 1975073002: [mojo-edk] Broadcast surprise port disruptions (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@reenable-clean-shutdown
Patch Set: rebase Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/node_controller.h" 5 #include "mojo/edk/system/node_controller.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <limits> 8 #include <limits>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
(...skipping 544 matching lines...) Expand 10 before | Expand all | Expand 10 after
555 // (synchronously) in response to Node's ClosePort, SendMessage, or 555 // (synchronously) in response to Node's ClosePort, SendMessage, or
556 // AcceptMessage, we flush the queue after calling any of those methods. 556 // AcceptMessage, we flush the queue after calling any of those methods.
557 base::AutoLock lock(messages_lock_); 557 base::AutoLock lock(messages_lock_);
558 incoming_messages_.emplace(std::move(message)); 558 incoming_messages_.emplace(std::move(message));
559 incoming_messages_flag_.Set(true); 559 incoming_messages_flag_.Set(true);
560 } else { 560 } else {
561 SendPeerMessage(node, std::move(message)); 561 SendPeerMessage(node, std::move(message));
562 } 562 }
563 } 563 }
564 564
565 void NodeController::BroadcastMessage(ports::ScopedMessage message) {
566 Channel::MessagePtr channel_message =
567 static_cast<PortsMessage*>(message.get())->TakeChannelMessage();
568 scoped_refptr<NodeChannel> broker = GetBrokerChannel();
569 if (broker)
570 broker->Broadcast(std::move(channel_message));
571 else
572 OnBroadcast(std::move(channel_message));
573 }
574
565 void NodeController::PortStatusChanged(const ports::PortRef& port) { 575 void NodeController::PortStatusChanged(const ports::PortRef& port) {
566 scoped_refptr<ports::UserData> user_data; 576 scoped_refptr<ports::UserData> user_data;
567 node_->GetUserData(port, &user_data); 577 node_->GetUserData(port, &user_data);
568 578
569 PortObserver* observer = static_cast<PortObserver*>(user_data.get()); 579 PortObserver* observer = static_cast<PortObserver*>(user_data.get());
570 if (observer) { 580 if (observer) {
571 observer->OnPortStatusChanged(); 581 observer->OnPortStatusChanged();
572 } else { 582 } else {
573 DVLOG(2) << "Ignoring status change for " << port.name() << " because it " 583 DVLOG(2) << "Ignoring status change for " << port.name() << " because it "
574 << "doesn't have an observer."; 584 << "doesn't have an observer.";
(...skipping 302 matching lines...) Expand 10 before | Expand all | Expand 10 after
877 return; 887 return;
878 } 888 }
879 889
880 scoped_refptr<NodeChannel> channel = 890 scoped_refptr<NodeChannel> channel =
881 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_); 891 NodeChannel::Create(this, std::move(channel_handle), io_task_runner_);
882 892
883 DVLOG(1) << "Adding new peer " << name << " via parent introduction."; 893 DVLOG(1) << "Adding new peer " << name << " via parent introduction.";
884 AddPeer(name, channel, true /* start_channel */); 894 AddPeer(name, channel, true /* start_channel */);
885 } 895 }
886 896
897 void NodeController::OnBroadcast(Channel::MessagePtr message) {
898 {
899 DCHECK(!message->has_handles());
900 base::AutoLock lock(peers_lock_);
901 for (auto iter : peers_) {
Anand Mistry (off Chromium) 2016/05/16 04:03:57 auto&
Ken Rockot(use gerrit already) 2016/05/16 04:42:36 Done
902 Channel::MessagePtr peer_message(
903 new Channel::Message(message->payload_size(), 0));
904 memcpy(peer_message->mutable_payload(), message->payload(),
905 message->payload_size());
906 iter.second->PortsMessage(std::move(peer_message));
907 }
908 }
909 }
910
887 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) 911 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
888 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, 912 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node,
889 base::ProcessHandle from_process, 913 base::ProcessHandle from_process,
890 const ports::NodeName& destination, 914 const ports::NodeName& destination,
891 Channel::MessagePtr message) { 915 Channel::MessagePtr message) {
892 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); 916 DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
893 917
894 if (GetBrokerChannel()) { 918 if (GetBrokerChannel()) {
895 // Only the broker should be asked to relay a message. 919 // Only the broker should be asked to relay a message.
896 LOG(ERROR) << "Non-broker refusing to relay message."; 920 LOG(ERROR) << "Non-broker refusing to relay message.";
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
998 shutdown_callback_flag_.Set(false); 1022 shutdown_callback_flag_.Set(false);
999 } 1023 }
1000 1024
1001 DCHECK(!callback.is_null()); 1025 DCHECK(!callback.is_null());
1002 1026
1003 callback.Run(); 1027 callback.Run();
1004 } 1028 }
1005 1029
1006 } // namespace edk 1030 } // namespace edk
1007 } // namespace mojo 1031 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698