| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/node_controller.h" | 5 #include "mojo/edk/system/node_controller.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <limits> | 8 #include <limits> |
| 9 | 9 |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| (...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 99 } | 99 } |
| 100 | 100 |
| 101 // Used by NodeController to watch for shutdown. Since no IO can happen once | 101 // Used by NodeController to watch for shutdown. Since no IO can happen once |
| 102 // the IO thread is killed, the NodeController can cleanly drop all its peers | 102 // the IO thread is killed, the NodeController can cleanly drop all its peers |
| 103 // at that time. | 103 // at that time. |
| 104 class ThreadDestructionObserver : | 104 class ThreadDestructionObserver : |
| 105 public base::MessageLoop::DestructionObserver { | 105 public base::MessageLoop::DestructionObserver { |
| 106 public: | 106 public: |
| 107 static void Create(scoped_refptr<base::TaskRunner> task_runner, | 107 static void Create(scoped_refptr<base::TaskRunner> task_runner, |
| 108 const base::Closure& callback) { | 108 const base::Closure& callback) { |
| 109 if (task_runner->RunsTasksOnCurrentThread()) { | 109 if (task_runner->RunsTasksInCurrentSequence()) { |
| 110 // Owns itself. | 110 // Owns itself. |
| 111 new ThreadDestructionObserver(callback); | 111 new ThreadDestructionObserver(callback); |
| 112 } else { | 112 } else { |
| 113 task_runner->PostTask(FROM_HERE, | 113 task_runner->PostTask(FROM_HERE, |
| 114 base::Bind(&Create, task_runner, callback)); | 114 base::Bind(&Create, task_runner, callback)); |
| 115 } | 115 } |
| 116 } | 116 } |
| 117 | 117 |
| 118 private: | 118 private: |
| 119 explicit ThreadDestructionObserver(const base::Closure& callback) | 119 explicit ThreadDestructionObserver(const base::Closure& callback) |
| (...skipping 215 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 335 scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node); | 335 scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node); |
| 336 if (peer) | 336 if (peer) |
| 337 peer->NotifyBadMessage(error); | 337 peer->NotifyBadMessage(error); |
| 338 } | 338 } |
| 339 | 339 |
| 340 void NodeController::SendBrokerClientInvitationOnIOThread( | 340 void NodeController::SendBrokerClientInvitationOnIOThread( |
| 341 base::ProcessHandle target_process, | 341 base::ProcessHandle target_process, |
| 342 ConnectionParams connection_params, | 342 ConnectionParams connection_params, |
| 343 ports::NodeName temporary_node_name, | 343 ports::NodeName temporary_node_name, |
| 344 const ProcessErrorCallback& process_error_callback) { | 344 const ProcessErrorCallback& process_error_callback) { |
| 345 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 345 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 346 | 346 |
| 347 #if !defined(OS_MACOSX) && !defined(OS_NACL) | 347 #if !defined(OS_MACOSX) && !defined(OS_NACL) |
| 348 PlatformChannelPair node_channel; | 348 PlatformChannelPair node_channel; |
| 349 ScopedPlatformHandle server_handle = node_channel.PassServerHandle(); | 349 ScopedPlatformHandle server_handle = node_channel.PassServerHandle(); |
| 350 // BrokerHost owns itself. | 350 // BrokerHost owns itself. |
| 351 BrokerHost* broker_host = | 351 BrokerHost* broker_host = |
| 352 new BrokerHost(target_process, connection_params.TakeChannelHandle()); | 352 new BrokerHost(target_process, connection_params.TakeChannelHandle()); |
| 353 bool channel_ok = broker_host->SendChannel(node_channel.PassClientHandle()); | 353 bool channel_ok = broker_host->SendChannel(node_channel.PassClientHandle()); |
| 354 | 354 |
| 355 #if defined(OS_WIN) | 355 #if defined(OS_WIN) |
| (...skipping 29 matching lines...) Expand all Loading... |
| 385 | 385 |
| 386 channel->SetRemoteNodeName(temporary_node_name); | 386 channel->SetRemoteNodeName(temporary_node_name); |
| 387 channel->SetRemoteProcessHandle(target_process); | 387 channel->SetRemoteProcessHandle(target_process); |
| 388 channel->Start(); | 388 channel->Start(); |
| 389 | 389 |
| 390 channel->AcceptChild(name_, temporary_node_name); | 390 channel->AcceptChild(name_, temporary_node_name); |
| 391 } | 391 } |
| 392 | 392 |
| 393 void NodeController::AcceptBrokerClientInvitationOnIOThread( | 393 void NodeController::AcceptBrokerClientInvitationOnIOThread( |
| 394 ConnectionParams connection_params) { | 394 ConnectionParams connection_params) { |
| 395 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 395 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 396 | 396 |
| 397 { | 397 { |
| 398 base::AutoLock lock(parent_lock_); | 398 base::AutoLock lock(parent_lock_); |
| 399 DCHECK(parent_name_ == ports::kInvalidNodeName); | 399 DCHECK(parent_name_ == ports::kInvalidNodeName); |
| 400 | 400 |
| 401 // At this point we don't know the parent's name, so we can't yet insert it | 401 // At this point we don't know the parent's name, so we can't yet insert it |
| 402 // into our |peers_| map. That will happen as soon as we receive an | 402 // into our |peers_| map. That will happen as soon as we receive an |
| 403 // AcceptChild message from them. | 403 // AcceptChild message from them. |
| 404 bootstrap_parent_channel_ = | 404 bootstrap_parent_channel_ = |
| 405 NodeChannel::Create(this, std::move(connection_params), io_task_runner_, | 405 NodeChannel::Create(this, std::move(connection_params), io_task_runner_, |
| 406 ProcessErrorCallback()); | 406 ProcessErrorCallback()); |
| 407 // Prevent the parent pipe handle from being closed on shutdown. Pipe | 407 // Prevent the parent pipe handle from being closed on shutdown. Pipe |
| 408 // closure is used by the parent to detect the child process has exited. | 408 // closure is used by the parent to detect the child process has exited. |
| 409 // Relying on message pipes to be closed is not enough because the parent | 409 // Relying on message pipes to be closed is not enough because the parent |
| 410 // may see the message pipe closure before the child is dead, causing the | 410 // may see the message pipe closure before the child is dead, causing the |
| 411 // child process to be unexpectedly SIGKILL'd. | 411 // child process to be unexpectedly SIGKILL'd. |
| 412 bootstrap_parent_channel_->LeakHandleOnShutdown(); | 412 bootstrap_parent_channel_->LeakHandleOnShutdown(); |
| 413 } | 413 } |
| 414 bootstrap_parent_channel_->Start(); | 414 bootstrap_parent_channel_->Start(); |
| 415 } | 415 } |
| 416 | 416 |
| 417 void NodeController::ConnectToPeerOnIOThread(ConnectionParams connection_params, | 417 void NodeController::ConnectToPeerOnIOThread(ConnectionParams connection_params, |
| 418 ports::NodeName token, | 418 ports::NodeName token, |
| 419 ports::PortRef port, | 419 ports::PortRef port, |
| 420 const std::string& peer_token) { | 420 const std::string& peer_token) { |
| 421 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 421 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 422 | 422 |
| 423 scoped_refptr<NodeChannel> channel = NodeChannel::Create( | 423 scoped_refptr<NodeChannel> channel = NodeChannel::Create( |
| 424 this, std::move(connection_params), io_task_runner_, {}); | 424 this, std::move(connection_params), io_task_runner_, {}); |
| 425 peer_connections_.insert( | 425 peer_connections_.insert( |
| 426 {token, PeerConnection{channel, port, peer_token}}); | 426 {token, PeerConnection{channel, port, peer_token}}); |
| 427 peers_by_token_.insert({peer_token, token}); | 427 peers_by_token_.insert({peer_token, token}); |
| 428 | 428 |
| 429 channel->SetRemoteNodeName(token); | 429 channel->SetRemoteNodeName(token); |
| 430 channel->Start(); | 430 channel->Start(); |
| 431 | 431 |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 471 { | 471 { |
| 472 base::AutoLock lock(broker_lock_); | 472 base::AutoLock lock(broker_lock_); |
| 473 broker_name = broker_name_; | 473 broker_name = broker_name_; |
| 474 } | 474 } |
| 475 return GetPeerChannel(broker_name); | 475 return GetPeerChannel(broker_name); |
| 476 } | 476 } |
| 477 | 477 |
| 478 void NodeController::AddPeer(const ports::NodeName& name, | 478 void NodeController::AddPeer(const ports::NodeName& name, |
| 479 scoped_refptr<NodeChannel> channel, | 479 scoped_refptr<NodeChannel> channel, |
| 480 bool start_channel) { | 480 bool start_channel) { |
| 481 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 481 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 482 | 482 |
| 483 DCHECK(name != ports::kInvalidNodeName); | 483 DCHECK(name != ports::kInvalidNodeName); |
| 484 DCHECK(channel); | 484 DCHECK(channel); |
| 485 | 485 |
| 486 channel->SetRemoteNodeName(name); | 486 channel->SetRemoteNodeName(name); |
| 487 | 487 |
| 488 OutgoingMessageQueue pending_messages; | 488 OutgoingMessageQueue pending_messages; |
| 489 { | 489 { |
| 490 base::AutoLock lock(peers_lock_); | 490 base::AutoLock lock(peers_lock_); |
| 491 if (peers_.find(name) != peers_.end()) { | 491 if (peers_.find(name) != peers_.end()) { |
| (...skipping 23 matching lines...) Expand all Loading... |
| 515 | 515 |
| 516 // Flush any queued message we need to deliver to this node. | 516 // Flush any queued message we need to deliver to this node. |
| 517 while (!pending_messages.empty()) { | 517 while (!pending_messages.empty()) { |
| 518 channel->PortsMessage(std::move(pending_messages.front())); | 518 channel->PortsMessage(std::move(pending_messages.front())); |
| 519 pending_messages.pop(); | 519 pending_messages.pop(); |
| 520 } | 520 } |
| 521 } | 521 } |
| 522 | 522 |
| 523 void NodeController::DropPeer(const ports::NodeName& name, | 523 void NodeController::DropPeer(const ports::NodeName& name, |
| 524 NodeChannel* channel) { | 524 NodeChannel* channel) { |
| 525 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 525 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 526 | 526 |
| 527 { | 527 { |
| 528 base::AutoLock lock(peers_lock_); | 528 base::AutoLock lock(peers_lock_); |
| 529 auto it = peers_.find(name); | 529 auto it = peers_.find(name); |
| 530 | 530 |
| 531 if (it != peers_.end()) { | 531 if (it != peers_.end()) { |
| 532 ports::NodeName peer = it->first; | 532 ports::NodeName peer = it->first; |
| 533 peers_.erase(it); | 533 peers_.erase(it); |
| 534 DVLOG(1) << "Dropped peer " << peer; | 534 DVLOG(1) << "Dropped peer " << peer; |
| 535 } | 535 } |
| (...skipping 176 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 712 // done after AcceptIncomingMessages() otherwise a message might be missed. | 712 // done after AcceptIncomingMessages() otherwise a message might be missed. |
| 713 // Doing it here may result in at most two tasks existing at the same time; | 713 // Doing it here may result in at most two tasks existing at the same time; |
| 714 // this running one, and one pending in the task runner. | 714 // this running one, and one pending in the task runner. |
| 715 incoming_messages_task_posted_ = false; | 715 incoming_messages_task_posted_ = false; |
| 716 } | 716 } |
| 717 | 717 |
| 718 AcceptIncomingMessages(); | 718 AcceptIncomingMessages(); |
| 719 } | 719 } |
| 720 | 720 |
| 721 void NodeController::DropAllPeers() { | 721 void NodeController::DropAllPeers() { |
| 722 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 722 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 723 | 723 |
| 724 std::vector<scoped_refptr<NodeChannel>> all_peers; | 724 std::vector<scoped_refptr<NodeChannel>> all_peers; |
| 725 { | 725 { |
| 726 base::AutoLock lock(parent_lock_); | 726 base::AutoLock lock(parent_lock_); |
| 727 if (bootstrap_parent_channel_) { | 727 if (bootstrap_parent_channel_) { |
| 728 // |bootstrap_parent_channel_| isn't null'd here becuase we rely on its | 728 // |bootstrap_parent_channel_| isn't null'd here becuase we rely on its |
| 729 // existence to determine whether or not this is the root node. Once | 729 // existence to determine whether or not this is the root node. Once |
| 730 // bootstrap_parent_channel_->ShutDown() has been called, | 730 // bootstrap_parent_channel_->ShutDown() has been called, |
| 731 // |bootstrap_parent_channel_| is essentially a dead object and it doesn't | 731 // |bootstrap_parent_channel_| is essentially a dead object and it doesn't |
| 732 // matter if it's deleted now or when |this| is deleted. | 732 // matter if it's deleted now or when |this| is deleted. |
| (...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 825 observer->OnPortStatusChanged(); | 825 observer->OnPortStatusChanged(); |
| 826 } else { | 826 } else { |
| 827 DVLOG(2) << "Ignoring status change for " << port.name() << " because it " | 827 DVLOG(2) << "Ignoring status change for " << port.name() << " because it " |
| 828 << "doesn't have an observer."; | 828 << "doesn't have an observer."; |
| 829 } | 829 } |
| 830 } | 830 } |
| 831 | 831 |
| 832 void NodeController::OnAcceptChild(const ports::NodeName& from_node, | 832 void NodeController::OnAcceptChild(const ports::NodeName& from_node, |
| 833 const ports::NodeName& parent_name, | 833 const ports::NodeName& parent_name, |
| 834 const ports::NodeName& token) { | 834 const ports::NodeName& token) { |
| 835 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 835 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 836 | 836 |
| 837 scoped_refptr<NodeChannel> parent; | 837 scoped_refptr<NodeChannel> parent; |
| 838 { | 838 { |
| 839 base::AutoLock lock(parent_lock_); | 839 base::AutoLock lock(parent_lock_); |
| 840 if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) { | 840 if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) { |
| 841 parent_name_ = parent_name; | 841 parent_name_ = parent_name; |
| 842 parent = bootstrap_parent_channel_; | 842 parent = bootstrap_parent_channel_; |
| 843 } | 843 } |
| 844 } | 844 } |
| 845 | 845 |
| 846 if (!parent) { | 846 if (!parent) { |
| 847 DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node; | 847 DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node; |
| 848 DropPeer(from_node, nullptr); | 848 DropPeer(from_node, nullptr); |
| 849 return; | 849 return; |
| 850 } | 850 } |
| 851 | 851 |
| 852 parent->SetRemoteNodeName(parent_name); | 852 parent->SetRemoteNodeName(parent_name); |
| 853 parent->AcceptParent(token, name_); | 853 parent->AcceptParent(token, name_); |
| 854 | 854 |
| 855 // NOTE: The child does not actually add its parent as a peer until | 855 // NOTE: The child does not actually add its parent as a peer until |
| 856 // receiving an AcceptBrokerClient message from the broker. The parent | 856 // receiving an AcceptBrokerClient message from the broker. The parent |
| 857 // will request that said message be sent upon receiving AcceptParent. | 857 // will request that said message be sent upon receiving AcceptParent. |
| 858 | 858 |
| 859 DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name; | 859 DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name; |
| 860 } | 860 } |
| 861 | 861 |
| 862 void NodeController::OnAcceptParent(const ports::NodeName& from_node, | 862 void NodeController::OnAcceptParent(const ports::NodeName& from_node, |
| 863 const ports::NodeName& token, | 863 const ports::NodeName& token, |
| 864 const ports::NodeName& child_name) { | 864 const ports::NodeName& child_name) { |
| 865 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 865 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 866 | 866 |
| 867 auto it = pending_invitations_.find(from_node); | 867 auto it = pending_invitations_.find(from_node); |
| 868 if (it == pending_invitations_.end() || token != from_node) { | 868 if (it == pending_invitations_.end() || token != from_node) { |
| 869 DLOG(ERROR) << "Received unexpected AcceptParent message from " | 869 DLOG(ERROR) << "Received unexpected AcceptParent message from " |
| 870 << from_node; | 870 << from_node; |
| 871 DropPeer(from_node, nullptr); | 871 DropPeer(from_node, nullptr); |
| 872 return; | 872 return; |
| 873 } | 873 } |
| 874 | 874 |
| 875 { | 875 { |
| (...skipping 184 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1060 message_queue.pop(); | 1060 message_queue.pop(); |
| 1061 } | 1061 } |
| 1062 } | 1062 } |
| 1063 #endif | 1063 #endif |
| 1064 | 1064 |
| 1065 DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name; | 1065 DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name; |
| 1066 } | 1066 } |
| 1067 | 1067 |
| 1068 void NodeController::OnPortsMessage(const ports::NodeName& from_node, | 1068 void NodeController::OnPortsMessage(const ports::NodeName& from_node, |
| 1069 Channel::MessagePtr channel_message) { | 1069 Channel::MessagePtr channel_message) { |
| 1070 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1070 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 1071 | 1071 |
| 1072 void* data; | 1072 void* data; |
| 1073 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; | 1073 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; |
| 1074 if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes, | 1074 if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes, |
| 1075 &num_header_bytes, &num_payload_bytes, | 1075 &num_header_bytes, &num_payload_bytes, |
| 1076 &num_ports_bytes)) { | 1076 &num_ports_bytes)) { |
| 1077 DropPeer(from_node, nullptr); | 1077 DropPeer(from_node, nullptr); |
| 1078 return; | 1078 return; |
| 1079 } | 1079 } |
| 1080 | 1080 |
| 1081 CHECK(channel_message); | 1081 CHECK(channel_message); |
| 1082 std::unique_ptr<PortsMessage> ports_message( | 1082 std::unique_ptr<PortsMessage> ports_message( |
| 1083 new PortsMessage(num_header_bytes, | 1083 new PortsMessage(num_header_bytes, |
| 1084 num_payload_bytes, | 1084 num_payload_bytes, |
| 1085 num_ports_bytes, | 1085 num_ports_bytes, |
| 1086 std::move(channel_message))); | 1086 std::move(channel_message))); |
| 1087 ports_message->set_source_node(from_node); | 1087 ports_message->set_source_node(from_node); |
| 1088 node_->AcceptMessage(ports::ScopedMessage(ports_message.release())); | 1088 node_->AcceptMessage(ports::ScopedMessage(ports_message.release())); |
| 1089 AcceptIncomingMessages(); | 1089 AcceptIncomingMessages(); |
| 1090 } | 1090 } |
| 1091 | 1091 |
| 1092 void NodeController::OnRequestPortMerge( | 1092 void NodeController::OnRequestPortMerge( |
| 1093 const ports::NodeName& from_node, | 1093 const ports::NodeName& from_node, |
| 1094 const ports::PortName& connector_port_name, | 1094 const ports::PortName& connector_port_name, |
| 1095 const std::string& name) { | 1095 const std::string& name) { |
| 1096 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1096 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 1097 | 1097 |
| 1098 DVLOG(2) << "Node " << name_ << " received RequestPortMerge for name " << name | 1098 DVLOG(2) << "Node " << name_ << " received RequestPortMerge for name " << name |
| 1099 << " and port " << connector_port_name << "@" << from_node; | 1099 << " and port " << connector_port_name << "@" << from_node; |
| 1100 | 1100 |
| 1101 ports::PortRef local_port; | 1101 ports::PortRef local_port; |
| 1102 { | 1102 { |
| 1103 base::AutoLock lock(reserved_ports_lock_); | 1103 base::AutoLock lock(reserved_ports_lock_); |
| 1104 auto it = reserved_ports_.find(from_node); | 1104 auto it = reserved_ports_.find(from_node); |
| 1105 if (it == reserved_ports_.end()) { | 1105 if (it == reserved_ports_.end()) { |
| 1106 DVLOG(1) << "Ignoring port merge request from node " << from_node << ". " | 1106 DVLOG(1) << "Ignoring port merge request from node " << from_node << ". " |
| (...skipping 16 matching lines...) Expand all Loading... |
| 1123 | 1123 |
| 1124 int rv = node_->MergePorts(local_port, from_node, connector_port_name); | 1124 int rv = node_->MergePorts(local_port, from_node, connector_port_name); |
| 1125 if (rv != ports::OK) | 1125 if (rv != ports::OK) |
| 1126 DLOG(ERROR) << "MergePorts failed: " << rv; | 1126 DLOG(ERROR) << "MergePorts failed: " << rv; |
| 1127 | 1127 |
| 1128 AcceptIncomingMessages(); | 1128 AcceptIncomingMessages(); |
| 1129 } | 1129 } |
| 1130 | 1130 |
| 1131 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, | 1131 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, |
| 1132 const ports::NodeName& name) { | 1132 const ports::NodeName& name) { |
| 1133 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1133 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 1134 | 1134 |
| 1135 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node); | 1135 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node); |
| 1136 if (from_node == name || name == ports::kInvalidNodeName || !requestor) { | 1136 if (from_node == name || name == ports::kInvalidNodeName || !requestor) { |
| 1137 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from " | 1137 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from " |
| 1138 << from_node; | 1138 << from_node; |
| 1139 DropPeer(from_node, nullptr); | 1139 DropPeer(from_node, nullptr); |
| 1140 return; | 1140 return; |
| 1141 } | 1141 } |
| 1142 | 1142 |
| 1143 scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name); | 1143 scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name); |
| 1144 if (!new_friend) { | 1144 if (!new_friend) { |
| 1145 // We don't know who they're talking about! | 1145 // We don't know who they're talking about! |
| 1146 requestor->Introduce(name, ScopedPlatformHandle()); | 1146 requestor->Introduce(name, ScopedPlatformHandle()); |
| 1147 } else { | 1147 } else { |
| 1148 PlatformChannelPair new_channel; | 1148 PlatformChannelPair new_channel; |
| 1149 requestor->Introduce(name, new_channel.PassServerHandle()); | 1149 requestor->Introduce(name, new_channel.PassServerHandle()); |
| 1150 new_friend->Introduce(from_node, new_channel.PassClientHandle()); | 1150 new_friend->Introduce(from_node, new_channel.PassClientHandle()); |
| 1151 } | 1151 } |
| 1152 } | 1152 } |
| 1153 | 1153 |
| 1154 void NodeController::OnIntroduce(const ports::NodeName& from_node, | 1154 void NodeController::OnIntroduce(const ports::NodeName& from_node, |
| 1155 const ports::NodeName& name, | 1155 const ports::NodeName& name, |
| 1156 ScopedPlatformHandle channel_handle) { | 1156 ScopedPlatformHandle channel_handle) { |
| 1157 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1157 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 1158 | 1158 |
| 1159 if (!channel_handle.is_valid()) { | 1159 if (!channel_handle.is_valid()) { |
| 1160 node_->LostConnectionToNode(name); | 1160 node_->LostConnectionToNode(name); |
| 1161 | 1161 |
| 1162 DVLOG(1) << "Could not be introduced to peer " << name; | 1162 DVLOG(1) << "Could not be introduced to peer " << name; |
| 1163 base::AutoLock lock(peers_lock_); | 1163 base::AutoLock lock(peers_lock_); |
| 1164 pending_peer_messages_.erase(name); | 1164 pending_peer_messages_.erase(name); |
| 1165 return; | 1165 return; |
| 1166 } | 1166 } |
| 1167 | 1167 |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1202 message->payload_size()); | 1202 message->payload_size()); |
| 1203 iter.second->PortsMessage(std::move(peer_message)); | 1203 iter.second->PortsMessage(std::move(peer_message)); |
| 1204 } | 1204 } |
| 1205 } | 1205 } |
| 1206 | 1206 |
| 1207 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) | 1207 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) |
| 1208 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, | 1208 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, |
| 1209 base::ProcessHandle from_process, | 1209 base::ProcessHandle from_process, |
| 1210 const ports::NodeName& destination, | 1210 const ports::NodeName& destination, |
| 1211 Channel::MessagePtr message) { | 1211 Channel::MessagePtr message) { |
| 1212 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1212 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 1213 | 1213 |
| 1214 if (GetBrokerChannel()) { | 1214 if (GetBrokerChannel()) { |
| 1215 // Only the broker should be asked to relay a message. | 1215 // Only the broker should be asked to relay a message. |
| 1216 LOG(ERROR) << "Non-broker refusing to relay message."; | 1216 LOG(ERROR) << "Non-broker refusing to relay message."; |
| 1217 DropPeer(from_node, nullptr); | 1217 DropPeer(from_node, nullptr); |
| 1218 return; | 1218 return; |
| 1219 } | 1219 } |
| 1220 | 1220 |
| 1221 // The parent should always know which process this came from. | 1221 // The parent should always know which process this came from. |
| 1222 DCHECK(from_process != base::kNullProcessHandle); | 1222 DCHECK(from_process != base::kNullProcessHandle); |
| (...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1280 } | 1280 } |
| 1281 | 1281 |
| 1282 OnPortsMessage(source_node, std::move(message)); | 1282 OnPortsMessage(source_node, std::move(message)); |
| 1283 } | 1283 } |
| 1284 #endif | 1284 #endif |
| 1285 | 1285 |
| 1286 void NodeController::OnAcceptPeer(const ports::NodeName& from_node, | 1286 void NodeController::OnAcceptPeer(const ports::NodeName& from_node, |
| 1287 const ports::NodeName& token, | 1287 const ports::NodeName& token, |
| 1288 const ports::NodeName& peer_name, | 1288 const ports::NodeName& peer_name, |
| 1289 const ports::PortName& port_name) { | 1289 const ports::PortName& port_name) { |
| 1290 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1290 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 1291 | 1291 |
| 1292 auto it = peer_connections_.find(from_node); | 1292 auto it = peer_connections_.find(from_node); |
| 1293 if (it == peer_connections_.end()) { | 1293 if (it == peer_connections_.end()) { |
| 1294 DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node; | 1294 DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node; |
| 1295 DropPeer(from_node, nullptr); | 1295 DropPeer(from_node, nullptr); |
| 1296 return; | 1296 return; |
| 1297 } | 1297 } |
| 1298 | 1298 |
| 1299 scoped_refptr<NodeChannel> channel = std::move(it->second.channel); | 1299 scoped_refptr<NodeChannel> channel = std::move(it->second.channel); |
| 1300 ports::PortRef local_port = it->second.local_port; | 1300 ports::PortRef local_port = it->second.local_port; |
| (...skipping 17 matching lines...) Expand all Loading... |
| 1318 // We need to choose one side to initiate the port merge. It doesn't matter | 1318 // We need to choose one side to initiate the port merge. It doesn't matter |
| 1319 // who does it as long as they don't both try. Simple solution: pick the one | 1319 // who does it as long as they don't both try. Simple solution: pick the one |
| 1320 // with the "smaller" port name. | 1320 // with the "smaller" port name. |
| 1321 if (local_port.name() < port_name) { | 1321 if (local_port.name() < port_name) { |
| 1322 node()->MergePorts(local_port, peer_name, port_name); | 1322 node()->MergePorts(local_port, peer_name, port_name); |
| 1323 } | 1323 } |
| 1324 } | 1324 } |
| 1325 | 1325 |
| 1326 void NodeController::OnChannelError(const ports::NodeName& from_node, | 1326 void NodeController::OnChannelError(const ports::NodeName& from_node, |
| 1327 NodeChannel* channel) { | 1327 NodeChannel* channel) { |
| 1328 if (io_task_runner_->RunsTasksOnCurrentThread()) { | 1328 if (io_task_runner_->RunsTasksInCurrentSequence()) { |
| 1329 DropPeer(from_node, channel); | 1329 DropPeer(from_node, channel); |
| 1330 // DropPeer may have caused local port closures, so be sure to process any | 1330 // DropPeer may have caused local port closures, so be sure to process any |
| 1331 // pending local messages. | 1331 // pending local messages. |
| 1332 AcceptIncomingMessages(); | 1332 AcceptIncomingMessages(); |
| 1333 } else { | 1333 } else { |
| 1334 io_task_runner_->PostTask( | 1334 io_task_runner_->PostTask( |
| 1335 FROM_HERE, | 1335 FROM_HERE, |
| 1336 base::Bind(&NodeController::OnChannelError, base::Unretained(this), | 1336 base::Bind(&NodeController::OnChannelError, base::Unretained(this), |
| 1337 from_node, channel)); | 1337 from_node, channel)); |
| 1338 } | 1338 } |
| (...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1415 NodeController::PeerConnection::~PeerConnection() = default; | 1415 NodeController::PeerConnection::~PeerConnection() = default; |
| 1416 | 1416 |
| 1417 NodeController::PeerConnection& NodeController::PeerConnection:: | 1417 NodeController::PeerConnection& NodeController::PeerConnection:: |
| 1418 operator=(const PeerConnection& other) = default; | 1418 operator=(const PeerConnection& other) = default; |
| 1419 | 1419 |
| 1420 NodeController::PeerConnection& NodeController::PeerConnection:: | 1420 NodeController::PeerConnection& NodeController::PeerConnection:: |
| 1421 operator=(PeerConnection&& other) = default; | 1421 operator=(PeerConnection&& other) = default; |
| 1422 | 1422 |
| 1423 } // namespace edk | 1423 } // namespace edk |
| 1424 } // namespace mojo | 1424 } // namespace mojo |
| OLD | NEW |