Index: mojo/edk/system/node_controller.h |
diff --git a/mojo/edk/system/node_controller.h b/mojo/edk/system/node_controller.h |
new file mode 100644 |
index 0000000000000000000000000000000000000000..f9cb326b008198f8f7c921db838256f7b0bb2b1c |
--- /dev/null |
+++ b/mojo/edk/system/node_controller.h |
@@ -0,0 +1,280 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#ifndef MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_ |
+#define MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_ |
+ |
+#include <queue> |
+#include <unordered_map> |
+#include <unordered_set> |
+#include <vector> |
+ |
+#include "base/callback.h" |
+#include "base/containers/hash_tables.h" |
+#include "base/macros.h" |
+#include "base/memory/ref_counted.h" |
+#include "base/memory/scoped_ptr.h" |
+#include "base/task_runner.h" |
+#include "mojo/edk/embedder/platform_handle_vector.h" |
+#include "mojo/edk/embedder/platform_shared_buffer.h" |
+#include "mojo/edk/embedder/scoped_platform_handle.h" |
+#include "mojo/edk/system/node_channel.h" |
+#include "mojo/edk/system/ports/hash_functions.h" |
+#include "mojo/edk/system/ports/name.h" |
+#include "mojo/edk/system/ports/node.h" |
+#include "mojo/edk/system/ports/node_delegate.h" |
+ |
+namespace mojo { |
+namespace edk { |
+ |
+class Core; |
+class PortsMessage; |
+ |
+// The owner of ports::Node which facilitates core EDK implementation. All |
+// public interface methods are safe to call from any thread. |
+class NodeController : public ports::NodeDelegate, |
+ public NodeChannel::Delegate { |
+ public: |
+ class PortObserver : public ports::UserData { |
+ public: |
+ virtual void OnPortStatusChanged() = 0; |
+ |
+ protected: |
+ ~PortObserver() override {} |
+ }; |
+ |
+ // |core| owns and out-lives us. |
+ explicit NodeController(Core* core); |
+ ~NodeController() override; |
+ |
+ const ports::NodeName& name() const { return name_; } |
+ Core* core() const { return core_; } |
+ ports::Node* node() const { return node_.get(); } |
+ scoped_refptr<base::TaskRunner> io_task_runner() const { |
+ return io_task_runner_; |
+ } |
+ |
+ // Called exactly once, shortly after construction, and before any other |
+ // methods are called on this object. |
+ void SetIOTaskRunner(scoped_refptr<base::TaskRunner> io_task_runner); |
+ |
+ // Connects this node to a child node. This node will initiate a handshake. |
+ void ConnectToChild(base::ProcessHandle process_handle, |
+ ScopedPlatformHandle platform_handle); |
+ |
+ // Connects this node to a parent node. The parent node will initiate a |
+ // handshake. |
+ void ConnectToParent(ScopedPlatformHandle platform_handle); |
+ |
+ // Sets a port's observer. If |observer| is null the port's current observer |
+ // is removed. |
+ void SetPortObserver(const ports::PortRef& port, |
+ const scoped_refptr<PortObserver>& observer); |
+ |
+ // Closes a port. Use this in lieu of calling Node::ClosePort() directly, as |
+ // it ensures the port's observer has also been removed. |
+ void ClosePort(const ports::PortRef& port); |
+ |
+ // Sends a message on a port to its peer. If message send fails, |message| |
+ // is left intact. Otherwise ownership is transferred and it's reset. |
+ int SendMessage(const ports::PortRef& port_ref, |
+ scoped_ptr<PortsMessage>* message); |
+ |
+ using ReservePortCallback = base::Callback<void(const ports::PortRef& port)>; |
+ |
+ // Reserves a port associated with |token|. A peer may associate one of their |
+ // own ports with this one by sending us a RequestPortConnection message with |
+ // the same token value. |
+ // |
+ // Note that the reservation is made synchronously. In order to avoid races, |
+ // reservations should be acquired before |token| is communicated to any |
+ // potential peer. |
+ // |
+ // |callback| must be runnable on any thread and will be run with a reference |
+ // to the new local port once connected. |
+ void ReservePort(const std::string& token, |
+ const ReservePortCallback& callback); |
+ |
+ // Eventually initializes a local port with a parent port peer identified by |
+ // |token|. The parent should also have |token| and should alrady have |
+ // reserved a port for it. |callback| must be runnable on any thread and will |
+ // be run if and when the local port is connected. |
+ void ConnectToParentPort(const ports::PortRef& local_port, |
+ const std::string& token, |
+ const base::Closure& callback); |
+ |
+ // Connects two reserved ports to each other. Useful when two independent |
+ // systems in the same (parent) process need to establish a port pair without |
+ // any direct knowledge of each other. |
+ void ConnectReservedPorts(const std::string& token1, |
+ const std::string& token2); |
+ |
+ // Creates a new shared buffer for use in the current process. |
+ scoped_refptr<PlatformSharedBuffer> CreateSharedBuffer(size_t num_bytes); |
+ |
+ // Request that the Node be shut down cleanly. This may take an arbitrarily |
+ // long time to complete, at which point |callback| will be called. |
+ // |
+ // Note that while it is safe to continue using the NodeController's public |
+ // interface after requesting shutdown, you do so at your own risk and there |
+ // is NO guarantee that new messages will be sent or ports will complete |
+ // transfer. |
+ void RequestShutdown(const base::Closure& callback); |
+ |
+ private: |
+ friend Core; |
+ |
+ using NodeMap = std::unordered_map<ports::NodeName, |
+ scoped_refptr<NodeChannel>>; |
+ using OutgoingMessageQueue = std::queue<ports::ScopedMessage>; |
+ |
+ // Tracks a pending token-based connection to a parent port. |
+ struct PendingPortRequest { |
+ PendingPortRequest(); |
+ ~PendingPortRequest(); |
+ |
+ std::string token; |
+ ports::PortRef local_port; |
+ base::Closure callback; |
+ }; |
+ |
+ // Tracks a reserved port. |
+ struct ReservedPort { |
+ ReservedPort(); |
+ ~ReservedPort(); |
+ |
+ ports::PortRef local_port; |
+ ReservePortCallback callback; |
+ }; |
+ |
+ void ConnectToChildOnIOThread(base::ProcessHandle process_handle, |
+ ScopedPlatformHandle platform_handle); |
+ void ConnectToParentOnIOThread(ScopedPlatformHandle platform_handle); |
+ void RequestParentPortConnectionOnIOThread(const ports::PortRef& local_port, |
+ const std::string& token, |
+ const base::Closure& callback); |
+ |
+ scoped_refptr<NodeChannel> GetPeerChannel(const ports::NodeName& name); |
+ scoped_refptr<NodeChannel> GetParentChannel(); |
+ |
+ void AddPeer(const ports::NodeName& name, |
+ scoped_refptr<NodeChannel> channel, |
+ bool start_channel); |
+ void DropPeer(const ports::NodeName& name); |
+ void SendPeerMessage(const ports::NodeName& name, |
+ ports::ScopedMessage message); |
+ void AcceptIncomingMessages(); |
+ void DropAllPeers(); |
+ |
+ // ports::NodeDelegate: |
+ void GenerateRandomPortName(ports::PortName* port_name) override; |
+ void AllocMessage(size_t num_header_bytes, |
+ ports::ScopedMessage* message) override; |
+ void ForwardMessage(const ports::NodeName& node, |
+ ports::ScopedMessage message) override; |
+ void PortStatusChanged(const ports::PortRef& port) override; |
+ |
+ // NodeChannel::Delegate: |
+ void OnAcceptChild(const ports::NodeName& from_node, |
+ const ports::NodeName& parent_name, |
+ const ports::NodeName& token) override; |
+ void OnAcceptParent(const ports::NodeName& from_node, |
+ const ports::NodeName& token, |
+ const ports::NodeName& child_name) override; |
+ void OnPortsMessage(Channel::MessagePtr message) override; |
+ void OnRequestPortConnection(const ports::NodeName& from_node, |
+ const ports::PortName& connector_port_name, |
+ const std::string& token) override; |
+ void OnConnectToPort(const ports::NodeName& from_node, |
+ const ports::PortName& connector_port_name, |
+ const ports::PortName& connectee_port_name) override; |
+ void OnRequestIntroduction(const ports::NodeName& from_node, |
+ const ports::NodeName& name) override; |
+ void OnIntroduce(const ports::NodeName& from_node, |
+ const ports::NodeName& name, |
+ ScopedPlatformHandle channel_handle) override; |
+#if defined(OS_WIN) |
+ void OnRelayPortsMessage(const ports::NodeName& from_node, |
+ base::ProcessHandle from_process, |
+ const ports::NodeName& destination, |
+ Channel::MessagePtr message) override; |
+#endif |
+ void OnChannelError(const ports::NodeName& from_node) override; |
+ |
+ // Marks this NodeController for destruction when the IO thread shuts down. |
+ // This is used in case Core is torn down before the IO thread. Must only be |
+ // called on the IO thread. |
+ void DestroyOnIOThreadShutdown(); |
+ |
+ // If there is a registered shutdown callback (meaning shutdown has been |
+ // requested, this checks the Node's status to see if clean shutdown is |
+ // possible. If so, shutdown is performed and the shutdown callback is run. |
+ void AttemptShutdownIfRequested(); |
+ |
+ // These are safe to access from any thread as long as the Node is alive. |
+ Core* const core_; |
+ const ports::NodeName name_; |
+ const scoped_ptr<ports::Node> node_; |
+ scoped_refptr<base::TaskRunner> io_task_runner_; |
+ |
+ // Guards |peers_| and |pending_peer_messages_|. |
+ base::Lock peers_lock_; |
+ |
+ // Channels to known peers, including parent and children, if any. |
+ NodeMap peers_; |
+ |
+ // Outgoing message queues for peers we've heard of but can't yet talk to. |
+ std::unordered_map<ports::NodeName, OutgoingMessageQueue> |
+ pending_peer_messages_; |
+ |
+ // Guards |reserved_ports_|. |
+ base::Lock reserved_ports_lock_; |
+ |
+ // Ports reserved by token. |
+ base::hash_map<std::string, ReservedPort> reserved_ports_; |
+ |
+ // Guards |parent_name_| and |bootstrap_parent_channel_|. |
+ base::Lock parent_lock_; |
+ |
+ // The name of our parent node, if any. |
+ ports::NodeName parent_name_; |
+ |
+ // A temporary reference to the parent channel before we know their name. |
+ scoped_refptr<NodeChannel> bootstrap_parent_channel_; |
+ |
+ // Guards |incoming_messages_|. |
+ base::Lock messages_lock_; |
+ std::queue<ports::ScopedMessage> incoming_messages_; |
+ |
+ // Guards |shutdown_callback_|. |
+ base::Lock shutdown_lock_; |
+ |
+ // Set by RequestShutdown(). If this is non-null, the controller will |
+ // begin polling the Node to see if clean shutdown is possible any time the |
+ // Node's state is modified by the controller. |
+ base::Closure shutdown_callback_; |
+ |
+ // All other fields below must only be accessed on the I/O thread, i.e., the |
+ // thread on which core_->io_task_runner() runs tasks. |
+ |
+ // Channels to children during handshake. |
+ NodeMap pending_children_; |
+ |
+ // Port connection requests which have been deferred until we have a parent. |
+ std::vector<PendingPortRequest> pending_port_requests_; |
+ |
+ // Port connection requests awaiting a response from the parent. |
+ std::unordered_map<ports::PortName, base::Closure> pending_port_connections_; |
+ |
+ // Indicates whether this object should delete itself on IO thread shutdown. |
+ // Must only be accessed from the IO thread. |
+ bool destroy_on_io_thread_shutdown_ = false; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(NodeController); |
+}; |
+ |
+} // namespace edk |
+} // namespace mojo |
+ |
+#endif // MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_ |