OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/node_controller.h" | 5 #include "mojo/edk/system/node_controller.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 #include <limits> | 8 #include <limits> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
99 } | 99 } |
100 | 100 |
101 // Used by NodeController to watch for shutdown. Since no IO can happen once | 101 // Used by NodeController to watch for shutdown. Since no IO can happen once |
102 // the IO thread is killed, the NodeController can cleanly drop all its peers | 102 // the IO thread is killed, the NodeController can cleanly drop all its peers |
103 // at that time. | 103 // at that time. |
104 class ThreadDestructionObserver : | 104 class ThreadDestructionObserver : |
105 public base::MessageLoop::DestructionObserver { | 105 public base::MessageLoop::DestructionObserver { |
106 public: | 106 public: |
107 static void Create(scoped_refptr<base::TaskRunner> task_runner, | 107 static void Create(scoped_refptr<base::TaskRunner> task_runner, |
108 const base::Closure& callback) { | 108 const base::Closure& callback) { |
109 if (task_runner->RunsTasksOnCurrentThread()) { | 109 if (task_runner->RunsTasksInCurrentSequence()) { |
110 // Owns itself. | 110 // Owns itself. |
111 new ThreadDestructionObserver(callback); | 111 new ThreadDestructionObserver(callback); |
112 } else { | 112 } else { |
113 task_runner->PostTask(FROM_HERE, | 113 task_runner->PostTask(FROM_HERE, |
114 base::Bind(&Create, task_runner, callback)); | 114 base::Bind(&Create, task_runner, callback)); |
115 } | 115 } |
116 } | 116 } |
117 | 117 |
118 private: | 118 private: |
119 explicit ThreadDestructionObserver(const base::Closure& callback) | 119 explicit ThreadDestructionObserver(const base::Closure& callback) |
(...skipping 217 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
337 scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node); | 337 scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node); |
338 if (peer) | 338 if (peer) |
339 peer->NotifyBadMessage(error); | 339 peer->NotifyBadMessage(error); |
340 } | 340 } |
341 | 341 |
342 void NodeController::SendBrokerClientInvitationOnIOThread( | 342 void NodeController::SendBrokerClientInvitationOnIOThread( |
343 base::ProcessHandle target_process, | 343 base::ProcessHandle target_process, |
344 ConnectionParams connection_params, | 344 ConnectionParams connection_params, |
345 ports::NodeName temporary_node_name, | 345 ports::NodeName temporary_node_name, |
346 const ProcessErrorCallback& process_error_callback) { | 346 const ProcessErrorCallback& process_error_callback) { |
347 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 347 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
348 | 348 |
349 #if !defined(OS_MACOSX) && !defined(OS_NACL) | 349 #if !defined(OS_MACOSX) && !defined(OS_NACL) |
350 PlatformChannelPair node_channel; | 350 PlatformChannelPair node_channel; |
351 ScopedPlatformHandle server_handle = node_channel.PassServerHandle(); | 351 ScopedPlatformHandle server_handle = node_channel.PassServerHandle(); |
352 // BrokerHost owns itself. | 352 // BrokerHost owns itself. |
353 BrokerHost* broker_host = | 353 BrokerHost* broker_host = |
354 new BrokerHost(target_process, connection_params.TakeChannelHandle()); | 354 new BrokerHost(target_process, connection_params.TakeChannelHandle()); |
355 bool channel_ok = broker_host->SendChannel(node_channel.PassClientHandle()); | 355 bool channel_ok = broker_host->SendChannel(node_channel.PassClientHandle()); |
356 | 356 |
357 #if defined(OS_WIN) | 357 #if defined(OS_WIN) |
(...skipping 29 matching lines...) Expand all Loading... |
387 | 387 |
388 channel->SetRemoteNodeName(temporary_node_name); | 388 channel->SetRemoteNodeName(temporary_node_name); |
389 channel->SetRemoteProcessHandle(target_process); | 389 channel->SetRemoteProcessHandle(target_process); |
390 channel->Start(); | 390 channel->Start(); |
391 | 391 |
392 channel->AcceptChild(name_, temporary_node_name); | 392 channel->AcceptChild(name_, temporary_node_name); |
393 } | 393 } |
394 | 394 |
395 void NodeController::AcceptBrokerClientInvitationOnIOThread( | 395 void NodeController::AcceptBrokerClientInvitationOnIOThread( |
396 ConnectionParams connection_params) { | 396 ConnectionParams connection_params) { |
397 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 397 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
398 | 398 |
399 { | 399 { |
400 base::AutoLock lock(parent_lock_); | 400 base::AutoLock lock(parent_lock_); |
401 DCHECK(parent_name_ == ports::kInvalidNodeName); | 401 DCHECK(parent_name_ == ports::kInvalidNodeName); |
402 | 402 |
403 // At this point we don't know the parent's name, so we can't yet insert it | 403 // At this point we don't know the parent's name, so we can't yet insert it |
404 // into our |peers_| map. That will happen as soon as we receive an | 404 // into our |peers_| map. That will happen as soon as we receive an |
405 // AcceptChild message from them. | 405 // AcceptChild message from them. |
406 bootstrap_parent_channel_ = | 406 bootstrap_parent_channel_ = |
407 NodeChannel::Create(this, std::move(connection_params), io_task_runner_, | 407 NodeChannel::Create(this, std::move(connection_params), io_task_runner_, |
408 ProcessErrorCallback()); | 408 ProcessErrorCallback()); |
409 // Prevent the parent pipe handle from being closed on shutdown. Pipe | 409 // Prevent the parent pipe handle from being closed on shutdown. Pipe |
410 // closure is used by the parent to detect the child process has exited. | 410 // closure is used by the parent to detect the child process has exited. |
411 // Relying on message pipes to be closed is not enough because the parent | 411 // Relying on message pipes to be closed is not enough because the parent |
412 // may see the message pipe closure before the child is dead, causing the | 412 // may see the message pipe closure before the child is dead, causing the |
413 // child process to be unexpectedly SIGKILL'd. | 413 // child process to be unexpectedly SIGKILL'd. |
414 bootstrap_parent_channel_->LeakHandleOnShutdown(); | 414 bootstrap_parent_channel_->LeakHandleOnShutdown(); |
415 } | 415 } |
416 bootstrap_parent_channel_->Start(); | 416 bootstrap_parent_channel_->Start(); |
417 } | 417 } |
418 | 418 |
419 void NodeController::ConnectToPeerOnIOThread(uint64_t peer_connection_id, | 419 void NodeController::ConnectToPeerOnIOThread(uint64_t peer_connection_id, |
420 ConnectionParams connection_params, | 420 ConnectionParams connection_params, |
421 ports::PortRef port) { | 421 ports::PortRef port) { |
422 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 422 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
423 | 423 |
424 scoped_refptr<NodeChannel> channel = NodeChannel::Create( | 424 scoped_refptr<NodeChannel> channel = NodeChannel::Create( |
425 this, std::move(connection_params), io_task_runner_, {}); | 425 this, std::move(connection_params), io_task_runner_, {}); |
426 | 426 |
427 ports::NodeName token; | 427 ports::NodeName token; |
428 GenerateRandomName(&token); | 428 GenerateRandomName(&token); |
429 peer_connections_.emplace(token, | 429 peer_connections_.emplace(token, |
430 PeerConnection{channel, port, peer_connection_id}); | 430 PeerConnection{channel, port, peer_connection_id}); |
431 peer_connections_by_id_.emplace(peer_connection_id, token); | 431 peer_connections_by_id_.emplace(peer_connection_id, token); |
432 | 432 |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
475 { | 475 { |
476 base::AutoLock lock(broker_lock_); | 476 base::AutoLock lock(broker_lock_); |
477 broker_name = broker_name_; | 477 broker_name = broker_name_; |
478 } | 478 } |
479 return GetPeerChannel(broker_name); | 479 return GetPeerChannel(broker_name); |
480 } | 480 } |
481 | 481 |
482 void NodeController::AddPeer(const ports::NodeName& name, | 482 void NodeController::AddPeer(const ports::NodeName& name, |
483 scoped_refptr<NodeChannel> channel, | 483 scoped_refptr<NodeChannel> channel, |
484 bool start_channel) { | 484 bool start_channel) { |
485 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 485 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
486 | 486 |
487 DCHECK(name != ports::kInvalidNodeName); | 487 DCHECK(name != ports::kInvalidNodeName); |
488 DCHECK(channel); | 488 DCHECK(channel); |
489 | 489 |
490 channel->SetRemoteNodeName(name); | 490 channel->SetRemoteNodeName(name); |
491 | 491 |
492 OutgoingMessageQueue pending_messages; | 492 OutgoingMessageQueue pending_messages; |
493 { | 493 { |
494 base::AutoLock lock(peers_lock_); | 494 base::AutoLock lock(peers_lock_); |
495 if (peers_.find(name) != peers_.end()) { | 495 if (peers_.find(name) != peers_.end()) { |
(...skipping 23 matching lines...) Expand all Loading... |
519 | 519 |
520 // Flush any queued message we need to deliver to this node. | 520 // Flush any queued message we need to deliver to this node. |
521 while (!pending_messages.empty()) { | 521 while (!pending_messages.empty()) { |
522 channel->PortsMessage(std::move(pending_messages.front())); | 522 channel->PortsMessage(std::move(pending_messages.front())); |
523 pending_messages.pop(); | 523 pending_messages.pop(); |
524 } | 524 } |
525 } | 525 } |
526 | 526 |
527 void NodeController::DropPeer(const ports::NodeName& name, | 527 void NodeController::DropPeer(const ports::NodeName& name, |
528 NodeChannel* channel) { | 528 NodeChannel* channel) { |
529 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 529 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
530 | 530 |
531 { | 531 { |
532 base::AutoLock lock(peers_lock_); | 532 base::AutoLock lock(peers_lock_); |
533 auto it = peers_.find(name); | 533 auto it = peers_.find(name); |
534 | 534 |
535 if (it != peers_.end()) { | 535 if (it != peers_.end()) { |
536 ports::NodeName peer = it->first; | 536 ports::NodeName peer = it->first; |
537 peers_.erase(it); | 537 peers_.erase(it); |
538 DVLOG(1) << "Dropped peer " << peer; | 538 DVLOG(1) << "Dropped peer " << peer; |
539 } | 539 } |
(...skipping 176 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
716 // done after AcceptIncomingMessages() otherwise a message might be missed. | 716 // done after AcceptIncomingMessages() otherwise a message might be missed. |
717 // Doing it here may result in at most two tasks existing at the same time; | 717 // Doing it here may result in at most two tasks existing at the same time; |
718 // this running one, and one pending in the task runner. | 718 // this running one, and one pending in the task runner. |
719 incoming_messages_task_posted_ = false; | 719 incoming_messages_task_posted_ = false; |
720 } | 720 } |
721 | 721 |
722 AcceptIncomingMessages(); | 722 AcceptIncomingMessages(); |
723 } | 723 } |
724 | 724 |
725 void NodeController::DropAllPeers() { | 725 void NodeController::DropAllPeers() { |
726 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 726 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
727 | 727 |
728 std::vector<scoped_refptr<NodeChannel>> all_peers; | 728 std::vector<scoped_refptr<NodeChannel>> all_peers; |
729 { | 729 { |
730 base::AutoLock lock(parent_lock_); | 730 base::AutoLock lock(parent_lock_); |
731 if (bootstrap_parent_channel_) { | 731 if (bootstrap_parent_channel_) { |
732 // |bootstrap_parent_channel_| isn't null'd here becuase we rely on its | 732 // |bootstrap_parent_channel_| isn't null'd here becuase we rely on its |
733 // existence to determine whether or not this is the root node. Once | 733 // existence to determine whether or not this is the root node. Once |
734 // bootstrap_parent_channel_->ShutDown() has been called, | 734 // bootstrap_parent_channel_->ShutDown() has been called, |
735 // |bootstrap_parent_channel_| is essentially a dead object and it doesn't | 735 // |bootstrap_parent_channel_| is essentially a dead object and it doesn't |
736 // matter if it's deleted now or when |this| is deleted. | 736 // matter if it's deleted now or when |this| is deleted. |
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
829 observer->OnPortStatusChanged(); | 829 observer->OnPortStatusChanged(); |
830 } else { | 830 } else { |
831 DVLOG(2) << "Ignoring status change for " << port.name() << " because it " | 831 DVLOG(2) << "Ignoring status change for " << port.name() << " because it " |
832 << "doesn't have an observer."; | 832 << "doesn't have an observer."; |
833 } | 833 } |
834 } | 834 } |
835 | 835 |
836 void NodeController::OnAcceptChild(const ports::NodeName& from_node, | 836 void NodeController::OnAcceptChild(const ports::NodeName& from_node, |
837 const ports::NodeName& parent_name, | 837 const ports::NodeName& parent_name, |
838 const ports::NodeName& token) { | 838 const ports::NodeName& token) { |
839 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 839 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
840 | 840 |
841 scoped_refptr<NodeChannel> parent; | 841 scoped_refptr<NodeChannel> parent; |
842 { | 842 { |
843 base::AutoLock lock(parent_lock_); | 843 base::AutoLock lock(parent_lock_); |
844 if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) { | 844 if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) { |
845 parent_name_ = parent_name; | 845 parent_name_ = parent_name; |
846 parent = bootstrap_parent_channel_; | 846 parent = bootstrap_parent_channel_; |
847 } | 847 } |
848 } | 848 } |
849 | 849 |
850 if (!parent) { | 850 if (!parent) { |
851 DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node; | 851 DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node; |
852 DropPeer(from_node, nullptr); | 852 DropPeer(from_node, nullptr); |
853 return; | 853 return; |
854 } | 854 } |
855 | 855 |
856 parent->SetRemoteNodeName(parent_name); | 856 parent->SetRemoteNodeName(parent_name); |
857 parent->AcceptParent(token, name_); | 857 parent->AcceptParent(token, name_); |
858 | 858 |
859 // NOTE: The child does not actually add its parent as a peer until | 859 // NOTE: The child does not actually add its parent as a peer until |
860 // receiving an AcceptBrokerClient message from the broker. The parent | 860 // receiving an AcceptBrokerClient message from the broker. The parent |
861 // will request that said message be sent upon receiving AcceptParent. | 861 // will request that said message be sent upon receiving AcceptParent. |
862 | 862 |
863 DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name; | 863 DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name; |
864 } | 864 } |
865 | 865 |
866 void NodeController::OnAcceptParent(const ports::NodeName& from_node, | 866 void NodeController::OnAcceptParent(const ports::NodeName& from_node, |
867 const ports::NodeName& token, | 867 const ports::NodeName& token, |
868 const ports::NodeName& child_name) { | 868 const ports::NodeName& child_name) { |
869 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 869 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
870 | 870 |
871 auto it = pending_invitations_.find(from_node); | 871 auto it = pending_invitations_.find(from_node); |
872 if (it == pending_invitations_.end() || token != from_node) { | 872 if (it == pending_invitations_.end() || token != from_node) { |
873 DLOG(ERROR) << "Received unexpected AcceptParent message from " | 873 DLOG(ERROR) << "Received unexpected AcceptParent message from " |
874 << from_node; | 874 << from_node; |
875 DropPeer(from_node, nullptr); | 875 DropPeer(from_node, nullptr); |
876 return; | 876 return; |
877 } | 877 } |
878 | 878 |
879 { | 879 { |
(...skipping 184 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1064 message_queue.pop(); | 1064 message_queue.pop(); |
1065 } | 1065 } |
1066 } | 1066 } |
1067 #endif | 1067 #endif |
1068 | 1068 |
1069 DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name; | 1069 DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name; |
1070 } | 1070 } |
1071 | 1071 |
1072 void NodeController::OnPortsMessage(const ports::NodeName& from_node, | 1072 void NodeController::OnPortsMessage(const ports::NodeName& from_node, |
1073 Channel::MessagePtr channel_message) { | 1073 Channel::MessagePtr channel_message) { |
1074 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1074 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
1075 | 1075 |
1076 void* data; | 1076 void* data; |
1077 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; | 1077 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; |
1078 if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes, | 1078 if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes, |
1079 &num_header_bytes, &num_payload_bytes, | 1079 &num_header_bytes, &num_payload_bytes, |
1080 &num_ports_bytes)) { | 1080 &num_ports_bytes)) { |
1081 DropPeer(from_node, nullptr); | 1081 DropPeer(from_node, nullptr); |
1082 return; | 1082 return; |
1083 } | 1083 } |
1084 | 1084 |
1085 CHECK(channel_message); | 1085 CHECK(channel_message); |
1086 std::unique_ptr<PortsMessage> ports_message( | 1086 std::unique_ptr<PortsMessage> ports_message( |
1087 new PortsMessage(num_header_bytes, | 1087 new PortsMessage(num_header_bytes, |
1088 num_payload_bytes, | 1088 num_payload_bytes, |
1089 num_ports_bytes, | 1089 num_ports_bytes, |
1090 std::move(channel_message))); | 1090 std::move(channel_message))); |
1091 ports_message->set_source_node(from_node); | 1091 ports_message->set_source_node(from_node); |
1092 node_->AcceptMessage(ports::ScopedMessage(ports_message.release())); | 1092 node_->AcceptMessage(ports::ScopedMessage(ports_message.release())); |
1093 AcceptIncomingMessages(); | 1093 AcceptIncomingMessages(); |
1094 } | 1094 } |
1095 | 1095 |
1096 void NodeController::OnRequestPortMerge( | 1096 void NodeController::OnRequestPortMerge( |
1097 const ports::NodeName& from_node, | 1097 const ports::NodeName& from_node, |
1098 const ports::PortName& connector_port_name, | 1098 const ports::PortName& connector_port_name, |
1099 const std::string& name) { | 1099 const std::string& name) { |
1100 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1100 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
1101 | 1101 |
1102 DVLOG(2) << "Node " << name_ << " received RequestPortMerge for name " << name | 1102 DVLOG(2) << "Node " << name_ << " received RequestPortMerge for name " << name |
1103 << " and port " << connector_port_name << "@" << from_node; | 1103 << " and port " << connector_port_name << "@" << from_node; |
1104 | 1104 |
1105 ports::PortRef local_port; | 1105 ports::PortRef local_port; |
1106 { | 1106 { |
1107 base::AutoLock lock(reserved_ports_lock_); | 1107 base::AutoLock lock(reserved_ports_lock_); |
1108 auto it = reserved_ports_.find(from_node); | 1108 auto it = reserved_ports_.find(from_node); |
1109 if (it == reserved_ports_.end()) { | 1109 if (it == reserved_ports_.end()) { |
1110 DVLOG(1) << "Ignoring port merge request from node " << from_node << ". " | 1110 DVLOG(1) << "Ignoring port merge request from node " << from_node << ". " |
(...skipping 16 matching lines...) Expand all Loading... |
1127 | 1127 |
1128 int rv = node_->MergePorts(local_port, from_node, connector_port_name); | 1128 int rv = node_->MergePorts(local_port, from_node, connector_port_name); |
1129 if (rv != ports::OK) | 1129 if (rv != ports::OK) |
1130 DLOG(ERROR) << "MergePorts failed: " << rv; | 1130 DLOG(ERROR) << "MergePorts failed: " << rv; |
1131 | 1131 |
1132 AcceptIncomingMessages(); | 1132 AcceptIncomingMessages(); |
1133 } | 1133 } |
1134 | 1134 |
1135 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, | 1135 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, |
1136 const ports::NodeName& name) { | 1136 const ports::NodeName& name) { |
1137 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1137 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
1138 | 1138 |
1139 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node); | 1139 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node); |
1140 if (from_node == name || name == ports::kInvalidNodeName || !requestor) { | 1140 if (from_node == name || name == ports::kInvalidNodeName || !requestor) { |
1141 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from " | 1141 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from " |
1142 << from_node; | 1142 << from_node; |
1143 DropPeer(from_node, nullptr); | 1143 DropPeer(from_node, nullptr); |
1144 return; | 1144 return; |
1145 } | 1145 } |
1146 | 1146 |
1147 scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name); | 1147 scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name); |
1148 if (!new_friend) { | 1148 if (!new_friend) { |
1149 // We don't know who they're talking about! | 1149 // We don't know who they're talking about! |
1150 requestor->Introduce(name, ScopedPlatformHandle()); | 1150 requestor->Introduce(name, ScopedPlatformHandle()); |
1151 } else { | 1151 } else { |
1152 PlatformChannelPair new_channel; | 1152 PlatformChannelPair new_channel; |
1153 requestor->Introduce(name, new_channel.PassServerHandle()); | 1153 requestor->Introduce(name, new_channel.PassServerHandle()); |
1154 new_friend->Introduce(from_node, new_channel.PassClientHandle()); | 1154 new_friend->Introduce(from_node, new_channel.PassClientHandle()); |
1155 } | 1155 } |
1156 } | 1156 } |
1157 | 1157 |
1158 void NodeController::OnIntroduce(const ports::NodeName& from_node, | 1158 void NodeController::OnIntroduce(const ports::NodeName& from_node, |
1159 const ports::NodeName& name, | 1159 const ports::NodeName& name, |
1160 ScopedPlatformHandle channel_handle) { | 1160 ScopedPlatformHandle channel_handle) { |
1161 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1161 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
1162 | 1162 |
1163 if (!channel_handle.is_valid()) { | 1163 if (!channel_handle.is_valid()) { |
1164 node_->LostConnectionToNode(name); | 1164 node_->LostConnectionToNode(name); |
1165 | 1165 |
1166 DVLOG(1) << "Could not be introduced to peer " << name; | 1166 DVLOG(1) << "Could not be introduced to peer " << name; |
1167 base::AutoLock lock(peers_lock_); | 1167 base::AutoLock lock(peers_lock_); |
1168 pending_peer_messages_.erase(name); | 1168 pending_peer_messages_.erase(name); |
1169 return; | 1169 return; |
1170 } | 1170 } |
1171 | 1171 |
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1206 message->payload_size()); | 1206 message->payload_size()); |
1207 iter.second->PortsMessage(std::move(peer_message)); | 1207 iter.second->PortsMessage(std::move(peer_message)); |
1208 } | 1208 } |
1209 } | 1209 } |
1210 | 1210 |
1211 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) | 1211 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) |
1212 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, | 1212 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, |
1213 base::ProcessHandle from_process, | 1213 base::ProcessHandle from_process, |
1214 const ports::NodeName& destination, | 1214 const ports::NodeName& destination, |
1215 Channel::MessagePtr message) { | 1215 Channel::MessagePtr message) { |
1216 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1216 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
1217 | 1217 |
1218 if (GetBrokerChannel()) { | 1218 if (GetBrokerChannel()) { |
1219 // Only the broker should be asked to relay a message. | 1219 // Only the broker should be asked to relay a message. |
1220 LOG(ERROR) << "Non-broker refusing to relay message."; | 1220 LOG(ERROR) << "Non-broker refusing to relay message."; |
1221 DropPeer(from_node, nullptr); | 1221 DropPeer(from_node, nullptr); |
1222 return; | 1222 return; |
1223 } | 1223 } |
1224 | 1224 |
1225 // The parent should always know which process this came from. | 1225 // The parent should always know which process this came from. |
1226 DCHECK(from_process != base::kNullProcessHandle); | 1226 DCHECK(from_process != base::kNullProcessHandle); |
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1284 } | 1284 } |
1285 | 1285 |
1286 OnPortsMessage(source_node, std::move(message)); | 1286 OnPortsMessage(source_node, std::move(message)); |
1287 } | 1287 } |
1288 #endif | 1288 #endif |
1289 | 1289 |
1290 void NodeController::OnAcceptPeer(const ports::NodeName& from_node, | 1290 void NodeController::OnAcceptPeer(const ports::NodeName& from_node, |
1291 const ports::NodeName& token, | 1291 const ports::NodeName& token, |
1292 const ports::NodeName& peer_name, | 1292 const ports::NodeName& peer_name, |
1293 const ports::PortName& port_name) { | 1293 const ports::PortName& port_name) { |
1294 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1294 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
1295 | 1295 |
1296 auto it = peer_connections_.find(from_node); | 1296 auto it = peer_connections_.find(from_node); |
1297 if (it == peer_connections_.end()) { | 1297 if (it == peer_connections_.end()) { |
1298 DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node; | 1298 DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node; |
1299 DropPeer(from_node, nullptr); | 1299 DropPeer(from_node, nullptr); |
1300 return; | 1300 return; |
1301 } | 1301 } |
1302 | 1302 |
1303 scoped_refptr<NodeChannel> channel = std::move(it->second.channel); | 1303 scoped_refptr<NodeChannel> channel = std::move(it->second.channel); |
1304 ports::PortRef local_port = it->second.local_port; | 1304 ports::PortRef local_port = it->second.local_port; |
(...skipping 16 matching lines...) Expand all Loading... |
1321 | 1321 |
1322 // We need to choose one side to initiate the port merge. It doesn't matter | 1322 // We need to choose one side to initiate the port merge. It doesn't matter |
1323 // who does it as long as they don't both try. Simple solution: pick the one | 1323 // who does it as long as they don't both try. Simple solution: pick the one |
1324 // with the "smaller" port name. | 1324 // with the "smaller" port name. |
1325 if (local_port.name() < port_name) | 1325 if (local_port.name() < port_name) |
1326 node()->MergePorts(local_port, peer_name, port_name); | 1326 node()->MergePorts(local_port, peer_name, port_name); |
1327 } | 1327 } |
1328 | 1328 |
1329 void NodeController::OnChannelError(const ports::NodeName& from_node, | 1329 void NodeController::OnChannelError(const ports::NodeName& from_node, |
1330 NodeChannel* channel) { | 1330 NodeChannel* channel) { |
1331 if (io_task_runner_->RunsTasksOnCurrentThread()) { | 1331 if (io_task_runner_->RunsTasksInCurrentSequence()) { |
1332 DropPeer(from_node, channel); | 1332 DropPeer(from_node, channel); |
1333 // DropPeer may have caused local port closures, so be sure to process any | 1333 // DropPeer may have caused local port closures, so be sure to process any |
1334 // pending local messages. | 1334 // pending local messages. |
1335 AcceptIncomingMessages(); | 1335 AcceptIncomingMessages(); |
1336 } else { | 1336 } else { |
1337 io_task_runner_->PostTask( | 1337 io_task_runner_->PostTask( |
1338 FROM_HERE, | 1338 FROM_HERE, |
1339 base::Bind(&NodeController::OnChannelError, base::Unretained(this), | 1339 base::Bind(&NodeController::OnChannelError, base::Unretained(this), |
1340 from_node, channel)); | 1340 from_node, channel)); |
1341 } | 1341 } |
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1418 NodeController::PeerConnection::~PeerConnection() = default; | 1418 NodeController::PeerConnection::~PeerConnection() = default; |
1419 | 1419 |
1420 NodeController::PeerConnection& NodeController::PeerConnection:: | 1420 NodeController::PeerConnection& NodeController::PeerConnection:: |
1421 operator=(const PeerConnection& other) = default; | 1421 operator=(const PeerConnection& other) = default; |
1422 | 1422 |
1423 NodeController::PeerConnection& NodeController::PeerConnection:: | 1423 NodeController::PeerConnection& NodeController::PeerConnection:: |
1424 operator=(PeerConnection&& other) = default; | 1424 operator=(PeerConnection&& other) = default; |
1425 | 1425 |
1426 } // namespace edk | 1426 } // namespace edk |
1427 } // namespace mojo | 1427 } // namespace mojo |
OLD | NEW |