Chromium Code Reviews

Diff: mojo/edk/system/node_controller.cc

Issue 2888053002: Rename TaskRunner::RunsTasksOnCurrentThread() in //extensions, //headless, //mojo (Closed)
Patch Set: rebase Created 3 years, 7 months ago
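
The change itself is mechanical: every call site that asked its task runner whether it runs tasks on the current thread now asks whether it runs tasks in the current sequence. For a single-threaded task runner such as the IO-thread runner used in this file, the two checks are equivalent; the new name reflects that task runners are specified in terms of sequences rather than threads. A minimal sketch of the before/after pattern follows; the helper function below is hypothetical and for illustration only, and only the two TaskRunner method names come from this CL.

  #include "base/logging.h"
  #include "base/task_runner.h"

  // Hypothetical helper, not part of this CL: assert that the caller is on
  // the sequence owned by |io_task_runner|.
  void AssertCalledOnIOSequence(base::TaskRunner* io_task_runner) {
    // Before this CL:
    //   DCHECK(io_task_runner->RunsTasksOnCurrentThread());
    // After this CL:
    DCHECK(io_task_runner->RunsTasksInCurrentSequence());
  }

In the listing below, unchanged context lines appear once with their original line number; each rename site is shown as a '-' (old) / '+' (new) pair.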
   1   // Copyright 2016 The Chromium Authors. All rights reserved.
   2   // Use of this source code is governed by a BSD-style license that can be
   3   // found in the LICENSE file.
   4
   5   #include "mojo/edk/system/node_controller.h"
   6
   7   #include <algorithm>
   8   #include <limits>
   9
  10   #include "base/bind.h"
       (...skipping 88 matching lines...)
  99   }
 100
 101   // Used by NodeController to watch for shutdown. Since no IO can happen once
 102   // the IO thread is killed, the NodeController can cleanly drop all its peers
 103   // at that time.
 104   class ThreadDestructionObserver :
 105       public base::MessageLoop::DestructionObserver {
 106    public:
 107     static void Create(scoped_refptr<base::TaskRunner> task_runner,
 108                        const base::Closure& callback) {
 109 -     if (task_runner->RunsTasksOnCurrentThread()) {
 109 +     if (task_runner->RunsTasksInCurrentSequence()) {
 110         // Owns itself.
 111         new ThreadDestructionObserver(callback);
 112       } else {
 113         task_runner->PostTask(FROM_HERE,
 114                               base::Bind(&Create, task_runner, callback));
 115       }
 116     }
 117
 118    private:
 119     explicit ThreadDestructionObserver(const base::Closure& callback)
       (...skipping 217 matching lines...)
 337     scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node);
 338     if (peer)
 339       peer->NotifyBadMessage(error);
 340   }
 341
 342   void NodeController::SendBrokerClientInvitationOnIOThread(
 343       base::ProcessHandle target_process,
 344       ConnectionParams connection_params,
 345       ports::NodeName temporary_node_name,
 346       const ProcessErrorCallback& process_error_callback) {
 347 -   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
 347 +   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
 348
 349   #if !defined(OS_MACOSX) && !defined(OS_NACL)
 350     PlatformChannelPair node_channel;
 351     ScopedPlatformHandle server_handle = node_channel.PassServerHandle();
 352     // BrokerHost owns itself.
 353     BrokerHost* broker_host =
 354         new BrokerHost(target_process, connection_params.TakeChannelHandle());
 355     bool channel_ok = broker_host->SendChannel(node_channel.PassClientHandle());
 356
 357   #if defined(OS_WIN)
       (...skipping 29 matching lines...)
 387
 388     channel->SetRemoteNodeName(temporary_node_name);
 389     channel->SetRemoteProcessHandle(target_process);
 390     channel->Start();
 391
 392     channel->AcceptChild(name_, temporary_node_name);
 393   }
 394
 395   void NodeController::AcceptBrokerClientInvitationOnIOThread(
 396       ConnectionParams connection_params) {
 397 -   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
 397 +   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
 398
 399     {
 400       base::AutoLock lock(parent_lock_);
 401       DCHECK(parent_name_ == ports::kInvalidNodeName);
 402
 403       // At this point we don't know the parent's name, so we can't yet insert it
 404       // into our |peers_| map. That will happen as soon as we receive an
 405       // AcceptChild message from them.
 406       bootstrap_parent_channel_ =
 407           NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
 408                               ProcessErrorCallback());
 409       // Prevent the parent pipe handle from being closed on shutdown. Pipe
 410       // closure is used by the parent to detect the child process has exited.
 411       // Relying on message pipes to be closed is not enough because the parent
 412       // may see the message pipe closure before the child is dead, causing the
 413       // child process to be unexpectedly SIGKILL'd.
 414       bootstrap_parent_channel_->LeakHandleOnShutdown();
 415     }
 416     bootstrap_parent_channel_->Start();
 417   }
 418
 419   void NodeController::ConnectToPeerOnIOThread(uint64_t peer_connection_id,
 420                                                ConnectionParams connection_params,
 421                                                ports::PortRef port) {
 422 -   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
 422 +   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
 423
 424     scoped_refptr<NodeChannel> channel = NodeChannel::Create(
 425         this, std::move(connection_params), io_task_runner_, {});
 426
 427     ports::NodeName token;
 428     GenerateRandomName(&token);
 429     peer_connections_.emplace(token,
 430                               PeerConnection{channel, port, peer_connection_id});
 431     peer_connections_by_id_.emplace(peer_connection_id, token);
 432
       (...skipping 42 matching lines...)
 475     {
 476       base::AutoLock lock(broker_lock_);
 477       broker_name = broker_name_;
 478     }
 479     return GetPeerChannel(broker_name);
 480   }
 481
 482   void NodeController::AddPeer(const ports::NodeName& name,
 483                                scoped_refptr<NodeChannel> channel,
 484                                bool start_channel) {
 485 -   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
 485 +   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
 486
 487     DCHECK(name != ports::kInvalidNodeName);
 488     DCHECK(channel);
 489
 490     channel->SetRemoteNodeName(name);
 491
 492     OutgoingMessageQueue pending_messages;
 493     {
 494       base::AutoLock lock(peers_lock_);
 495       if (peers_.find(name) != peers_.end()) {
       (...skipping 23 matching lines...)
 519
 520     // Flush any queued message we need to deliver to this node.
 521     while (!pending_messages.empty()) {
 522       channel->PortsMessage(std::move(pending_messages.front()));
 523       pending_messages.pop();
 524     }
 525   }
 526
 527   void NodeController::DropPeer(const ports::NodeName& name,
 528                                 NodeChannel* channel) {
 529 -   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
 529 +   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
 530
 531     {
 532       base::AutoLock lock(peers_lock_);
 533       auto it = peers_.find(name);
 534
 535       if (it != peers_.end()) {
 536         ports::NodeName peer = it->first;
 537         peers_.erase(it);
 538         DVLOG(1) << "Dropped peer " << peer;
 539       }
       (...skipping 176 matching lines...)
 716     // done after AcceptIncomingMessages() otherwise a message might be missed.
 717     // Doing it here may result in at most two tasks existing at the same time;
 718     // this running one, and one pending in the task runner.
 719     incoming_messages_task_posted_ = false;
 720   }
 721
 722     AcceptIncomingMessages();
 723   }
 724
 725   void NodeController::DropAllPeers() {
 726 -   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
 726 +   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
 727
 728     std::vector<scoped_refptr<NodeChannel>> all_peers;
 729     {
 730       base::AutoLock lock(parent_lock_);
 731       if (bootstrap_parent_channel_) {
 732         // |bootstrap_parent_channel_| isn't null'd here becuase we rely on its
 733         // existence to determine whether or not this is the root node. Once
 734         // bootstrap_parent_channel_->ShutDown() has been called,
 735         // |bootstrap_parent_channel_| is essentially a dead object and it doesn't
 736         // matter if it's deleted now or when |this| is deleted.
       (...skipping 92 matching lines...)
 829       observer->OnPortStatusChanged();
 830     } else {
 831       DVLOG(2) << "Ignoring status change for " << port.name() << " because it "
 832                << "doesn't have an observer.";
 833     }
 834   }
 835
 836   void NodeController::OnAcceptChild(const ports::NodeName& from_node,
 837                                      const ports::NodeName& parent_name,
 838                                      const ports::NodeName& token) {
 839 -   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
 839 +   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
 840
 841     scoped_refptr<NodeChannel> parent;
 842     {
 843       base::AutoLock lock(parent_lock_);
 844       if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) {
 845         parent_name_ = parent_name;
 846         parent = bootstrap_parent_channel_;
 847       }
 848     }
 849
 850     if (!parent) {
 851       DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node;
 852       DropPeer(from_node, nullptr);
 853       return;
 854     }
 855
 856     parent->SetRemoteNodeName(parent_name);
 857     parent->AcceptParent(token, name_);
 858
 859     // NOTE: The child does not actually add its parent as a peer until
 860     // receiving an AcceptBrokerClient message from the broker. The parent
 861     // will request that said message be sent upon receiving AcceptParent.
 862
 863     DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
 864   }
 865
 866   void NodeController::OnAcceptParent(const ports::NodeName& from_node,
 867                                       const ports::NodeName& token,
 868                                       const ports::NodeName& child_name) {
 869 -   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
 869 +   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
 870
 871     auto it = pending_invitations_.find(from_node);
 872     if (it == pending_invitations_.end() || token != from_node) {
 873       DLOG(ERROR) << "Received unexpected AcceptParent message from "
 874                   << from_node;
 875       DropPeer(from_node, nullptr);
 876       return;
 877     }
 878
 879     {
       (...skipping 184 matching lines...)
1064         message_queue.pop();
1065       }
1066     }
1067   #endif
1068
1069     DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name;
1070   }
1071
1072   void NodeController::OnPortsMessage(const ports::NodeName& from_node,
1073                                       Channel::MessagePtr channel_message) {
1074 -   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1074 +   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
1075
1076     void* data;
1077     size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
1078     if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes,
1079                            &num_header_bytes, &num_payload_bytes,
1080                            &num_ports_bytes)) {
1081       DropPeer(from_node, nullptr);
1082       return;
1083     }
1084
1085     CHECK(channel_message);
1086     std::unique_ptr<PortsMessage> ports_message(
1087         new PortsMessage(num_header_bytes,
1088                          num_payload_bytes,
1089                          num_ports_bytes,
1090                          std::move(channel_message)));
1091     ports_message->set_source_node(from_node);
1092     node_->AcceptMessage(ports::ScopedMessage(ports_message.release()));
1093     AcceptIncomingMessages();
1094   }
1095
1096   void NodeController::OnRequestPortMerge(
1097       const ports::NodeName& from_node,
1098       const ports::PortName& connector_port_name,
1099       const std::string& name) {
1100 -   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1100 +   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
1101
1102     DVLOG(2) << "Node " << name_ << " received RequestPortMerge for name " << name
1103              << " and port " << connector_port_name << "@" << from_node;
1104
1105     ports::PortRef local_port;
1106     {
1107       base::AutoLock lock(reserved_ports_lock_);
1108       auto it = reserved_ports_.find(from_node);
1109       if (it == reserved_ports_.end()) {
1110         DVLOG(1) << "Ignoring port merge request from node " << from_node << ". "
       (...skipping 16 matching lines...)
1127
1128     int rv = node_->MergePorts(local_port, from_node, connector_port_name);
1129     if (rv != ports::OK)
1130       DLOG(ERROR) << "MergePorts failed: " << rv;
1131
1132     AcceptIncomingMessages();
1133   }
1134
1135   void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
1136                                              const ports::NodeName& name) {
1137 -   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1137 +   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
1138
1139     scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node);
1140     if (from_node == name || name == ports::kInvalidNodeName || !requestor) {
1141       DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from "
1142                   << from_node;
1143       DropPeer(from_node, nullptr);
1144       return;
1145     }
1146
1147     scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name);
1148     if (!new_friend) {
1149       // We don't know who they're talking about!
1150       requestor->Introduce(name, ScopedPlatformHandle());
1151     } else {
1152       PlatformChannelPair new_channel;
1153       requestor->Introduce(name, new_channel.PassServerHandle());
1154       new_friend->Introduce(from_node, new_channel.PassClientHandle());
1155     }
1156   }
1157
1158   void NodeController::OnIntroduce(const ports::NodeName& from_node,
1159                                    const ports::NodeName& name,
1160                                    ScopedPlatformHandle channel_handle) {
1161 -   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1161 +   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
1162
1163     if (!channel_handle.is_valid()) {
1164       node_->LostConnectionToNode(name);
1165
1166       DVLOG(1) << "Could not be introduced to peer " << name;
1167       base::AutoLock lock(peers_lock_);
1168       pending_peer_messages_.erase(name);
1169       return;
1170     }
1171
       (...skipping 34 matching lines...)
1206                                  message->payload_size());
1207       iter.second->PortsMessage(std::move(peer_message));
1208     }
1209   }
1210
1211   #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
1212   void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node,
1213                                            base::ProcessHandle from_process,
1214                                            const ports::NodeName& destination,
1215                                            Channel::MessagePtr message) {
1216 -   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1216 +   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
1217
1218     if (GetBrokerChannel()) {
1219       // Only the broker should be asked to relay a message.
1220       LOG(ERROR) << "Non-broker refusing to relay message.";
1221       DropPeer(from_node, nullptr);
1222       return;
1223     }
1224
1225     // The parent should always know which process this came from.
1226     DCHECK(from_process != base::kNullProcessHandle);
       (...skipping 57 matching lines...)
1284     }
1285
1286     OnPortsMessage(source_node, std::move(message));
1287   }
1288   #endif
1289
1290   void NodeController::OnAcceptPeer(const ports::NodeName& from_node,
1291                                     const ports::NodeName& token,
1292                                     const ports::NodeName& peer_name,
1293                                     const ports::PortName& port_name) {
1294 -   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1294 +   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
1295
1296     auto it = peer_connections_.find(from_node);
1297     if (it == peer_connections_.end()) {
1298       DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node;
1299       DropPeer(from_node, nullptr);
1300       return;
1301     }
1302
1303     scoped_refptr<NodeChannel> channel = std::move(it->second.channel);
1304     ports::PortRef local_port = it->second.local_port;
       (...skipping 16 matching lines...)
1321
1322     // We need to choose one side to initiate the port merge. It doesn't matter
1323     // who does it as long as they don't both try. Simple solution: pick the one
1324     // with the "smaller" port name.
1325     if (local_port.name() < port_name)
1326       node()->MergePorts(local_port, peer_name, port_name);
1327   }
1328
1329   void NodeController::OnChannelError(const ports::NodeName& from_node,
1330                                       NodeChannel* channel) {
1331 -   if (io_task_runner_->RunsTasksOnCurrentThread()) {
1331 +   if (io_task_runner_->RunsTasksInCurrentSequence()) {
1332       DropPeer(from_node, channel);
1333       // DropPeer may have caused local port closures, so be sure to process any
1334       // pending local messages.
1335       AcceptIncomingMessages();
1336     } else {
1337       io_task_runner_->PostTask(
1338           FROM_HERE,
1339           base::Bind(&NodeController::OnChannelError, base::Unretained(this),
1340                      from_node, channel));
1341     }
       (...skipping 76 matching lines...)
1418   NodeController::PeerConnection::~PeerConnection() = default;
1419
1420   NodeController::PeerConnection& NodeController::PeerConnection::
1421       operator=(const PeerConnection& other) = default;
1422
1423   NodeController::PeerConnection& NodeController::PeerConnection::
1424       operator=(PeerConnection&& other) = default;
1425
1426   } // namespace edk
1427   } // namespace mojo
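
Two of the rewritten call sites above, ThreadDestructionObserver::Create() (line 109) and NodeController::OnChannelError() (line 1331), use the same hop-to-the-IO-thread idiom: do the work directly if the caller is already on the target sequence, otherwise re-post the same call through the task runner. Below is a generic sketch of that idiom with the renamed check; the function name and signature are illustrative assumptions, not code from this file.

  #include "base/bind.h"
  #include "base/callback.h"
  #include "base/location.h"
  #include "base/memory/ref_counted.h"
  #include "base/task_runner.h"

  // Illustrative helper: run |work| on |task_runner|'s sequence, either
  // synchronously if we are already there, or by re-posting this same call.
  void RunOnSequence(scoped_refptr<base::TaskRunner> task_runner,
                     const base::Closure& work) {
    if (task_runner->RunsTasksInCurrentSequence()) {
      work.Run();
    } else {
      task_runner->PostTask(FROM_HERE,
                            base::Bind(&RunOnSequence, task_runner, work));
    }
  }

ThreadDestructionObserver::Create() follows this shape, with the work being the construction of a self-owning destruction observer on the IO thread; OnChannelError() re-posts itself with base::Unretained(this), relying on the NodeController outliving any such posted task.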