Chromium Code Reviews

Unified Diff: mojo/edk/system/node_controller.cc

Issue 2888053002: Rename TaskRunner::RunsTasksOnCurrentThread() in //extensions, //headless, //mojo (Closed)
Patch Set: Created 3 years, 7 months ago
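This CL is part of the base::TaskRunner rename from RunsTasksOnCurrentThread() to RunsTasksInCurrentSequence(); every call site in this file changes name only, with no behavioral difference. For orientation, a minimal sketch (not part of the patch) of the call-site pattern the rename touches, assuming the //base API of this era; RunOrPostTask is a hypothetical helper, not a Chromium function:

// Sketch only, not part of this patch. RunOrPostTask is hypothetical; it
// mirrors the run-inline-or-post pattern used by ThreadDestructionObserver
// and NodeController::OnChannelError in the diff below.
#include "base/callback.h"
#include "base/location.h"
#include "base/memory/ref_counted.h"
#include "base/task_runner.h"

void RunOrPostTask(scoped_refptr<base::TaskRunner> task_runner,
                   const base::Closure& callback) {
  // Before this CL the predicate was spelled RunsTasksOnCurrentThread();
  // only the name changes, to speak of sequences rather than threads.
  if (task_runner->RunsTasksInCurrentSequence()) {
    callback.Run();  // Already on the target sequence; run inline.
  } else {
    task_runner->PostTask(FROM_HERE, callback);  // Hop to the target sequence.
  }
}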
 // Copyright 2016 The Chromium Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.

 #include "mojo/edk/system/node_controller.h"

 #include <algorithm>
 #include <limits>

 #include "base/bind.h"
(...skipping 88 matching lines...)
 }

 // Used by NodeController to watch for shutdown. Since no IO can happen once
 // the IO thread is killed, the NodeController can cleanly drop all its peers
 // at that time.
 class ThreadDestructionObserver :
     public base::MessageLoop::DestructionObserver {
  public:
   static void Create(scoped_refptr<base::TaskRunner> task_runner,
                      const base::Closure& callback) {
-    if (task_runner->RunsTasksOnCurrentThread()) {
+    if (task_runner->RunsTasksInCurrentSequence()) {
       // Owns itself.
       new ThreadDestructionObserver(callback);
     } else {
       task_runner->PostTask(FROM_HERE,
                             base::Bind(&Create, task_runner, callback));
     }
   }

  private:
   explicit ThreadDestructionObserver(const base::Closure& callback)
(...skipping 215 matching lines...)
   scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node);
   if (peer)
     peer->NotifyBadMessage(error);
 }

 void NodeController::SendBrokerClientInvitationOnIOThread(
     base::ProcessHandle target_process,
     ConnectionParams connection_params,
     ports::NodeName temporary_node_name,
     const ProcessErrorCallback& process_error_callback) {
-  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+  DCHECK(io_task_runner_->RunsTasksInCurrentSequence());

 #if !defined(OS_MACOSX) && !defined(OS_NACL)
   PlatformChannelPair node_channel;
   ScopedPlatformHandle server_handle = node_channel.PassServerHandle();
   // BrokerHost owns itself.
   BrokerHost* broker_host =
       new BrokerHost(target_process, connection_params.TakeChannelHandle());
   bool channel_ok = broker_host->SendChannel(node_channel.PassClientHandle());

 #if defined(OS_WIN)
(...skipping 29 matching lines...)

   channel->SetRemoteNodeName(temporary_node_name);
   channel->SetRemoteProcessHandle(target_process);
   channel->Start();

   channel->AcceptChild(name_, temporary_node_name);
 }

 void NodeController::AcceptBrokerClientInvitationOnIOThread(
     ConnectionParams connection_params) {
-  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+  DCHECK(io_task_runner_->RunsTasksInCurrentSequence());

   {
     base::AutoLock lock(parent_lock_);
     DCHECK(parent_name_ == ports::kInvalidNodeName);

     // At this point we don't know the parent's name, so we can't yet insert it
     // into our |peers_| map. That will happen as soon as we receive an
     // AcceptChild message from them.
     bootstrap_parent_channel_ =
         NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
                             ProcessErrorCallback());
     // Prevent the parent pipe handle from being closed on shutdown. Pipe
     // closure is used by the parent to detect the child process has exited.
     // Relying on message pipes to be closed is not enough because the parent
     // may see the message pipe closure before the child is dead, causing the
     // child process to be unexpectedly SIGKILL'd.
     bootstrap_parent_channel_->LeakHandleOnShutdown();
   }
   bootstrap_parent_channel_->Start();
 }

 void NodeController::ConnectToPeerOnIOThread(ConnectionParams connection_params,
                                              ports::NodeName token,
                                              ports::PortRef port,
                                              const std::string& peer_token) {
-  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+  DCHECK(io_task_runner_->RunsTasksInCurrentSequence());

   scoped_refptr<NodeChannel> channel = NodeChannel::Create(
       this, std::move(connection_params), io_task_runner_, {});
   peer_connections_.insert(
       {token, PeerConnection{channel, port, peer_token}});
   peers_by_token_.insert({peer_token, token});

   channel->SetRemoteNodeName(token);
   channel->Start();

(...skipping 39 matching lines...)
   {
     base::AutoLock lock(broker_lock_);
     broker_name = broker_name_;
   }
   return GetPeerChannel(broker_name);
 }

 void NodeController::AddPeer(const ports::NodeName& name,
                              scoped_refptr<NodeChannel> channel,
                              bool start_channel) {
-  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+  DCHECK(io_task_runner_->RunsTasksInCurrentSequence());

   DCHECK(name != ports::kInvalidNodeName);
   DCHECK(channel);

   channel->SetRemoteNodeName(name);

   OutgoingMessageQueue pending_messages;
   {
     base::AutoLock lock(peers_lock_);
     if (peers_.find(name) != peers_.end()) {
(...skipping 23 matching lines...)

   // Flush any queued message we need to deliver to this node.
   while (!pending_messages.empty()) {
     channel->PortsMessage(std::move(pending_messages.front()));
     pending_messages.pop();
   }
 }

 void NodeController::DropPeer(const ports::NodeName& name,
                               NodeChannel* channel) {
-  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+  DCHECK(io_task_runner_->RunsTasksInCurrentSequence());

   {
     base::AutoLock lock(peers_lock_);
     auto it = peers_.find(name);

     if (it != peers_.end()) {
       ports::NodeName peer = it->first;
       peers_.erase(it);
       DVLOG(1) << "Dropped peer " << peer;
     }
(...skipping 176 matching lines...)
     // done after AcceptIncomingMessages() otherwise a message might be missed.
     // Doing it here may result in at most two tasks existing at the same time;
     // this running one, and one pending in the task runner.
     incoming_messages_task_posted_ = false;
   }

   AcceptIncomingMessages();
 }

 void NodeController::DropAllPeers() {
-  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+  DCHECK(io_task_runner_->RunsTasksInCurrentSequence());

   std::vector<scoped_refptr<NodeChannel>> all_peers;
   {
     base::AutoLock lock(parent_lock_);
     if (bootstrap_parent_channel_) {
       // |bootstrap_parent_channel_| isn't null'd here becuase we rely on its
       // existence to determine whether or not this is the root node. Once
       // bootstrap_parent_channel_->ShutDown() has been called,
       // |bootstrap_parent_channel_| is essentially a dead object and it doesn't
       // matter if it's deleted now or when |this| is deleted.
(...skipping 92 matching lines...)
     observer->OnPortStatusChanged();
   } else {
     DVLOG(2) << "Ignoring status change for " << port.name() << " because it "
              << "doesn't have an observer.";
   }
 }

 void NodeController::OnAcceptChild(const ports::NodeName& from_node,
                                    const ports::NodeName& parent_name,
                                    const ports::NodeName& token) {
-  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+  DCHECK(io_task_runner_->RunsTasksInCurrentSequence());

   scoped_refptr<NodeChannel> parent;
   {
     base::AutoLock lock(parent_lock_);
     if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) {
       parent_name_ = parent_name;
       parent = bootstrap_parent_channel_;
     }
   }

   if (!parent) {
     DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node;
     DropPeer(from_node, nullptr);
     return;
   }

   parent->SetRemoteNodeName(parent_name);
   parent->AcceptParent(token, name_);

   // NOTE: The child does not actually add its parent as a peer until
   // receiving an AcceptBrokerClient message from the broker. The parent
   // will request that said message be sent upon receiving AcceptParent.

   DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
 }

 void NodeController::OnAcceptParent(const ports::NodeName& from_node,
                                     const ports::NodeName& token,
                                     const ports::NodeName& child_name) {
-  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+  DCHECK(io_task_runner_->RunsTasksInCurrentSequence());

   auto it = pending_invitations_.find(from_node);
   if (it == pending_invitations_.end() || token != from_node) {
     DLOG(ERROR) << "Received unexpected AcceptParent message from "
                 << from_node;
     DropPeer(from_node, nullptr);
     return;
   }

   {
(...skipping 184 matching lines...)
       message_queue.pop();
     }
   }
 #endif

   DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name;
 }

 void NodeController::OnPortsMessage(const ports::NodeName& from_node,
                                     Channel::MessagePtr channel_message) {
-  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+  DCHECK(io_task_runner_->RunsTasksInCurrentSequence());

   void* data;
   size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
   if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes,
                          &num_header_bytes, &num_payload_bytes,
                          &num_ports_bytes)) {
     DropPeer(from_node, nullptr);
     return;
   }

   CHECK(channel_message);
   std::unique_ptr<PortsMessage> ports_message(
       new PortsMessage(num_header_bytes,
                        num_payload_bytes,
                        num_ports_bytes,
                        std::move(channel_message)));
   ports_message->set_source_node(from_node);
   node_->AcceptMessage(ports::ScopedMessage(ports_message.release()));
   AcceptIncomingMessages();
 }

 void NodeController::OnRequestPortMerge(
     const ports::NodeName& from_node,
     const ports::PortName& connector_port_name,
     const std::string& name) {
-  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+  DCHECK(io_task_runner_->RunsTasksInCurrentSequence());

   DVLOG(2) << "Node " << name_ << " received RequestPortMerge for name " << name
            << " and port " << connector_port_name << "@" << from_node;

   ports::PortRef local_port;
   {
     base::AutoLock lock(reserved_ports_lock_);
     auto it = reserved_ports_.find(from_node);
     if (it == reserved_ports_.end()) {
       DVLOG(1) << "Ignoring port merge request from node " << from_node << ". "
(...skipping 16 matching lines...)

   int rv = node_->MergePorts(local_port, from_node, connector_port_name);
   if (rv != ports::OK)
     DLOG(ERROR) << "MergePorts failed: " << rv;

   AcceptIncomingMessages();
 }

 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
                                            const ports::NodeName& name) {
-  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+  DCHECK(io_task_runner_->RunsTasksInCurrentSequence());

   scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node);
   if (from_node == name || name == ports::kInvalidNodeName || !requestor) {
     DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from "
                 << from_node;
     DropPeer(from_node, nullptr);
     return;
   }

   scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name);
   if (!new_friend) {
     // We don't know who they're talking about!
     requestor->Introduce(name, ScopedPlatformHandle());
   } else {
     PlatformChannelPair new_channel;
     requestor->Introduce(name, new_channel.PassServerHandle());
     new_friend->Introduce(from_node, new_channel.PassClientHandle());
   }
 }

 void NodeController::OnIntroduce(const ports::NodeName& from_node,
                                  const ports::NodeName& name,
                                  ScopedPlatformHandle channel_handle) {
-  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+  DCHECK(io_task_runner_->RunsTasksInCurrentSequence());

   if (!channel_handle.is_valid()) {
     node_->LostConnectionToNode(name);

     DVLOG(1) << "Could not be introduced to peer " << name;
     base::AutoLock lock(peers_lock_);
     pending_peer_messages_.erase(name);
     return;
   }

(...skipping 34 matching lines...)
            message->payload_size());
     iter.second->PortsMessage(std::move(peer_message));
   }
 }

 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node,
                                          base::ProcessHandle from_process,
                                          const ports::NodeName& destination,
                                          Channel::MessagePtr message) {
-  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+  DCHECK(io_task_runner_->RunsTasksInCurrentSequence());

   if (GetBrokerChannel()) {
     // Only the broker should be asked to relay a message.
     LOG(ERROR) << "Non-broker refusing to relay message.";
     DropPeer(from_node, nullptr);
     return;
   }

   // The parent should always know which process this came from.
   DCHECK(from_process != base::kNullProcessHandle);
(...skipping 57 matching lines...)
   }

   OnPortsMessage(source_node, std::move(message));
 }
 #endif

 void NodeController::OnAcceptPeer(const ports::NodeName& from_node,
                                   const ports::NodeName& token,
                                   const ports::NodeName& peer_name,
                                   const ports::PortName& port_name) {
-  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+  DCHECK(io_task_runner_->RunsTasksInCurrentSequence());

   auto it = peer_connections_.find(from_node);
   if (it == peer_connections_.end()) {
     DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node;
     DropPeer(from_node, nullptr);
     return;
   }

   scoped_refptr<NodeChannel> channel = std::move(it->second.channel);
   ports::PortRef local_port = it->second.local_port;
(...skipping 17 matching lines...)
   // We need to choose one side to initiate the port merge. It doesn't matter
   // who does it as long as they don't both try. Simple solution: pick the one
   // with the "smaller" port name.
   if (local_port.name() < port_name) {
     node()->MergePorts(local_port, peer_name, port_name);
   }
 }

 void NodeController::OnChannelError(const ports::NodeName& from_node,
                                     NodeChannel* channel) {
-  if (io_task_runner_->RunsTasksOnCurrentThread()) {
+  if (io_task_runner_->RunsTasksInCurrentSequence()) {
     DropPeer(from_node, channel);
     // DropPeer may have caused local port closures, so be sure to process any
     // pending local messages.
     AcceptIncomingMessages();
   } else {
     io_task_runner_->PostTask(
         FROM_HERE,
         base::Bind(&NodeController::OnChannelError, base::Unretained(this),
                    from_node, channel));
   }
(...skipping 76 matching lines...)
 NodeController::PeerConnection::~PeerConnection() = default;

 NodeController::PeerConnection& NodeController::PeerConnection::
 operator=(const PeerConnection& other) = default;

 NodeController::PeerConnection& NodeController::PeerConnection::
 operator=(PeerConnection&& other) = default;

 }  // namespace edk
 }  // namespace mojo
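Most of the renamed lines above are thread-affinity assertions: NodeController methods that must run on the IO thread DCHECK that the IO task runner is current before doing any work. A minimal standalone sketch of that idiom, assuming the same-era //base API (the class and member names below are illustrative, not taken from Chromium):

// Sketch only. Illustrates the DCHECK-based IO-thread guard that appears
// throughout node_controller.cc; IoBoundThing is a hypothetical class.
#include <utility>

#include "base/logging.h"
#include "base/memory/ref_counted.h"
#include "base/task_runner.h"

class IoBoundThing {
 public:
  explicit IoBoundThing(scoped_refptr<base::TaskRunner> io_task_runner)
      : io_task_runner_(std::move(io_task_runner)) {}

  // Must be called on the IO task runner, just like the *OnIOThread methods
  // in NodeController.
  void DoIoWork() {
    // Formerly DCHECK(io_task_runner_->RunsTasksOnCurrentThread()).
    DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
    // ... IO-bound work ...
  }

 private:
  scoped_refptr<base::TaskRunner> io_task_runner_;
};

Because DCHECK is compiled out of non-debug builds by default, the guard documents and enforces the IO-thread contract only in debug builds; the rename does not change that.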
