| Index: mojo/edk/system/node_controller.cc
|
| diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..030cf47296ed8102d5d73814d451663b53a2bb64
|
| --- /dev/null
|
| +++ b/mojo/edk/system/node_controller.cc
|
| @@ -0,0 +1,771 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "mojo/edk/system/node_controller.h"
|
| +
|
| +#include <algorithm>
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/location.h"
|
| +#include "base/logging.h"
|
| +#include "base/macros.h"
|
| +#include "base/message_loop/message_loop.h"
|
| +#include "crypto/random.h"
|
| +#include "mojo/edk/embedder/embedder_internal.h"
|
| +#include "mojo/edk/embedder/platform_channel_pair.h"
|
| +#include "mojo/edk/embedder/platform_support.h"
|
| +#include "mojo/edk/system/core.h"
|
| +#include "mojo/edk/system/ports_message.h"
|
| +
|
| +namespace mojo {
|
| +namespace edk {
|
| +
|
| +namespace {
|
| +
|
| +template <typename T>
|
| +void GenerateRandomName(T* out) { crypto::RandBytes(out, sizeof(T)); }
|
| +
|
| +ports::NodeName GetRandomNodeName() {
|
| + ports::NodeName name;
|
| + GenerateRandomName(&name);
|
| + return name;
|
| +}
|
| +
|
| +// Used by NodeController to watch for shutdown. Since no IO can happen once
|
| +// the IO thread is killed, the NodeController can cleanly drop all its peers
|
| +// at that time.
|
| +class ThreadDestructionObserver :
|
| + public base::MessageLoop::DestructionObserver {
|
| + public:
|
| + static void Create(scoped_refptr<base::TaskRunner> task_runner,
|
| + const base::Closure& callback) {
|
| + if (task_runner->RunsTasksOnCurrentThread()) {
|
| + // Owns itself.
|
| + new ThreadDestructionObserver(callback);
|
| + } else {
|
| + task_runner->PostTask(FROM_HERE,
|
| + base::Bind(&Create, task_runner, callback));
|
| + }
|
| + }
|
| +
|
| + private:
|
| + explicit ThreadDestructionObserver(const base::Closure& callback)
|
| + : callback_(callback) {
|
| + base::MessageLoop::current()->AddDestructionObserver(this);
|
| + }
|
| +
|
| + ~ThreadDestructionObserver() override {
|
| + base::MessageLoop::current()->RemoveDestructionObserver(this);
|
| + }
|
| +
|
| + // base::MessageLoop::DestructionObserver:
|
| + void WillDestroyCurrentMessageLoop() override {
|
| + callback_.Run();
|
| + delete this;
|
| + }
|
| +
|
| + const base::Closure callback_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver);
|
| +};
|
| +
|
| +} // namespace
|
| +
|
| +NodeController::PendingPortRequest::PendingPortRequest() {}
|
| +
|
| +NodeController::PendingPortRequest::~PendingPortRequest() {}
|
| +
|
| +NodeController::ReservedPort::ReservedPort() {}
|
| +
|
| +NodeController::ReservedPort::~ReservedPort() {}
|
| +
|
| +NodeController::~NodeController() {}
|
| +
|
| +NodeController::NodeController(Core* core)
|
| + : core_(core),
|
| + name_(GetRandomNodeName()),
|
| + node_(new ports::Node(name_, this)) {
|
| + DVLOG(1) << "Initializing node " << name_;
|
| +}
|
| +
|
| +void NodeController::SetIOTaskRunner(
|
| + scoped_refptr<base::TaskRunner> task_runner) {
|
| + io_task_runner_ = task_runner;
|
| + ThreadDestructionObserver::Create(
|
| + io_task_runner_,
|
| + base::Bind(&NodeController::DropAllPeers, base::Unretained(this)));
|
| +}
|
| +
|
| +void NodeController::ConnectToChild(base::ProcessHandle process_handle,
|
| + ScopedPlatformHandle platform_handle) {
|
| + io_task_runner_->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&NodeController::ConnectToChildOnIOThread,
|
| + base::Unretained(this),
|
| + process_handle,
|
| + base::Passed(&platform_handle)));
|
| +}
|
| +
|
| +void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) {
|
| + io_task_runner_->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&NodeController::ConnectToParentOnIOThread,
|
| + base::Unretained(this),
|
| + base::Passed(&platform_handle)));
|
| +}
|
| +
|
| +void NodeController::SetPortObserver(
|
| + const ports::PortRef& port,
|
| + const scoped_refptr<PortObserver>& observer) {
|
| + node_->SetUserData(port, observer);
|
| +}
|
| +
|
| +void NodeController::ClosePort(const ports::PortRef& port) {
|
| + SetPortObserver(port, nullptr);
|
| + int rv = node_->ClosePort(port);
|
| + DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name();
|
| +
|
| + AcceptIncomingMessages();
|
| +}
|
| +
|
| +int NodeController::SendMessage(const ports::PortRef& port,
|
| + scoped_ptr<PortsMessage>* message) {
|
| + ports::ScopedMessage ports_message(message->release());
|
| + int rv = node_->SendMessage(port, &ports_message);
|
| + if (rv != ports::OK) {
|
| + DCHECK(ports_message);
|
| + message->reset(static_cast<PortsMessage*>(ports_message.release()));
|
| + }
|
| +
|
| + AcceptIncomingMessages();
|
| + return rv;
|
| +}
|
| +
|
| +void NodeController::ReservePort(const std::string& token,
|
| + const ReservePortCallback& callback) {
|
| + ports::PortRef port;
|
| + node_->CreateUninitializedPort(&port);
|
| +
|
| + DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
|
| + << token;
|
| +
|
| + base::AutoLock lock(reserved_ports_lock_);
|
| + ReservedPort reservation;
|
| + reservation.local_port = port;
|
| + reservation.callback = callback;
|
| + reserved_ports_.insert(std::make_pair(token, reservation));
|
| +}
|
| +
|
| +scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
|
| + size_t num_bytes) {
|
| + // TODO: Broker through the parent over a sync channel. :(
|
| + return internal::g_platform_support->CreateSharedBuffer(num_bytes);
|
| +}
|
| +
|
| +void NodeController::ConnectToParentPort(const ports::PortRef& local_port,
|
| + const std::string& token,
|
| + const base::Closure& callback) {
|
| + io_task_runner_->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&NodeController::RequestParentPortConnectionOnIOThread,
|
| + base::Unretained(this), local_port, token, callback));
|
| +}
|
| +
|
| +void NodeController::ConnectReservedPorts(const std::string& token1,
|
| + const std::string& token2) {
|
| + ReservedPort port1;
|
| + ReservedPort port2;
|
| + {
|
| + base::AutoLock lock(reserved_ports_lock_);
|
| + auto it1 = reserved_ports_.find(token1);
|
| + if (it1 == reserved_ports_.end())
|
| + return;
|
| + auto it2 = reserved_ports_.find(token2);
|
| + if (it2 == reserved_ports_.end())
|
| + return;
|
| + port1 = it1->second;
|
| + port2 = it2->second;
|
| + reserved_ports_.erase(it1);
|
| + reserved_ports_.erase(it2);
|
| + }
|
| +
|
| + node_->InitializePort(port1.local_port, name_, port2.local_port.name());
|
| + node_->InitializePort(port2.local_port, name_, port1.local_port.name());
|
| + port1.callback.Run(port1.local_port);
|
| + port2.callback.Run(port2.local_port);
|
| +}
|
| +
|
| +void NodeController::RequestShutdown(const base::Closure& callback) {
|
| + {
|
| + base::AutoLock lock(shutdown_lock_);
|
| + shutdown_callback_ = callback;
|
| + }
|
| +
|
| + AttemptShutdownIfRequested();
|
| +}
|
| +
|
| +void NodeController::ConnectToChildOnIOThread(
|
| + base::ProcessHandle process_handle,
|
| + ScopedPlatformHandle platform_handle) {
|
| + DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + scoped_refptr<NodeChannel> channel =
|
| + NodeChannel::Create(this, std::move(platform_handle), io_task_runner_);
|
| +
|
| + ports::NodeName token;
|
| + GenerateRandomName(&token);
|
| +
|
| + channel->SetRemoteNodeName(token);
|
| + channel->SetRemoteProcessHandle(process_handle);
|
| + channel->Start();
|
| + channel->AcceptChild(name_, token);
|
| +
|
| + pending_children_.insert(std::make_pair(token, channel));
|
| +}
|
| +
|
| +void NodeController::ConnectToParentOnIOThread(
|
| + ScopedPlatformHandle platform_handle) {
|
| + DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + base::AutoLock lock(parent_lock_);
|
| + DCHECK(parent_name_ == ports::kInvalidNodeName);
|
| +
|
| + // At this point we don't know the parent's name, so we can't yet insert it
|
| + // into our |peers_| map. That will happen as soon as we receive an
|
| + // AcceptChild message from them.
|
| + bootstrap_parent_channel_ =
|
| + NodeChannel::Create(this, std::move(platform_handle), io_task_runner_);
|
| + bootstrap_parent_channel_->Start();
|
| +}
|
| +
|
| +void NodeController::RequestParentPortConnectionOnIOThread(
|
| + const ports::PortRef& local_port,
|
| + const std::string& token,
|
| + const base::Closure& callback) {
|
| + DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + scoped_refptr<NodeChannel> parent = GetParentChannel();
|
| + if (!parent) {
|
| + PendingPortRequest request;
|
| + request.token = token;
|
| + request.local_port = local_port;
|
| + request.callback = callback;
|
| + pending_port_requests_.push_back(request);
|
| + return;
|
| + }
|
| +
|
| + pending_port_connections_.insert(std::make_pair(local_port.name(), callback));
|
| + parent->RequestPortConnection(local_port.name(), token);
|
| +}
|
| +
|
| +scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
|
| + const ports::NodeName& name) {
|
| + base::AutoLock lock(peers_lock_);
|
| + auto it = peers_.find(name);
|
| + if (it == peers_.end())
|
| + return nullptr;
|
| + return it->second;
|
| +}
|
| +
|
| +scoped_refptr<NodeChannel> NodeController::GetParentChannel() {
|
| + ports::NodeName parent_name;
|
| + {
|
| + base::AutoLock lock(parent_lock_);
|
| + parent_name = parent_name_;
|
| + }
|
| + return GetPeerChannel(parent_name);
|
| +}
|
| +
|
| +void NodeController::AddPeer(const ports::NodeName& name,
|
| + scoped_refptr<NodeChannel> channel,
|
| + bool start_channel) {
|
| + DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + DCHECK(name != ports::kInvalidNodeName);
|
| + DCHECK(channel);
|
| +
|
| + channel->SetRemoteNodeName(name);
|
| +
|
| + base::AutoLock lock(peers_lock_);
|
| + if (peers_.find(name) != peers_.end()) {
|
| + // This can happen normally if two nodes race to be introduced to each
|
| + // other. The losing pipe will be silently closed and introduction should
|
| + // not be affected.
|
| + DVLOG(1) << "Ignoring duplicate peer name " << name;
|
| + return;
|
| + }
|
| +
|
| + auto result = peers_.insert(std::make_pair(name, channel));
|
| + DCHECK(result.second);
|
| +
|
| + DVLOG(2) << "Accepting new peer " << name << " on node " << name_;
|
| +
|
| + if (start_channel)
|
| + channel->Start();
|
| +
|
| + // Flush any queued message we need to deliver to this node.
|
| + OutgoingMessageQueue pending_messages;
|
| + auto it = pending_peer_messages_.find(name);
|
| + if (it != pending_peer_messages_.end()) {
|
| + auto& message_queue = it->second;
|
| + while (!message_queue.empty()) {
|
| + ports::ScopedMessage message = std::move(message_queue.front());
|
| + channel->PortsMessage(
|
| + static_cast<PortsMessage*>(message.get())->TakeChannelMessage());
|
| + message_queue.pop();
|
| + }
|
| + pending_peer_messages_.erase(it);
|
| + }
|
| +}
|
| +
|
| +void NodeController::DropPeer(const ports::NodeName& name) {
|
| + DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + {
|
| + base::AutoLock lock(peers_lock_);
|
| + auto it = peers_.find(name);
|
| +
|
| + if (it != peers_.end()) {
|
| + ports::NodeName peer = it->first;
|
| + peers_.erase(it);
|
| + DVLOG(1) << "Dropped peer " << peer;
|
| + }
|
| +
|
| + pending_peer_messages_.erase(name);
|
| + pending_children_.erase(name);
|
| + }
|
| +
|
| + node_->LostConnectionToNode(name);
|
| +}
|
| +
|
| +void NodeController::SendPeerMessage(const ports::NodeName& name,
|
| + ports::ScopedMessage message) {
|
| + PortsMessage* ports_message = static_cast<PortsMessage*>(message.get());
|
| +
|
| +#if defined(OS_WIN)
|
| + // If we're sending a message with handles and we're not the parent,
|
| + // relay the message through the parent.
|
| + if (ports_message->has_handles()) {
|
| + scoped_refptr<NodeChannel> parent = GetParentChannel();
|
| + if (parent) {
|
| + parent->RelayPortsMessage(name, ports_message->TakeChannelMessage());
|
| + return;
|
| + }
|
| + }
|
| +#endif
|
| +
|
| + scoped_refptr<NodeChannel> peer = GetPeerChannel(name);
|
| + if (peer) {
|
| + peer->PortsMessage(ports_message->TakeChannelMessage());
|
| + return;
|
| + }
|
| +
|
| + // If we don't know who the peer is, queue the message for delivery. If this
|
| + // is the first message queued for the peer, we also ask the parent to
|
| + // introduce us to them.
|
| +
|
| + bool needs_introduction = false;
|
| + {
|
| + base::AutoLock lock(peers_lock_);
|
| + auto& queue = pending_peer_messages_[name];
|
| + needs_introduction = queue.empty();
|
| + queue.emplace(std::move(message));
|
| + }
|
| +
|
| + if (needs_introduction) {
|
| + scoped_refptr<NodeChannel> parent = GetParentChannel();
|
| + if (!parent) {
|
| + DVLOG(1) << "Dropping message for unknown peer: " << name;
|
| + return;
|
| + }
|
| + parent->RequestIntroduction(name);
|
| + }
|
| +}
|
| +
|
| +void NodeController::AcceptIncomingMessages() {
|
| + std::queue<ports::ScopedMessage> messages;
|
| + for (;;) {
|
| + // TODO: We may need to be more careful to avoid starving the rest of the
|
| + // thread here. Revisit this if it turns out to be a problem. One
|
| + // alternative would be to schedule a task to continue pumping messages
|
| + // after flushing once.
|
| +
|
| + {
|
| + base::AutoLock lock(messages_lock_);
|
| + if (incoming_messages_.empty())
|
| + break;
|
| + std::swap(messages, incoming_messages_);
|
| + }
|
| +
|
| + while (!messages.empty()) {
|
| + node_->AcceptMessage(std::move(messages.front()));
|
| + messages.pop();
|
| + }
|
| + }
|
| + AttemptShutdownIfRequested();
|
| +}
|
| +
|
| +void NodeController::DropAllPeers() {
|
| + DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + {
|
| + base::AutoLock lock(parent_lock_);
|
| + if (bootstrap_parent_channel_) {
|
| + bootstrap_parent_channel_->ShutDown();
|
| + bootstrap_parent_channel_ = nullptr;
|
| + }
|
| + }
|
| +
|
| + std::vector<scoped_refptr<NodeChannel>> all_peers;
|
| + {
|
| + base::AutoLock lock(peers_lock_);
|
| + for (const auto& peer : peers_)
|
| + all_peers.push_back(peer.second);
|
| + for (const auto& peer : pending_children_)
|
| + all_peers.push_back(peer.second);
|
| + peers_.clear();
|
| + pending_children_.clear();
|
| + pending_peer_messages_.clear();
|
| + }
|
| +
|
| + for (const auto& peer : all_peers)
|
| + peer->ShutDown();
|
| +
|
| + if (destroy_on_io_thread_shutdown_)
|
| + delete this;
|
| +}
|
| +
|
| +void NodeController::GenerateRandomPortName(ports::PortName* port_name) {
|
| + GenerateRandomName(port_name);
|
| +}
|
| +
|
| +void NodeController::AllocMessage(size_t num_header_bytes,
|
| + ports::ScopedMessage* message) {
|
| + message->reset(new PortsMessage(num_header_bytes, 0, 0, nullptr));
|
| +}
|
| +
|
| +void NodeController::ForwardMessage(const ports::NodeName& node,
|
| + ports::ScopedMessage message) {
|
| + if (node == name_) {
|
| + // NOTE: We need to avoid re-entering the Node instance within
|
| + // ForwardMessage. Because ForwardMessage is only ever called
|
| + // (synchronously) in response to Node's ClosePort, SendMessage, or
|
| + // AcceptMessage, we flush the queue after calling any of those methods.
|
| + base::AutoLock lock(messages_lock_);
|
| + incoming_messages_.emplace(std::move(message));
|
| + } else {
|
| + SendPeerMessage(node, std::move(message));
|
| + }
|
| +}
|
| +
|
| +void NodeController::PortStatusChanged(const ports::PortRef& port) {
|
| + scoped_refptr<ports::UserData> user_data;
|
| + node_->GetUserData(port, &user_data);
|
| +
|
| + PortObserver* observer = static_cast<PortObserver*>(user_data.get());
|
| + if (observer) {
|
| + observer->OnPortStatusChanged();
|
| + } else {
|
| + DVLOG(2) << "Ignoring status change for " << port.name() << " because it "
|
| + << "doesn't have an observer.";
|
| + }
|
| +}
|
| +
|
| +void NodeController::OnAcceptChild(const ports::NodeName& from_node,
|
| + const ports::NodeName& parent_name,
|
| + const ports::NodeName& token) {
|
| + DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + scoped_refptr<NodeChannel> parent;
|
| + {
|
| + base::AutoLock lock(parent_lock_);
|
| + if (!bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName) {
|
| + DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node;
|
| + DropPeer(from_node);
|
| + return;
|
| + }
|
| +
|
| + parent_name_ = parent_name;
|
| + parent = bootstrap_parent_channel_;
|
| + bootstrap_parent_channel_ = nullptr;
|
| + }
|
| +
|
| + parent->AcceptParent(token, name_);
|
| + for (const auto& request : pending_port_requests_) {
|
| + pending_port_connections_.insert(
|
| + std::make_pair(request.local_port.name(), request.callback));
|
| + parent->RequestPortConnection(request.local_port.name(), request.token);
|
| + }
|
| + pending_port_requests_.clear();
|
| +
|
| + DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
|
| +
|
| + AddPeer(parent_name_, parent, false /* start_channel */);
|
| +}
|
| +
|
| +void NodeController::OnAcceptParent(const ports::NodeName& from_node,
|
| + const ports::NodeName& token,
|
| + const ports::NodeName& child_name) {
|
| + DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + auto it = pending_children_.find(from_node);
|
| + if (it == pending_children_.end() || token != from_node) {
|
| + DLOG(ERROR) << "Received unexpected AcceptParent message from "
|
| + << from_node;
|
| + DropPeer(from_node);
|
| + return;
|
| + }
|
| +
|
| + scoped_refptr<NodeChannel> channel = it->second;
|
| + pending_children_.erase(it);
|
| +
|
| + DCHECK(channel);
|
| +
|
| + DVLOG(1) << "Parent " << name_ << " accepted child " << child_name;
|
| +
|
| + AddPeer(child_name, channel, false /* start_channel */);
|
| +}
|
| +
|
| +void NodeController::OnPortsMessage(Channel::MessagePtr channel_message) {
|
| + DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + void* data;
|
| + size_t num_data_bytes;
|
| + NodeChannel::GetPortsMessageData(
|
| + channel_message.get(), &data, &num_data_bytes);
|
| +
|
| + size_t num_header_bytes, num_payload_bytes, num_ports_bytes;
|
| + ports::Message::Parse(data,
|
| + num_data_bytes,
|
| + &num_header_bytes,
|
| + &num_payload_bytes,
|
| + &num_ports_bytes);
|
| +
|
| + CHECK(channel_message);
|
| + ports::ScopedMessage message(
|
| + new PortsMessage(num_header_bytes,
|
| + num_payload_bytes,
|
| + num_ports_bytes,
|
| + std::move(channel_message)));
|
| +
|
| + node_->AcceptMessage(std::move(message));
|
| + AcceptIncomingMessages();
|
| + AttemptShutdownIfRequested();
|
| +}
|
| +
|
| +void NodeController::OnRequestPortConnection(
|
| + const ports::NodeName& from_node,
|
| + const ports::PortName& connector_port_name,
|
| + const std::string& token) {
|
| + DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + DVLOG(2) << "Node " << name_ << " received RequestPortConnection for token "
|
| + << token << " and port " << connector_port_name << "@" << from_node;
|
| +
|
| + ReservePortCallback callback;
|
| + ports::PortRef local_port;
|
| + {
|
| + base::AutoLock lock(reserved_ports_lock_);
|
| + auto it = reserved_ports_.find(token);
|
| + if (it == reserved_ports_.end()) {
|
| + DVLOG(1) << "Ignoring request to connect to port for unknown token "
|
| + << token;
|
| + return;
|
| + }
|
| + local_port = it->second.local_port;
|
| + callback = it->second.callback;
|
| + reserved_ports_.erase(it);
|
| + }
|
| +
|
| + DCHECK(!callback.is_null());
|
| +
|
| + scoped_refptr<NodeChannel> peer = GetPeerChannel(from_node);
|
| + if (!peer) {
|
| + DVLOG(1) << "Ignoring request to connect to port from unknown node "
|
| + << from_node;
|
| + return;
|
| + }
|
| +
|
| + // This reserved port should not have been initialized yet.
|
| + CHECK_EQ(ports::OK, node_->InitializePort(local_port, from_node,
|
| + connector_port_name));
|
| +
|
| + peer->ConnectToPort(local_port.name(), connector_port_name);
|
| + callback.Run(local_port);
|
| +}
|
| +
|
| +void NodeController::OnConnectToPort(
|
| + const ports::NodeName& from_node,
|
| + const ports::PortName& connector_port_name,
|
| + const ports::PortName& connectee_port_name) {
|
| + DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + DVLOG(2) << "Node " << name_ << " received ConnectToPort for local port "
|
| + << connectee_port_name << " to port " << connector_port_name << "@"
|
| + << from_node;
|
| +
|
| + ports::PortRef connectee_port;
|
| + int rv = node_->GetPort(connectee_port_name, &connectee_port);
|
| + if (rv != ports::OK) {
|
| + DLOG(ERROR) << "Ignoring ConnectToPort for unknown port "
|
| + << connectee_port_name;
|
| + return;
|
| + }
|
| +
|
| + // It's OK if this port has already been initialized. This message is only
|
| + // sent by the remote peer to ensure the port is ready before it starts
|
| + // us sending messages to it.
|
| + ports::PortStatus port_status;
|
| + rv = node_->GetStatus(connectee_port, &port_status);
|
| + if (rv == ports::OK) {
|
| + DVLOG(1) << "Ignoring ConnectToPort for already-initialized port "
|
| + << connectee_port_name;
|
| + return;
|
| + }
|
| +
|
| + CHECK_EQ(ports::OK, node_->InitializePort(connectee_port, from_node,
|
| + connector_port_name));
|
| +
|
| + auto it = pending_port_connections_.find(connectee_port_name);
|
| + DCHECK(it != pending_port_connections_.end());
|
| + it->second.Run();
|
| + pending_port_connections_.erase(it);
|
| +}
|
| +
|
| +void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
|
| + const ports::NodeName& name) {
|
| + DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node);
|
| + if (from_node == name || name == ports::kInvalidNodeName || !requestor) {
|
| + DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from "
|
| + << from_node;
|
| + DropPeer(from_node);
|
| + return;
|
| + }
|
| +
|
| + if (GetParentChannel() != nullptr) {
|
| + DLOG(ERROR) << "Non-parent node cannot introduce peers to each other.";
|
| + return;
|
| + }
|
| +
|
| + scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name);
|
| + if (!new_friend) {
|
| + // We don't know who they're talking about!
|
| + requestor->Introduce(name, ScopedPlatformHandle());
|
| + } else {
|
| + PlatformChannelPair new_channel;
|
| + requestor->Introduce(name, new_channel.PassServerHandle());
|
| + new_friend->Introduce(from_node, new_channel.PassClientHandle());
|
| + }
|
| +}
|
| +
|
| +void NodeController::OnIntroduce(const ports::NodeName& from_node,
|
| + const ports::NodeName& name,
|
| + ScopedPlatformHandle channel_handle) {
|
| + DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
|
| +
|
| + {
|
| + base::AutoLock lock(parent_lock_);
|
| + if (from_node != parent_name_) {
|
| + DLOG(ERROR) << "Received unexpected Introduce message from node "
|
| + << from_node;
|
| + DropPeer(from_node);
|
| + return;
|
| + }
|
| + }
|
| +
|
| + if (!channel_handle.is_valid()) {
|
| + DLOG(ERROR) << "Could not be introduced to peer " << name;
|
| + base::AutoLock lock(peers_lock_);
|
| + pending_peer_messages_.erase(name);
|
| + return;
|
| + }
|
| +
|
| + scoped_refptr<NodeChannel> channel =
|
| + NodeChannel::Create(this, std::move(channel_handle), io_task_runner_);
|
| +
|
| + DVLOG(1) << "Adding new peer " << name << " via parent introduction.";
|
| + AddPeer(name, channel, true /* start_channel */);
|
| +}
|
| +
|
| +#if defined(OS_WIN)
|
| +void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node,
|
| + base::ProcessHandle from_process,
|
| + const ports::NodeName& destination,
|
| + Channel::MessagePtr message) {
|
| + scoped_refptr<NodeChannel> parent = GetParentChannel();
|
| + if (parent) {
|
| + // Only the parent should be asked to relay a message.
|
| + DLOG(ERROR) << "Non-parent refusing to relay message.";
|
| + DropPeer(from_node);
|
| + return;
|
| + }
|
| +
|
| + // The parent should always know which process this came from.
|
| + DCHECK(from_process != base::kNullProcessHandle);
|
| +
|
| + // Duplicate the handles to this (the parent) process. If the message is
|
| + // destined for another child process, the handles will be duplicated to
|
| + // that process before going out (see NodeChannel::WriteChannelMessage).
|
| + //
|
| + // TODO: We could avoid double-duplication.
|
| + for (size_t i = 0; i < message->num_handles(); ++i) {
|
| + BOOL result = DuplicateHandle(
|
| + from_process, message->handles()[i].handle,
|
| + base::GetCurrentProcessHandle(),
|
| + reinterpret_cast<HANDLE*>(message->handles() + i),
|
| + 0, FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE);
|
| + DCHECK(result);
|
| + }
|
| + if (destination == name_) {
|
| + // Great, we can deliver this message locally.
|
| + OnPortsMessage(std::move(message));
|
| + return;
|
| + }
|
| +
|
| + scoped_refptr<NodeChannel> peer = GetPeerChannel(destination);
|
| + if (peer)
|
| + peer->PortsMessage(std::move(message));
|
| + else
|
| + DLOG(ERROR) << "Dropping relay message for unknown node " << destination;
|
| +}
|
| +#endif
|
| +
|
| +void NodeController::OnChannelError(const ports::NodeName& from_node) {
|
| + if (io_task_runner_->RunsTasksOnCurrentThread()) {
|
| + DropPeer(from_node);
|
| + } else {
|
| + io_task_runner_->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&NodeController::DropPeer, base::Unretained(this),
|
| + from_node));
|
| + }
|
| +}
|
| +
|
| +void NodeController::DestroyOnIOThreadShutdown() {
|
| + destroy_on_io_thread_shutdown_ = true;
|
| +}
|
| +
|
| +void NodeController::AttemptShutdownIfRequested() {
|
| + base::Closure callback;
|
| + {
|
| + base::AutoLock lock(shutdown_lock_);
|
| + if (shutdown_callback_.is_null())
|
| + return;
|
| + if (!node_->CanShutdownCleanly(true /* allow_local_ports */)) {
|
| + DVLOG(2) << "Unable to cleanly shut down node " << name_ << ".";
|
| + return;
|
| + }
|
| + callback = shutdown_callback_;
|
| + shutdown_callback_.Reset();
|
| + }
|
| +
|
| + DCHECK(!callback.is_null());
|
| +
|
| + callback.Run();
|
| +}
|
| +
|
| +} // namespace edk
|
| +} // namespace mojo
|
|
|