Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(140)

Side by Side Diff: mojo/edk/system/node_controller.h

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #ifndef MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_
6 #define MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_
7
8 #include <queue>
9 #include <unordered_map>
10 #include <unordered_set>
11 #include <vector>
12
13 #include "base/callback.h"
14 #include "base/containers/hash_tables.h"
15 #include "base/macros.h"
16 #include "base/memory/ref_counted.h"
17 #include "base/memory/scoped_ptr.h"
18 #include "base/task_runner.h"
19 #include "mojo/edk/embedder/platform_handle_vector.h"
20 #include "mojo/edk/embedder/platform_shared_buffer.h"
21 #include "mojo/edk/embedder/scoped_platform_handle.h"
22 #include "mojo/edk/system/node_channel.h"
23 #include "mojo/edk/system/ports/hash_functions.h"
24 #include "mojo/edk/system/ports/name.h"
25 #include "mojo/edk/system/ports/node.h"
26 #include "mojo/edk/system/ports/node_delegate.h"
27
28 namespace mojo {
29 namespace edk {
30
31 class Core;
32 class PortsMessage;
33
34 // The owner of ports::Node which facilitates core EDK implementation. All
35 // public interface methods are safe to call from any thread.
36 class NodeController : public ports::NodeDelegate,
37 public NodeChannel::Delegate {
38 public:
39 class PortObserver : public ports::UserData {
40 public:
41 virtual void OnPortStatusChanged() = 0;
42
43 protected:
44 ~PortObserver() override {}
45 };
46
47 // |core| owns and out-lives us.
48 explicit NodeController(Core* core);
49 ~NodeController() override;
50
51 const ports::NodeName& name() const { return name_; }
52 Core* core() const { return core_; }
53 ports::Node* node() const { return node_.get(); }
54 scoped_refptr<base::TaskRunner> io_task_runner() const {
55 return io_task_runner_;
56 }
57
58 // Called exactly once, shortly after construction, and before any other
59 // methods are called on this object.
60 void SetIOTaskRunner(scoped_refptr<base::TaskRunner> io_task_runner);
61
62 // Connects this node to a child node. This node will initiate a handshake.
63 void ConnectToChild(base::ProcessHandle process_handle,
64 ScopedPlatformHandle platform_handle);
65
66 // Connects this node to a parent node. The parent node will initiate a
67 // handshake.
68 void ConnectToParent(ScopedPlatformHandle platform_handle);
69
70 // Sets a port's observer. If |observer| is null the port's current observer
71 // is removed.
72 void SetPortObserver(const ports::PortRef& port,
73 const scoped_refptr<PortObserver>& observer);
74
75 // Closes a port. Use this in lieu of calling Node::ClosePort() directly, as
76 // it ensures the port's observer has also been removed.
77 void ClosePort(const ports::PortRef& port);
78
79 // Sends a message on a port to its peer. If message send fails, |message|
80 // is left intact. Otherwise ownership is transferred and it's reset.
81 int SendMessage(const ports::PortRef& port_ref,
82 scoped_ptr<PortsMessage>* message);
83
84 using ReservePortCallback = base::Callback<void(const ports::PortRef& port)>;
85
86 // Reserves a port associated with |token|. A peer may associate one of their
87 // own ports with this one by sending us a RequestPortConnection message with
88 // the same token value.
89 //
90 // Note that the reservation is made synchronously. In order to avoid races,
91 // reservations should be acquired before |token| is communicated to any
92 // potential peer.
93 //
94 // |callback| must be runnable on any thread and will be run with a reference
95 // to the new local port once connected.
96 void ReservePort(const std::string& token,
97 const ReservePortCallback& callback);
98
99 // Eventually initializes a local port with a parent port peer identified by
100 // |token|. The parent should also have |token| and should alrady have
101 // reserved a port for it. |callback| must be runnable on any thread and will
102 // be run if and when the local port is connected.
103 void ConnectToParentPort(const ports::PortRef& local_port,
104 const std::string& token,
105 const base::Closure& callback);
106
107 // Connects two reserved ports to each other. Useful when two independent
108 // systems in the same (parent) process need to establish a port pair without
109 // any direct knowledge of each other.
110 void ConnectReservedPorts(const std::string& token1,
111 const std::string& token2);
112
113 // Creates a new shared buffer for use in the current process.
114 scoped_refptr<PlatformSharedBuffer> CreateSharedBuffer(size_t num_bytes);
115
116 // Request that the Node be shut down cleanly. This may take an arbitrarily
117 // long time to complete, at which point |callback| will be called.
118 //
119 // Note that while it is safe to continue using the NodeController's public
120 // interface after requesting shutdown, you do so at your own risk and there
121 // is NO guarantee that new messages will be sent or ports will complete
122 // transfer.
123 void RequestShutdown(const base::Closure& callback);
124
125 private:
126 friend Core;
127
128 using NodeMap = std::unordered_map<ports::NodeName,
129 scoped_refptr<NodeChannel>>;
130 using OutgoingMessageQueue = std::queue<ports::ScopedMessage>;
131
132 // Tracks a pending token-based connection to a parent port.
133 struct PendingPortRequest {
134 PendingPortRequest();
135 ~PendingPortRequest();
136
137 std::string token;
138 ports::PortRef local_port;
139 base::Closure callback;
140 };
141
142 // Tracks a reserved port.
143 struct ReservedPort {
144 ReservedPort();
145 ~ReservedPort();
146
147 ports::PortRef local_port;
148 ReservePortCallback callback;
149 };
150
151 void ConnectToChildOnIOThread(base::ProcessHandle process_handle,
152 ScopedPlatformHandle platform_handle);
153 void ConnectToParentOnIOThread(ScopedPlatformHandle platform_handle);
154 void RequestParentPortConnectionOnIOThread(const ports::PortRef& local_port,
155 const std::string& token,
156 const base::Closure& callback);
157
158 scoped_refptr<NodeChannel> GetPeerChannel(const ports::NodeName& name);
159 scoped_refptr<NodeChannel> GetParentChannel();
160
161 void AddPeer(const ports::NodeName& name,
162 scoped_refptr<NodeChannel> channel,
163 bool start_channel);
164 void DropPeer(const ports::NodeName& name);
165 void SendPeerMessage(const ports::NodeName& name,
166 ports::ScopedMessage message);
167 void AcceptIncomingMessages();
168 void DropAllPeers();
169
170 // ports::NodeDelegate:
171 void GenerateRandomPortName(ports::PortName* port_name) override;
172 void AllocMessage(size_t num_header_bytes,
173 ports::ScopedMessage* message) override;
174 void ForwardMessage(const ports::NodeName& node,
175 ports::ScopedMessage message) override;
176 void PortStatusChanged(const ports::PortRef& port) override;
177
178 // NodeChannel::Delegate:
179 void OnAcceptChild(const ports::NodeName& from_node,
180 const ports::NodeName& parent_name,
181 const ports::NodeName& token) override;
182 void OnAcceptParent(const ports::NodeName& from_node,
183 const ports::NodeName& token,
184 const ports::NodeName& child_name) override;
185 void OnPortsMessage(Channel::MessagePtr message) override;
186 void OnRequestPortConnection(const ports::NodeName& from_node,
187 const ports::PortName& connector_port_name,
188 const std::string& token) override;
189 void OnConnectToPort(const ports::NodeName& from_node,
190 const ports::PortName& connector_port_name,
191 const ports::PortName& connectee_port_name) override;
192 void OnRequestIntroduction(const ports::NodeName& from_node,
193 const ports::NodeName& name) override;
194 void OnIntroduce(const ports::NodeName& from_node,
195 const ports::NodeName& name,
196 ScopedPlatformHandle channel_handle) override;
197 #if defined(OS_WIN)
198 void OnRelayPortsMessage(const ports::NodeName& from_node,
199 base::ProcessHandle from_process,
200 const ports::NodeName& destination,
201 Channel::MessagePtr message) override;
202 #endif
203 void OnChannelError(const ports::NodeName& from_node) override;
204
205 // Marks this NodeController for destruction when the IO thread shuts down.
206 // This is used in case Core is torn down before the IO thread. Must only be
207 // called on the IO thread.
208 void DestroyOnIOThreadShutdown();
209
210 // If there is a registered shutdown callback (meaning shutdown has been
211 // requested, this checks the Node's status to see if clean shutdown is
212 // possible. If so, shutdown is performed and the shutdown callback is run.
213 void AttemptShutdownIfRequested();
214
215 // These are safe to access from any thread as long as the Node is alive.
216 Core* const core_;
217 const ports::NodeName name_;
218 const scoped_ptr<ports::Node> node_;
219 scoped_refptr<base::TaskRunner> io_task_runner_;
220
221 // Guards |peers_| and |pending_peer_messages_|.
222 base::Lock peers_lock_;
223
224 // Channels to known peers, including parent and children, if any.
225 NodeMap peers_;
226
227 // Outgoing message queues for peers we've heard of but can't yet talk to.
228 std::unordered_map<ports::NodeName, OutgoingMessageQueue>
229 pending_peer_messages_;
230
231 // Guards |reserved_ports_|.
232 base::Lock reserved_ports_lock_;
233
234 // Ports reserved by token.
235 base::hash_map<std::string, ReservedPort> reserved_ports_;
236
237 // Guards |parent_name_| and |bootstrap_parent_channel_|.
238 base::Lock parent_lock_;
239
240 // The name of our parent node, if any.
241 ports::NodeName parent_name_;
242
243 // A temporary reference to the parent channel before we know their name.
244 scoped_refptr<NodeChannel> bootstrap_parent_channel_;
245
246 // Guards |incoming_messages_|.
247 base::Lock messages_lock_;
248 std::queue<ports::ScopedMessage> incoming_messages_;
249
250 // Guards |shutdown_callback_|.
251 base::Lock shutdown_lock_;
252
253 // Set by RequestShutdown(). If this is non-null, the controller will
254 // begin polling the Node to see if clean shutdown is possible any time the
255 // Node's state is modified by the controller.
256 base::Closure shutdown_callback_;
257
258 // All other fields below must only be accessed on the I/O thread, i.e., the
259 // thread on which core_->io_task_runner() runs tasks.
260
261 // Channels to children during handshake.
262 NodeMap pending_children_;
263
264 // Port connection requests which have been deferred until we have a parent.
265 std::vector<PendingPortRequest> pending_port_requests_;
266
267 // Port connection requests awaiting a response from the parent.
268 std::unordered_map<ports::PortName, base::Closure> pending_port_connections_;
269
270 // Indicates whether this object should delete itself on IO thread shutdown.
271 // Must only be accessed from the IO thread.
272 bool destroy_on_io_thread_shutdown_ = false;
273
274 DISALLOW_COPY_AND_ASSIGN(NodeController);
275 };
276
277 } // namespace edk
278 } // namespace mojo
279
280 #endif // MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698