Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(100)

Unified Diff: mojo/edk/system/node_controller.cc

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/node_controller.cc
diff --git a/mojo/edk/system/node_controller.cc b/mojo/edk/system/node_controller.cc
new file mode 100644
index 0000000000000000000000000000000000000000..030cf47296ed8102d5d73814d451663b53a2bb64
--- /dev/null
+++ b/mojo/edk/system/node_controller.cc
@@ -0,0 +1,771 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/edk/system/node_controller.h"
+
+#include <algorithm>
+
+#include "base/bind.h"
+#include "base/location.h"
+#include "base/logging.h"
+#include "base/macros.h"
+#include "base/message_loop/message_loop.h"
+#include "crypto/random.h"
+#include "mojo/edk/embedder/embedder_internal.h"
+#include "mojo/edk/embedder/platform_channel_pair.h"
+#include "mojo/edk/embedder/platform_support.h"
+#include "mojo/edk/system/core.h"
+#include "mojo/edk/system/ports_message.h"
+
+namespace mojo {
+namespace edk {
+
+namespace {
+
+template <typename T>
+void GenerateRandomName(T* out) { crypto::RandBytes(out, sizeof(T)); }
+
+ports::NodeName GetRandomNodeName() {
+ ports::NodeName name;
+ GenerateRandomName(&name);
+ return name;
+}
+
+// Used by NodeController to watch for shutdown. Since no IO can happen once
+// the IO thread is killed, the NodeController can cleanly drop all its peers
+// at that time.
+class ThreadDestructionObserver :
+ public base::MessageLoop::DestructionObserver {
+ public:
+ static void Create(scoped_refptr<base::TaskRunner> task_runner,
+ const base::Closure& callback) {
+ if (task_runner->RunsTasksOnCurrentThread()) {
+ // Owns itself.
+ new ThreadDestructionObserver(callback);
+ } else {
+ task_runner->PostTask(FROM_HERE,
+ base::Bind(&Create, task_runner, callback));
+ }
+ }
+
+ private:
+ explicit ThreadDestructionObserver(const base::Closure& callback)
+ : callback_(callback) {
+ base::MessageLoop::current()->AddDestructionObserver(this);
+ }
+
+ ~ThreadDestructionObserver() override {
+ base::MessageLoop::current()->RemoveDestructionObserver(this);
+ }
+
+ // base::MessageLoop::DestructionObserver:
+ void WillDestroyCurrentMessageLoop() override {
+ callback_.Run();
+ delete this;
+ }
+
+ const base::Closure callback_;
+
+ DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver);
+};
+
+} // namespace
+
+NodeController::PendingPortRequest::PendingPortRequest() {}
+
+NodeController::PendingPortRequest::~PendingPortRequest() {}
+
+NodeController::ReservedPort::ReservedPort() {}
+
+NodeController::ReservedPort::~ReservedPort() {}
+
+NodeController::~NodeController() {}
+
+NodeController::NodeController(Core* core)
+ : core_(core),
+ name_(GetRandomNodeName()),
+ node_(new ports::Node(name_, this)) {
+ DVLOG(1) << "Initializing node " << name_;
+}
+
+void NodeController::SetIOTaskRunner(
+ scoped_refptr<base::TaskRunner> task_runner) {
+ io_task_runner_ = task_runner;
+ ThreadDestructionObserver::Create(
+ io_task_runner_,
+ base::Bind(&NodeController::DropAllPeers, base::Unretained(this)));
+}
+
+void NodeController::ConnectToChild(base::ProcessHandle process_handle,
+ ScopedPlatformHandle platform_handle) {
+ io_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&NodeController::ConnectToChildOnIOThread,
+ base::Unretained(this),
+ process_handle,
+ base::Passed(&platform_handle)));
+}
+
+void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) {
+ io_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&NodeController::ConnectToParentOnIOThread,
+ base::Unretained(this),
+ base::Passed(&platform_handle)));
+}
+
+void NodeController::SetPortObserver(
+ const ports::PortRef& port,
+ const scoped_refptr<PortObserver>& observer) {
+ node_->SetUserData(port, observer);
+}
+
+void NodeController::ClosePort(const ports::PortRef& port) {
+ SetPortObserver(port, nullptr);
+ int rv = node_->ClosePort(port);
+ DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name();
+
+ AcceptIncomingMessages();
+}
+
+int NodeController::SendMessage(const ports::PortRef& port,
+ scoped_ptr<PortsMessage>* message) {
+ ports::ScopedMessage ports_message(message->release());
+ int rv = node_->SendMessage(port, &ports_message);
+ if (rv != ports::OK) {
+ DCHECK(ports_message);
+ message->reset(static_cast<PortsMessage*>(ports_message.release()));
+ }
+
+ AcceptIncomingMessages();
+ return rv;
+}
+
+void NodeController::ReservePort(const std::string& token,
+ const ReservePortCallback& callback) {
+ ports::PortRef port;
+ node_->CreateUninitializedPort(&port);
+
+ DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
+ << token;
+
+ base::AutoLock lock(reserved_ports_lock_);
+ ReservedPort reservation;
+ reservation.local_port = port;
+ reservation.callback = callback;
+ reserved_ports_.insert(std::make_pair(token, reservation));
+}
+
+scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
+ size_t num_bytes) {
+ // TODO: Broker through the parent over a sync channel. :(
+ return internal::g_platform_support->CreateSharedBuffer(num_bytes);
+}
+
+void NodeController::ConnectToParentPort(const ports::PortRef& local_port,
+ const std::string& token,
+ const base::Closure& callback) {
+ io_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&NodeController::RequestParentPortConnectionOnIOThread,
+ base::Unretained(this), local_port, token, callback));
+}
+
+void NodeController::ConnectReservedPorts(const std::string& token1,
+ const std::string& token2) {
+ ReservedPort port1;
+ ReservedPort port2;
+ {
+ base::AutoLock lock(reserved_ports_lock_);
+ auto it1 = reserved_ports_.find(token1);
+ if (it1 == reserved_ports_.end())
+ return;
+ auto it2 = reserved_ports_.find(token2);
+ if (it2 == reserved_ports_.end())
+ return;
+ port1 = it1->second;
+ port2 = it2->second;
+ reserved_ports_.erase(it1);
+ reserved_ports_.erase(it2);
+ }
+
+ node_->InitializePort(port1.local_port, name_, port2.local_port.name());
+ node_->InitializePort(port2.local_port, name_, port1.local_port.name());
+ port1.callback.Run(port1.local_port);
+ port2.callback.Run(port2.local_port);
+}
+
+void NodeController::RequestShutdown(const base::Closure& callback) {
+ {
+ base::AutoLock lock(shutdown_lock_);
+ shutdown_callback_ = callback;
+ }
+
+ AttemptShutdownIfRequested();
+}
+
+void NodeController::ConnectToChildOnIOThread(
+ base::ProcessHandle process_handle,
+ ScopedPlatformHandle platform_handle) {
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+
+ scoped_refptr<NodeChannel> channel =
+ NodeChannel::Create(this, std::move(platform_handle), io_task_runner_);
+
+ ports::NodeName token;
+ GenerateRandomName(&token);
+
+ channel->SetRemoteNodeName(token);
+ channel->SetRemoteProcessHandle(process_handle);
+ channel->Start();
+ channel->AcceptChild(name_, token);
+
+ pending_children_.insert(std::make_pair(token, channel));
+}
+
+void NodeController::ConnectToParentOnIOThread(
+ ScopedPlatformHandle platform_handle) {
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+
+ base::AutoLock lock(parent_lock_);
+ DCHECK(parent_name_ == ports::kInvalidNodeName);
+
+ // At this point we don't know the parent's name, so we can't yet insert it
+ // into our |peers_| map. That will happen as soon as we receive an
+ // AcceptChild message from them.
+ bootstrap_parent_channel_ =
+ NodeChannel::Create(this, std::move(platform_handle), io_task_runner_);
+ bootstrap_parent_channel_->Start();
+}
+
+void NodeController::RequestParentPortConnectionOnIOThread(
+ const ports::PortRef& local_port,
+ const std::string& token,
+ const base::Closure& callback) {
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+
+ scoped_refptr<NodeChannel> parent = GetParentChannel();
+ if (!parent) {
+ PendingPortRequest request;
+ request.token = token;
+ request.local_port = local_port;
+ request.callback = callback;
+ pending_port_requests_.push_back(request);
+ return;
+ }
+
+ pending_port_connections_.insert(std::make_pair(local_port.name(), callback));
+ parent->RequestPortConnection(local_port.name(), token);
+}
+
+scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
+ const ports::NodeName& name) {
+ base::AutoLock lock(peers_lock_);
+ auto it = peers_.find(name);
+ if (it == peers_.end())
+ return nullptr;
+ return it->second;
+}
+
+scoped_refptr<NodeChannel> NodeController::GetParentChannel() {
+ ports::NodeName parent_name;
+ {
+ base::AutoLock lock(parent_lock_);
+ parent_name = parent_name_;
+ }
+ return GetPeerChannel(parent_name);
+}
+
+void NodeController::AddPeer(const ports::NodeName& name,
+ scoped_refptr<NodeChannel> channel,
+ bool start_channel) {
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+
+ DCHECK(name != ports::kInvalidNodeName);
+ DCHECK(channel);
+
+ channel->SetRemoteNodeName(name);
+
+ base::AutoLock lock(peers_lock_);
+ if (peers_.find(name) != peers_.end()) {
+ // This can happen normally if two nodes race to be introduced to each
+ // other. The losing pipe will be silently closed and introduction should
+ // not be affected.
+ DVLOG(1) << "Ignoring duplicate peer name " << name;
+ return;
+ }
+
+ auto result = peers_.insert(std::make_pair(name, channel));
+ DCHECK(result.second);
+
+ DVLOG(2) << "Accepting new peer " << name << " on node " << name_;
+
+ if (start_channel)
+ channel->Start();
+
+ // Flush any queued message we need to deliver to this node.
+ OutgoingMessageQueue pending_messages;
+ auto it = pending_peer_messages_.find(name);
+ if (it != pending_peer_messages_.end()) {
+ auto& message_queue = it->second;
+ while (!message_queue.empty()) {
+ ports::ScopedMessage message = std::move(message_queue.front());
+ channel->PortsMessage(
+ static_cast<PortsMessage*>(message.get())->TakeChannelMessage());
+ message_queue.pop();
+ }
+ pending_peer_messages_.erase(it);
+ }
+}
+
+void NodeController::DropPeer(const ports::NodeName& name) {
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+
+ {
+ base::AutoLock lock(peers_lock_);
+ auto it = peers_.find(name);
+
+ if (it != peers_.end()) {
+ ports::NodeName peer = it->first;
+ peers_.erase(it);
+ DVLOG(1) << "Dropped peer " << peer;
+ }
+
+ pending_peer_messages_.erase(name);
+ pending_children_.erase(name);
+ }
+
+ node_->LostConnectionToNode(name);
+}
+
+void NodeController::SendPeerMessage(const ports::NodeName& name,
+ ports::ScopedMessage message) {
+ PortsMessage* ports_message = static_cast<PortsMessage*>(message.get());
+
+#if defined(OS_WIN)
+ // If we're sending a message with handles and we're not the parent,
+ // relay the message through the parent.
+ if (ports_message->has_handles()) {
+ scoped_refptr<NodeChannel> parent = GetParentChannel();
+ if (parent) {
+ parent->RelayPortsMessage(name, ports_message->TakeChannelMessage());
+ return;
+ }
+ }
+#endif
+
+ scoped_refptr<NodeChannel> peer = GetPeerChannel(name);
+ if (peer) {
+ peer->PortsMessage(ports_message->TakeChannelMessage());
+ return;
+ }
+
+ // If we don't know who the peer is, queue the message for delivery. If this
+ // is the first message queued for the peer, we also ask the parent to
+ // introduce us to them.
+
+ bool needs_introduction = false;
+ {
+ base::AutoLock lock(peers_lock_);
+ auto& queue = pending_peer_messages_[name];
+ needs_introduction = queue.empty();
+ queue.emplace(std::move(message));
+ }
+
+ if (needs_introduction) {
+ scoped_refptr<NodeChannel> parent = GetParentChannel();
+ if (!parent) {
+ DVLOG(1) << "Dropping message for unknown peer: " << name;
+ return;
+ }
+ parent->RequestIntroduction(name);
+ }
+}
+
+void NodeController::AcceptIncomingMessages() {
+ std::queue<ports::ScopedMessage> messages;
+ for (;;) {
+ // TODO: We may need to be more careful to avoid starving the rest of the
+ // thread here. Revisit this if it turns out to be a problem. One
+ // alternative would be to schedule a task to continue pumping messages
+ // after flushing once.
+
+ {
+ base::AutoLock lock(messages_lock_);
+ if (incoming_messages_.empty())
+ break;
+ std::swap(messages, incoming_messages_);
+ }
+
+ while (!messages.empty()) {
+ node_->AcceptMessage(std::move(messages.front()));
+ messages.pop();
+ }
+ }
+ AttemptShutdownIfRequested();
+}
+
+void NodeController::DropAllPeers() {
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+
+ {
+ base::AutoLock lock(parent_lock_);
+ if (bootstrap_parent_channel_) {
+ bootstrap_parent_channel_->ShutDown();
+ bootstrap_parent_channel_ = nullptr;
+ }
+ }
+
+ std::vector<scoped_refptr<NodeChannel>> all_peers;
+ {
+ base::AutoLock lock(peers_lock_);
+ for (const auto& peer : peers_)
+ all_peers.push_back(peer.second);
+ for (const auto& peer : pending_children_)
+ all_peers.push_back(peer.second);
+ peers_.clear();
+ pending_children_.clear();
+ pending_peer_messages_.clear();
+ }
+
+ for (const auto& peer : all_peers)
+ peer->ShutDown();
+
+ if (destroy_on_io_thread_shutdown_)
+ delete this;
+}
+
+void NodeController::GenerateRandomPortName(ports::PortName* port_name) {
+ GenerateRandomName(port_name);
+}
+
+void NodeController::AllocMessage(size_t num_header_bytes,
+ ports::ScopedMessage* message) {
+ message->reset(new PortsMessage(num_header_bytes, 0, 0, nullptr));
+}
+
+void NodeController::ForwardMessage(const ports::NodeName& node,
+ ports::ScopedMessage message) {
+ if (node == name_) {
+ // NOTE: We need to avoid re-entering the Node instance within
+ // ForwardMessage. Because ForwardMessage is only ever called
+ // (synchronously) in response to Node's ClosePort, SendMessage, or
+ // AcceptMessage, we flush the queue after calling any of those methods.
+ base::AutoLock lock(messages_lock_);
+ incoming_messages_.emplace(std::move(message));
+ } else {
+ SendPeerMessage(node, std::move(message));
+ }
+}
+
+void NodeController::PortStatusChanged(const ports::PortRef& port) {
+ scoped_refptr<ports::UserData> user_data;
+ node_->GetUserData(port, &user_data);
+
+ PortObserver* observer = static_cast<PortObserver*>(user_data.get());
+ if (observer) {
+ observer->OnPortStatusChanged();
+ } else {
+ DVLOG(2) << "Ignoring status change for " << port.name() << " because it "
+ << "doesn't have an observer.";
+ }
+}
+
+void NodeController::OnAcceptChild(const ports::NodeName& from_node,
+ const ports::NodeName& parent_name,
+ const ports::NodeName& token) {
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+
+ scoped_refptr<NodeChannel> parent;
+ {
+ base::AutoLock lock(parent_lock_);
+ if (!bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName) {
+ DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node;
+ DropPeer(from_node);
+ return;
+ }
+
+ parent_name_ = parent_name;
+ parent = bootstrap_parent_channel_;
+ bootstrap_parent_channel_ = nullptr;
+ }
+
+ parent->AcceptParent(token, name_);
+ for (const auto& request : pending_port_requests_) {
+ pending_port_connections_.insert(
+ std::make_pair(request.local_port.name(), request.callback));
+ parent->RequestPortConnection(request.local_port.name(), request.token);
+ }
+ pending_port_requests_.clear();
+
+ DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
+
+ AddPeer(parent_name_, parent, false /* start_channel */);
+}
+
+void NodeController::OnAcceptParent(const ports::NodeName& from_node,
+ const ports::NodeName& token,
+ const ports::NodeName& child_name) {
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+
+ auto it = pending_children_.find(from_node);
+ if (it == pending_children_.end() || token != from_node) {
+ DLOG(ERROR) << "Received unexpected AcceptParent message from "
+ << from_node;
+ DropPeer(from_node);
+ return;
+ }
+
+ scoped_refptr<NodeChannel> channel = it->second;
+ pending_children_.erase(it);
+
+ DCHECK(channel);
+
+ DVLOG(1) << "Parent " << name_ << " accepted child " << child_name;
+
+ AddPeer(child_name, channel, false /* start_channel */);
+}
+
+void NodeController::OnPortsMessage(Channel::MessagePtr channel_message) {
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+
+ void* data;
+ size_t num_data_bytes;
+ NodeChannel::GetPortsMessageData(
+ channel_message.get(), &data, &num_data_bytes);
+
+ size_t num_header_bytes, num_payload_bytes, num_ports_bytes;
+ ports::Message::Parse(data,
+ num_data_bytes,
+ &num_header_bytes,
+ &num_payload_bytes,
+ &num_ports_bytes);
+
+ CHECK(channel_message);
+ ports::ScopedMessage message(
+ new PortsMessage(num_header_bytes,
+ num_payload_bytes,
+ num_ports_bytes,
+ std::move(channel_message)));
+
+ node_->AcceptMessage(std::move(message));
+ AcceptIncomingMessages();
+ AttemptShutdownIfRequested();
+}
+
+void NodeController::OnRequestPortConnection(
+ const ports::NodeName& from_node,
+ const ports::PortName& connector_port_name,
+ const std::string& token) {
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+
+ DVLOG(2) << "Node " << name_ << " received RequestPortConnection for token "
+ << token << " and port " << connector_port_name << "@" << from_node;
+
+ ReservePortCallback callback;
+ ports::PortRef local_port;
+ {
+ base::AutoLock lock(reserved_ports_lock_);
+ auto it = reserved_ports_.find(token);
+ if (it == reserved_ports_.end()) {
+ DVLOG(1) << "Ignoring request to connect to port for unknown token "
+ << token;
+ return;
+ }
+ local_port = it->second.local_port;
+ callback = it->second.callback;
+ reserved_ports_.erase(it);
+ }
+
+ DCHECK(!callback.is_null());
+
+ scoped_refptr<NodeChannel> peer = GetPeerChannel(from_node);
+ if (!peer) {
+ DVLOG(1) << "Ignoring request to connect to port from unknown node "
+ << from_node;
+ return;
+ }
+
+ // This reserved port should not have been initialized yet.
+ CHECK_EQ(ports::OK, node_->InitializePort(local_port, from_node,
+ connector_port_name));
+
+ peer->ConnectToPort(local_port.name(), connector_port_name);
+ callback.Run(local_port);
+}
+
+void NodeController::OnConnectToPort(
+ const ports::NodeName& from_node,
+ const ports::PortName& connector_port_name,
+ const ports::PortName& connectee_port_name) {
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+
+ DVLOG(2) << "Node " << name_ << " received ConnectToPort for local port "
+ << connectee_port_name << " to port " << connector_port_name << "@"
+ << from_node;
+
+ ports::PortRef connectee_port;
+ int rv = node_->GetPort(connectee_port_name, &connectee_port);
+ if (rv != ports::OK) {
+ DLOG(ERROR) << "Ignoring ConnectToPort for unknown port "
+ << connectee_port_name;
+ return;
+ }
+
+ // It's OK if this port has already been initialized. This message is only
+ // sent by the remote peer to ensure the port is ready before it starts
+ // us sending messages to it.
+ ports::PortStatus port_status;
+ rv = node_->GetStatus(connectee_port, &port_status);
+ if (rv == ports::OK) {
+ DVLOG(1) << "Ignoring ConnectToPort for already-initialized port "
+ << connectee_port_name;
+ return;
+ }
+
+ CHECK_EQ(ports::OK, node_->InitializePort(connectee_port, from_node,
+ connector_port_name));
+
+ auto it = pending_port_connections_.find(connectee_port_name);
+ DCHECK(it != pending_port_connections_.end());
+ it->second.Run();
+ pending_port_connections_.erase(it);
+}
+
+void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
+ const ports::NodeName& name) {
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+
+ scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node);
+ if (from_node == name || name == ports::kInvalidNodeName || !requestor) {
+ DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from "
+ << from_node;
+ DropPeer(from_node);
+ return;
+ }
+
+ if (GetParentChannel() != nullptr) {
+ DLOG(ERROR) << "Non-parent node cannot introduce peers to each other.";
+ return;
+ }
+
+ scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name);
+ if (!new_friend) {
+ // We don't know who they're talking about!
+ requestor->Introduce(name, ScopedPlatformHandle());
+ } else {
+ PlatformChannelPair new_channel;
+ requestor->Introduce(name, new_channel.PassServerHandle());
+ new_friend->Introduce(from_node, new_channel.PassClientHandle());
+ }
+}
+
+void NodeController::OnIntroduce(const ports::NodeName& from_node,
+ const ports::NodeName& name,
+ ScopedPlatformHandle channel_handle) {
+ DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
+
+ {
+ base::AutoLock lock(parent_lock_);
+ if (from_node != parent_name_) {
+ DLOG(ERROR) << "Received unexpected Introduce message from node "
+ << from_node;
+ DropPeer(from_node);
+ return;
+ }
+ }
+
+ if (!channel_handle.is_valid()) {
+ DLOG(ERROR) << "Could not be introduced to peer " << name;
+ base::AutoLock lock(peers_lock_);
+ pending_peer_messages_.erase(name);
+ return;
+ }
+
+ scoped_refptr<NodeChannel> channel =
+ NodeChannel::Create(this, std::move(channel_handle), io_task_runner_);
+
+ DVLOG(1) << "Adding new peer " << name << " via parent introduction.";
+ AddPeer(name, channel, true /* start_channel */);
+}
+
+#if defined(OS_WIN)
+void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node,
+ base::ProcessHandle from_process,
+ const ports::NodeName& destination,
+ Channel::MessagePtr message) {
+ scoped_refptr<NodeChannel> parent = GetParentChannel();
+ if (parent) {
+ // Only the parent should be asked to relay a message.
+ DLOG(ERROR) << "Non-parent refusing to relay message.";
+ DropPeer(from_node);
+ return;
+ }
+
+ // The parent should always know which process this came from.
+ DCHECK(from_process != base::kNullProcessHandle);
+
+ // Duplicate the handles to this (the parent) process. If the message is
+ // destined for another child process, the handles will be duplicated to
+ // that process before going out (see NodeChannel::WriteChannelMessage).
+ //
+ // TODO: We could avoid double-duplication.
+ for (size_t i = 0; i < message->num_handles(); ++i) {
+ BOOL result = DuplicateHandle(
+ from_process, message->handles()[i].handle,
+ base::GetCurrentProcessHandle(),
+ reinterpret_cast<HANDLE*>(message->handles() + i),
+ 0, FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE);
+ DCHECK(result);
+ }
+ if (destination == name_) {
+ // Great, we can deliver this message locally.
+ OnPortsMessage(std::move(message));
+ return;
+ }
+
+ scoped_refptr<NodeChannel> peer = GetPeerChannel(destination);
+ if (peer)
+ peer->PortsMessage(std::move(message));
+ else
+ DLOG(ERROR) << "Dropping relay message for unknown node " << destination;
+}
+#endif
+
+void NodeController::OnChannelError(const ports::NodeName& from_node) {
+ if (io_task_runner_->RunsTasksOnCurrentThread()) {
+ DropPeer(from_node);
+ } else {
+ io_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&NodeController::DropPeer, base::Unretained(this),
+ from_node));
+ }
+}
+
+void NodeController::DestroyOnIOThreadShutdown() {
+ destroy_on_io_thread_shutdown_ = true;
+}
+
+void NodeController::AttemptShutdownIfRequested() {
+ base::Closure callback;
+ {
+ base::AutoLock lock(shutdown_lock_);
+ if (shutdown_callback_.is_null())
+ return;
+ if (!node_->CanShutdownCleanly(true /* allow_local_ports */)) {
+ DVLOG(2) << "Unable to cleanly shut down node " << name_ << ".";
+ return;
+ }
+ callback = shutdown_callback_;
+ shutdown_callback_.Reset();
+ }
+
+ DCHECK(!callback.is_null());
+
+ callback.Run();
+}
+
+} // namespace edk
+} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698