| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/node_controller.h" | 5 #include "mojo/edk/system/node_controller.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 #include <limits> | 8 #include <limits> |
| 9 | 9 |
| 10 #include "base/bind.h" | 10 #include "base/bind.h" |
| (...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 99 } | 99 } |
| 100 | 100 |
| 101 // Used by NodeController to watch for shutdown. Since no IO can happen once | 101 // Used by NodeController to watch for shutdown. Since no IO can happen once |
| 102 // the IO thread is killed, the NodeController can cleanly drop all its peers | 102 // the IO thread is killed, the NodeController can cleanly drop all its peers |
| 103 // at that time. | 103 // at that time. |
| 104 class ThreadDestructionObserver : | 104 class ThreadDestructionObserver : |
| 105 public base::MessageLoop::DestructionObserver { | 105 public base::MessageLoop::DestructionObserver { |
| 106 public: | 106 public: |
| 107 static void Create(scoped_refptr<base::TaskRunner> task_runner, | 107 static void Create(scoped_refptr<base::TaskRunner> task_runner, |
| 108 const base::Closure& callback) { | 108 const base::Closure& callback) { |
| 109 if (task_runner->RunsTasksOnCurrentThread()) { | 109 if (task_runner->RunsTasksInCurrentSequence()) { |
| 110 // Owns itself. | 110 // Owns itself. |
| 111 new ThreadDestructionObserver(callback); | 111 new ThreadDestructionObserver(callback); |
| 112 } else { | 112 } else { |
| 113 task_runner->PostTask(FROM_HERE, | 113 task_runner->PostTask(FROM_HERE, |
| 114 base::Bind(&Create, task_runner, callback)); | 114 base::Bind(&Create, task_runner, callback)); |
| 115 } | 115 } |
| 116 } | 116 } |
| 117 | 117 |
| 118 private: | 118 private: |
| 119 explicit ThreadDestructionObserver(const base::Closure& callback) | 119 explicit ThreadDestructionObserver(const base::Closure& callback) |
| (...skipping 217 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 337 scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node); | 337 scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node); |
| 338 if (peer) | 338 if (peer) |
| 339 peer->NotifyBadMessage(error); | 339 peer->NotifyBadMessage(error); |
| 340 } | 340 } |
| 341 | 341 |
| 342 void NodeController::SendBrokerClientInvitationOnIOThread( | 342 void NodeController::SendBrokerClientInvitationOnIOThread( |
| 343 base::ProcessHandle target_process, | 343 base::ProcessHandle target_process, |
| 344 ConnectionParams connection_params, | 344 ConnectionParams connection_params, |
| 345 ports::NodeName temporary_node_name, | 345 ports::NodeName temporary_node_name, |
| 346 const ProcessErrorCallback& process_error_callback) { | 346 const ProcessErrorCallback& process_error_callback) { |
| 347 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 347 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 348 | 348 |
| 349 #if !defined(OS_MACOSX) && !defined(OS_NACL) | 349 #if !defined(OS_MACOSX) && !defined(OS_NACL) |
| 350 PlatformChannelPair node_channel; | 350 PlatformChannelPair node_channel; |
| 351 ScopedPlatformHandle server_handle = node_channel.PassServerHandle(); | 351 ScopedPlatformHandle server_handle = node_channel.PassServerHandle(); |
| 352 // BrokerHost owns itself. | 352 // BrokerHost owns itself. |
| 353 BrokerHost* broker_host = | 353 BrokerHost* broker_host = |
| 354 new BrokerHost(target_process, connection_params.TakeChannelHandle()); | 354 new BrokerHost(target_process, connection_params.TakeChannelHandle()); |
| 355 bool channel_ok = broker_host->SendChannel(node_channel.PassClientHandle()); | 355 bool channel_ok = broker_host->SendChannel(node_channel.PassClientHandle()); |
| 356 | 356 |
| 357 #if defined(OS_WIN) | 357 #if defined(OS_WIN) |
| (...skipping 29 matching lines...) Expand all Loading... |
| 387 | 387 |
| 388 channel->SetRemoteNodeName(temporary_node_name); | 388 channel->SetRemoteNodeName(temporary_node_name); |
| 389 channel->SetRemoteProcessHandle(target_process); | 389 channel->SetRemoteProcessHandle(target_process); |
| 390 channel->Start(); | 390 channel->Start(); |
| 391 | 391 |
| 392 channel->AcceptChild(name_, temporary_node_name); | 392 channel->AcceptChild(name_, temporary_node_name); |
| 393 } | 393 } |
| 394 | 394 |
| 395 void NodeController::AcceptBrokerClientInvitationOnIOThread( | 395 void NodeController::AcceptBrokerClientInvitationOnIOThread( |
| 396 ConnectionParams connection_params) { | 396 ConnectionParams connection_params) { |
| 397 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 397 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 398 | 398 |
| 399 { | 399 { |
| 400 base::AutoLock lock(parent_lock_); | 400 base::AutoLock lock(parent_lock_); |
| 401 DCHECK(parent_name_ == ports::kInvalidNodeName); | 401 DCHECK(parent_name_ == ports::kInvalidNodeName); |
| 402 | 402 |
| 403 // At this point we don't know the parent's name, so we can't yet insert it | 403 // At this point we don't know the parent's name, so we can't yet insert it |
| 404 // into our |peers_| map. That will happen as soon as we receive an | 404 // into our |peers_| map. That will happen as soon as we receive an |
| 405 // AcceptChild message from them. | 405 // AcceptChild message from them. |
| 406 bootstrap_parent_channel_ = | 406 bootstrap_parent_channel_ = |
| 407 NodeChannel::Create(this, std::move(connection_params), io_task_runner_, | 407 NodeChannel::Create(this, std::move(connection_params), io_task_runner_, |
| 408 ProcessErrorCallback()); | 408 ProcessErrorCallback()); |
| 409 // Prevent the parent pipe handle from being closed on shutdown. Pipe | 409 // Prevent the parent pipe handle from being closed on shutdown. Pipe |
| 410 // closure is used by the parent to detect the child process has exited. | 410 // closure is used by the parent to detect the child process has exited. |
| 411 // Relying on message pipes to be closed is not enough because the parent | 411 // Relying on message pipes to be closed is not enough because the parent |
| 412 // may see the message pipe closure before the child is dead, causing the | 412 // may see the message pipe closure before the child is dead, causing the |
| 413 // child process to be unexpectedly SIGKILL'd. | 413 // child process to be unexpectedly SIGKILL'd. |
| 414 bootstrap_parent_channel_->LeakHandleOnShutdown(); | 414 bootstrap_parent_channel_->LeakHandleOnShutdown(); |
| 415 } | 415 } |
| 416 bootstrap_parent_channel_->Start(); | 416 bootstrap_parent_channel_->Start(); |
| 417 } | 417 } |
| 418 | 418 |
| 419 void NodeController::ConnectToPeerOnIOThread(uint64_t peer_connection_id, | 419 void NodeController::ConnectToPeerOnIOThread(uint64_t peer_connection_id, |
| 420 ConnectionParams connection_params, | 420 ConnectionParams connection_params, |
| 421 ports::PortRef port) { | 421 ports::PortRef port) { |
| 422 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 422 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 423 | 423 |
| 424 scoped_refptr<NodeChannel> channel = NodeChannel::Create( | 424 scoped_refptr<NodeChannel> channel = NodeChannel::Create( |
| 425 this, std::move(connection_params), io_task_runner_, {}); | 425 this, std::move(connection_params), io_task_runner_, {}); |
| 426 | 426 |
| 427 ports::NodeName token; | 427 ports::NodeName token; |
| 428 GenerateRandomName(&token); | 428 GenerateRandomName(&token); |
| 429 peer_connections_.emplace(token, | 429 peer_connections_.emplace(token, |
| 430 PeerConnection{channel, port, peer_connection_id}); | 430 PeerConnection{channel, port, peer_connection_id}); |
| 431 peer_connections_by_id_.emplace(peer_connection_id, token); | 431 peer_connections_by_id_.emplace(peer_connection_id, token); |
| 432 | 432 |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 475 { | 475 { |
| 476 base::AutoLock lock(broker_lock_); | 476 base::AutoLock lock(broker_lock_); |
| 477 broker_name = broker_name_; | 477 broker_name = broker_name_; |
| 478 } | 478 } |
| 479 return GetPeerChannel(broker_name); | 479 return GetPeerChannel(broker_name); |
| 480 } | 480 } |
| 481 | 481 |
| 482 void NodeController::AddPeer(const ports::NodeName& name, | 482 void NodeController::AddPeer(const ports::NodeName& name, |
| 483 scoped_refptr<NodeChannel> channel, | 483 scoped_refptr<NodeChannel> channel, |
| 484 bool start_channel) { | 484 bool start_channel) { |
| 485 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 485 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 486 | 486 |
| 487 DCHECK(name != ports::kInvalidNodeName); | 487 DCHECK(name != ports::kInvalidNodeName); |
| 488 DCHECK(channel); | 488 DCHECK(channel); |
| 489 | 489 |
| 490 channel->SetRemoteNodeName(name); | 490 channel->SetRemoteNodeName(name); |
| 491 | 491 |
| 492 OutgoingMessageQueue pending_messages; | 492 OutgoingMessageQueue pending_messages; |
| 493 { | 493 { |
| 494 base::AutoLock lock(peers_lock_); | 494 base::AutoLock lock(peers_lock_); |
| 495 if (peers_.find(name) != peers_.end()) { | 495 if (peers_.find(name) != peers_.end()) { |
| (...skipping 23 matching lines...) Expand all Loading... |
| 519 | 519 |
| 520 // Flush any queued message we need to deliver to this node. | 520 // Flush any queued message we need to deliver to this node. |
| 521 while (!pending_messages.empty()) { | 521 while (!pending_messages.empty()) { |
| 522 channel->PortsMessage(std::move(pending_messages.front())); | 522 channel->PortsMessage(std::move(pending_messages.front())); |
| 523 pending_messages.pop(); | 523 pending_messages.pop(); |
| 524 } | 524 } |
| 525 } | 525 } |
| 526 | 526 |
| 527 void NodeController::DropPeer(const ports::NodeName& name, | 527 void NodeController::DropPeer(const ports::NodeName& name, |
| 528 NodeChannel* channel) { | 528 NodeChannel* channel) { |
| 529 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 529 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 530 | 530 |
| 531 { | 531 { |
| 532 base::AutoLock lock(peers_lock_); | 532 base::AutoLock lock(peers_lock_); |
| 533 auto it = peers_.find(name); | 533 auto it = peers_.find(name); |
| 534 | 534 |
| 535 if (it != peers_.end()) { | 535 if (it != peers_.end()) { |
| 536 ports::NodeName peer = it->first; | 536 ports::NodeName peer = it->first; |
| 537 peers_.erase(it); | 537 peers_.erase(it); |
| 538 DVLOG(1) << "Dropped peer " << peer; | 538 DVLOG(1) << "Dropped peer " << peer; |
| 539 } | 539 } |
| (...skipping 176 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 716 // done after AcceptIncomingMessages() otherwise a message might be missed. | 716 // done after AcceptIncomingMessages() otherwise a message might be missed. |
| 717 // Doing it here may result in at most two tasks existing at the same time; | 717 // Doing it here may result in at most two tasks existing at the same time; |
| 718 // this running one, and one pending in the task runner. | 718 // this running one, and one pending in the task runner. |
| 719 incoming_messages_task_posted_ = false; | 719 incoming_messages_task_posted_ = false; |
| 720 } | 720 } |
| 721 | 721 |
| 722 AcceptIncomingMessages(); | 722 AcceptIncomingMessages(); |
| 723 } | 723 } |
| 724 | 724 |
| 725 void NodeController::DropAllPeers() { | 725 void NodeController::DropAllPeers() { |
| 726 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 726 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 727 | 727 |
| 728 std::vector<scoped_refptr<NodeChannel>> all_peers; | 728 std::vector<scoped_refptr<NodeChannel>> all_peers; |
| 729 { | 729 { |
| 730 base::AutoLock lock(parent_lock_); | 730 base::AutoLock lock(parent_lock_); |
| 731 if (bootstrap_parent_channel_) { | 731 if (bootstrap_parent_channel_) { |
| 732 // |bootstrap_parent_channel_| isn't null'd here becuase we rely on its | 732 // |bootstrap_parent_channel_| isn't null'd here becuase we rely on its |
| 733 // existence to determine whether or not this is the root node. Once | 733 // existence to determine whether or not this is the root node. Once |
| 734 // bootstrap_parent_channel_->ShutDown() has been called, | 734 // bootstrap_parent_channel_->ShutDown() has been called, |
| 735 // |bootstrap_parent_channel_| is essentially a dead object and it doesn't | 735 // |bootstrap_parent_channel_| is essentially a dead object and it doesn't |
| 736 // matter if it's deleted now or when |this| is deleted. | 736 // matter if it's deleted now or when |this| is deleted. |
| (...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 829 observer->OnPortStatusChanged(); | 829 observer->OnPortStatusChanged(); |
| 830 } else { | 830 } else { |
| 831 DVLOG(2) << "Ignoring status change for " << port.name() << " because it " | 831 DVLOG(2) << "Ignoring status change for " << port.name() << " because it " |
| 832 << "doesn't have an observer."; | 832 << "doesn't have an observer."; |
| 833 } | 833 } |
| 834 } | 834 } |
| 835 | 835 |
| 836 void NodeController::OnAcceptChild(const ports::NodeName& from_node, | 836 void NodeController::OnAcceptChild(const ports::NodeName& from_node, |
| 837 const ports::NodeName& parent_name, | 837 const ports::NodeName& parent_name, |
| 838 const ports::NodeName& token) { | 838 const ports::NodeName& token) { |
| 839 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 839 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 840 | 840 |
| 841 scoped_refptr<NodeChannel> parent; | 841 scoped_refptr<NodeChannel> parent; |
| 842 { | 842 { |
| 843 base::AutoLock lock(parent_lock_); | 843 base::AutoLock lock(parent_lock_); |
| 844 if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) { | 844 if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) { |
| 845 parent_name_ = parent_name; | 845 parent_name_ = parent_name; |
| 846 parent = bootstrap_parent_channel_; | 846 parent = bootstrap_parent_channel_; |
| 847 } | 847 } |
| 848 } | 848 } |
| 849 | 849 |
| 850 if (!parent) { | 850 if (!parent) { |
| 851 DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node; | 851 DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node; |
| 852 DropPeer(from_node, nullptr); | 852 DropPeer(from_node, nullptr); |
| 853 return; | 853 return; |
| 854 } | 854 } |
| 855 | 855 |
| 856 parent->SetRemoteNodeName(parent_name); | 856 parent->SetRemoteNodeName(parent_name); |
| 857 parent->AcceptParent(token, name_); | 857 parent->AcceptParent(token, name_); |
| 858 | 858 |
| 859 // NOTE: The child does not actually add its parent as a peer until | 859 // NOTE: The child does not actually add its parent as a peer until |
| 860 // receiving an AcceptBrokerClient message from the broker. The parent | 860 // receiving an AcceptBrokerClient message from the broker. The parent |
| 861 // will request that said message be sent upon receiving AcceptParent. | 861 // will request that said message be sent upon receiving AcceptParent. |
| 862 | 862 |
| 863 DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name; | 863 DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name; |
| 864 } | 864 } |
| 865 | 865 |
| 866 void NodeController::OnAcceptParent(const ports::NodeName& from_node, | 866 void NodeController::OnAcceptParent(const ports::NodeName& from_node, |
| 867 const ports::NodeName& token, | 867 const ports::NodeName& token, |
| 868 const ports::NodeName& child_name) { | 868 const ports::NodeName& child_name) { |
| 869 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 869 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 870 | 870 |
| 871 auto it = pending_invitations_.find(from_node); | 871 auto it = pending_invitations_.find(from_node); |
| 872 if (it == pending_invitations_.end() || token != from_node) { | 872 if (it == pending_invitations_.end() || token != from_node) { |
| 873 DLOG(ERROR) << "Received unexpected AcceptParent message from " | 873 DLOG(ERROR) << "Received unexpected AcceptParent message from " |
| 874 << from_node; | 874 << from_node; |
| 875 DropPeer(from_node, nullptr); | 875 DropPeer(from_node, nullptr); |
| 876 return; | 876 return; |
| 877 } | 877 } |
| 878 | 878 |
| 879 { | 879 { |
| (...skipping 184 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1064 message_queue.pop(); | 1064 message_queue.pop(); |
| 1065 } | 1065 } |
| 1066 } | 1066 } |
| 1067 #endif | 1067 #endif |
| 1068 | 1068 |
| 1069 DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name; | 1069 DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name; |
| 1070 } | 1070 } |
| 1071 | 1071 |
| 1072 void NodeController::OnPortsMessage(const ports::NodeName& from_node, | 1072 void NodeController::OnPortsMessage(const ports::NodeName& from_node, |
| 1073 Channel::MessagePtr channel_message) { | 1073 Channel::MessagePtr channel_message) { |
| 1074 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1074 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 1075 | 1075 |
| 1076 void* data; | 1076 void* data; |
| 1077 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; | 1077 size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes; |
| 1078 if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes, | 1078 if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes, |
| 1079 &num_header_bytes, &num_payload_bytes, | 1079 &num_header_bytes, &num_payload_bytes, |
| 1080 &num_ports_bytes)) { | 1080 &num_ports_bytes)) { |
| 1081 DropPeer(from_node, nullptr); | 1081 DropPeer(from_node, nullptr); |
| 1082 return; | 1082 return; |
| 1083 } | 1083 } |
| 1084 | 1084 |
| 1085 CHECK(channel_message); | 1085 CHECK(channel_message); |
| 1086 std::unique_ptr<PortsMessage> ports_message( | 1086 std::unique_ptr<PortsMessage> ports_message( |
| 1087 new PortsMessage(num_header_bytes, | 1087 new PortsMessage(num_header_bytes, |
| 1088 num_payload_bytes, | 1088 num_payload_bytes, |
| 1089 num_ports_bytes, | 1089 num_ports_bytes, |
| 1090 std::move(channel_message))); | 1090 std::move(channel_message))); |
| 1091 ports_message->set_source_node(from_node); | 1091 ports_message->set_source_node(from_node); |
| 1092 node_->AcceptMessage(ports::ScopedMessage(ports_message.release())); | 1092 node_->AcceptMessage(ports::ScopedMessage(ports_message.release())); |
| 1093 AcceptIncomingMessages(); | 1093 AcceptIncomingMessages(); |
| 1094 } | 1094 } |
| 1095 | 1095 |
| 1096 void NodeController::OnRequestPortMerge( | 1096 void NodeController::OnRequestPortMerge( |
| 1097 const ports::NodeName& from_node, | 1097 const ports::NodeName& from_node, |
| 1098 const ports::PortName& connector_port_name, | 1098 const ports::PortName& connector_port_name, |
| 1099 const std::string& name) { | 1099 const std::string& name) { |
| 1100 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1100 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 1101 | 1101 |
| 1102 DVLOG(2) << "Node " << name_ << " received RequestPortMerge for name " << name | 1102 DVLOG(2) << "Node " << name_ << " received RequestPortMerge for name " << name |
| 1103 << " and port " << connector_port_name << "@" << from_node; | 1103 << " and port " << connector_port_name << "@" << from_node; |
| 1104 | 1104 |
| 1105 ports::PortRef local_port; | 1105 ports::PortRef local_port; |
| 1106 { | 1106 { |
| 1107 base::AutoLock lock(reserved_ports_lock_); | 1107 base::AutoLock lock(reserved_ports_lock_); |
| 1108 auto it = reserved_ports_.find(from_node); | 1108 auto it = reserved_ports_.find(from_node); |
| 1109 if (it == reserved_ports_.end()) { | 1109 if (it == reserved_ports_.end()) { |
| 1110 DVLOG(1) << "Ignoring port merge request from node " << from_node << ". " | 1110 DVLOG(1) << "Ignoring port merge request from node " << from_node << ". " |
| (...skipping 16 matching lines...) Expand all Loading... |
| 1127 | 1127 |
| 1128 int rv = node_->MergePorts(local_port, from_node, connector_port_name); | 1128 int rv = node_->MergePorts(local_port, from_node, connector_port_name); |
| 1129 if (rv != ports::OK) | 1129 if (rv != ports::OK) |
| 1130 DLOG(ERROR) << "MergePorts failed: " << rv; | 1130 DLOG(ERROR) << "MergePorts failed: " << rv; |
| 1131 | 1131 |
| 1132 AcceptIncomingMessages(); | 1132 AcceptIncomingMessages(); |
| 1133 } | 1133 } |
| 1134 | 1134 |
| 1135 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, | 1135 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, |
| 1136 const ports::NodeName& name) { | 1136 const ports::NodeName& name) { |
| 1137 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1137 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 1138 | 1138 |
| 1139 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node); | 1139 scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node); |
| 1140 if (from_node == name || name == ports::kInvalidNodeName || !requestor) { | 1140 if (from_node == name || name == ports::kInvalidNodeName || !requestor) { |
| 1141 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from " | 1141 DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from " |
| 1142 << from_node; | 1142 << from_node; |
| 1143 DropPeer(from_node, nullptr); | 1143 DropPeer(from_node, nullptr); |
| 1144 return; | 1144 return; |
| 1145 } | 1145 } |
| 1146 | 1146 |
| 1147 scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name); | 1147 scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name); |
| 1148 if (!new_friend) { | 1148 if (!new_friend) { |
| 1149 // We don't know who they're talking about! | 1149 // We don't know who they're talking about! |
| 1150 requestor->Introduce(name, ScopedPlatformHandle()); | 1150 requestor->Introduce(name, ScopedPlatformHandle()); |
| 1151 } else { | 1151 } else { |
| 1152 PlatformChannelPair new_channel; | 1152 PlatformChannelPair new_channel; |
| 1153 requestor->Introduce(name, new_channel.PassServerHandle()); | 1153 requestor->Introduce(name, new_channel.PassServerHandle()); |
| 1154 new_friend->Introduce(from_node, new_channel.PassClientHandle()); | 1154 new_friend->Introduce(from_node, new_channel.PassClientHandle()); |
| 1155 } | 1155 } |
| 1156 } | 1156 } |
| 1157 | 1157 |
| 1158 void NodeController::OnIntroduce(const ports::NodeName& from_node, | 1158 void NodeController::OnIntroduce(const ports::NodeName& from_node, |
| 1159 const ports::NodeName& name, | 1159 const ports::NodeName& name, |
| 1160 ScopedPlatformHandle channel_handle) { | 1160 ScopedPlatformHandle channel_handle) { |
| 1161 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1161 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 1162 | 1162 |
| 1163 if (!channel_handle.is_valid()) { | 1163 if (!channel_handle.is_valid()) { |
| 1164 node_->LostConnectionToNode(name); | 1164 node_->LostConnectionToNode(name); |
| 1165 | 1165 |
| 1166 DVLOG(1) << "Could not be introduced to peer " << name; | 1166 DVLOG(1) << "Could not be introduced to peer " << name; |
| 1167 base::AutoLock lock(peers_lock_); | 1167 base::AutoLock lock(peers_lock_); |
| 1168 pending_peer_messages_.erase(name); | 1168 pending_peer_messages_.erase(name); |
| 1169 return; | 1169 return; |
| 1170 } | 1170 } |
| 1171 | 1171 |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1206 message->payload_size()); | 1206 message->payload_size()); |
| 1207 iter.second->PortsMessage(std::move(peer_message)); | 1207 iter.second->PortsMessage(std::move(peer_message)); |
| 1208 } | 1208 } |
| 1209 } | 1209 } |
| 1210 | 1210 |
| 1211 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) | 1211 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) |
| 1212 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, | 1212 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node, |
| 1213 base::ProcessHandle from_process, | 1213 base::ProcessHandle from_process, |
| 1214 const ports::NodeName& destination, | 1214 const ports::NodeName& destination, |
| 1215 Channel::MessagePtr message) { | 1215 Channel::MessagePtr message) { |
| 1216 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1216 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 1217 | 1217 |
| 1218 if (GetBrokerChannel()) { | 1218 if (GetBrokerChannel()) { |
| 1219 // Only the broker should be asked to relay a message. | 1219 // Only the broker should be asked to relay a message. |
| 1220 LOG(ERROR) << "Non-broker refusing to relay message."; | 1220 LOG(ERROR) << "Non-broker refusing to relay message."; |
| 1221 DropPeer(from_node, nullptr); | 1221 DropPeer(from_node, nullptr); |
| 1222 return; | 1222 return; |
| 1223 } | 1223 } |
| 1224 | 1224 |
| 1225 // The parent should always know which process this came from. | 1225 // The parent should always know which process this came from. |
| 1226 DCHECK(from_process != base::kNullProcessHandle); | 1226 DCHECK(from_process != base::kNullProcessHandle); |
| (...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1284 } | 1284 } |
| 1285 | 1285 |
| 1286 OnPortsMessage(source_node, std::move(message)); | 1286 OnPortsMessage(source_node, std::move(message)); |
| 1287 } | 1287 } |
| 1288 #endif | 1288 #endif |
| 1289 | 1289 |
| 1290 void NodeController::OnAcceptPeer(const ports::NodeName& from_node, | 1290 void NodeController::OnAcceptPeer(const ports::NodeName& from_node, |
| 1291 const ports::NodeName& token, | 1291 const ports::NodeName& token, |
| 1292 const ports::NodeName& peer_name, | 1292 const ports::NodeName& peer_name, |
| 1293 const ports::PortName& port_name) { | 1293 const ports::PortName& port_name) { |
| 1294 DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); | 1294 DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| 1295 | 1295 |
| 1296 auto it = peer_connections_.find(from_node); | 1296 auto it = peer_connections_.find(from_node); |
| 1297 if (it == peer_connections_.end()) { | 1297 if (it == peer_connections_.end()) { |
| 1298 DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node; | 1298 DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node; |
| 1299 DropPeer(from_node, nullptr); | 1299 DropPeer(from_node, nullptr); |
| 1300 return; | 1300 return; |
| 1301 } | 1301 } |
| 1302 | 1302 |
| 1303 scoped_refptr<NodeChannel> channel = std::move(it->second.channel); | 1303 scoped_refptr<NodeChannel> channel = std::move(it->second.channel); |
| 1304 ports::PortRef local_port = it->second.local_port; | 1304 ports::PortRef local_port = it->second.local_port; |
| (...skipping 16 matching lines...) Expand all Loading... |
| 1321 | 1321 |
| 1322 // We need to choose one side to initiate the port merge. It doesn't matter | 1322 // We need to choose one side to initiate the port merge. It doesn't matter |
| 1323 // who does it as long as they don't both try. Simple solution: pick the one | 1323 // who does it as long as they don't both try. Simple solution: pick the one |
| 1324 // with the "smaller" port name. | 1324 // with the "smaller" port name. |
| 1325 if (local_port.name() < port_name) | 1325 if (local_port.name() < port_name) |
| 1326 node()->MergePorts(local_port, peer_name, port_name); | 1326 node()->MergePorts(local_port, peer_name, port_name); |
| 1327 } | 1327 } |
| 1328 | 1328 |
| 1329 void NodeController::OnChannelError(const ports::NodeName& from_node, | 1329 void NodeController::OnChannelError(const ports::NodeName& from_node, |
| 1330 NodeChannel* channel) { | 1330 NodeChannel* channel) { |
| 1331 if (io_task_runner_->RunsTasksOnCurrentThread()) { | 1331 if (io_task_runner_->RunsTasksInCurrentSequence()) { |
| 1332 DropPeer(from_node, channel); | 1332 DropPeer(from_node, channel); |
| 1333 // DropPeer may have caused local port closures, so be sure to process any | 1333 // DropPeer may have caused local port closures, so be sure to process any |
| 1334 // pending local messages. | 1334 // pending local messages. |
| 1335 AcceptIncomingMessages(); | 1335 AcceptIncomingMessages(); |
| 1336 } else { | 1336 } else { |
| 1337 io_task_runner_->PostTask( | 1337 io_task_runner_->PostTask( |
| 1338 FROM_HERE, | 1338 FROM_HERE, |
| 1339 base::Bind(&NodeController::OnChannelError, base::Unretained(this), | 1339 base::Bind(&NodeController::OnChannelError, base::Unretained(this), |
| 1340 from_node, channel)); | 1340 from_node, channel)); |
| 1341 } | 1341 } |
| (...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1418 NodeController::PeerConnection::~PeerConnection() = default; | 1418 NodeController::PeerConnection::~PeerConnection() = default; |
| 1419 | 1419 |
| 1420 NodeController::PeerConnection& NodeController::PeerConnection:: | 1420 NodeController::PeerConnection& NodeController::PeerConnection:: |
| 1421 operator=(const PeerConnection& other) = default; | 1421 operator=(const PeerConnection& other) = default; |
| 1422 | 1422 |
| 1423 NodeController::PeerConnection& NodeController::PeerConnection:: | 1423 NodeController::PeerConnection& NodeController::PeerConnection:: |
| 1424 operator=(PeerConnection&& other) = default; | 1424 operator=(PeerConnection&& other) = default; |
| 1425 | 1425 |
| 1426 } // namespace edk | 1426 } // namespace edk |
| 1427 } // namespace mojo | 1427 } // namespace mojo |
| OLD | NEW |