Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(58)

Side by Side Diff: mojo/public/cpp/bindings/lib/multiplex_router.h

Issue 2608163003: Change single-interface mojo bindings to use SequencedTaskRunner. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_ 5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_
6 #define MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_ 6 #define MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_
7 7
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <deque> 10 #include <deque>
11 #include <map> 11 #include <map>
12 #include <memory> 12 #include <memory>
13 #include <string> 13 #include <string>
14 14
15 #include "base/compiler_specific.h" 15 #include "base/compiler_specific.h"
16 #include "base/logging.h" 16 #include "base/logging.h"
17 #include "base/macros.h" 17 #include "base/macros.h"
18 #include "base/memory/ref_counted.h" 18 #include "base/memory/ref_counted.h"
19 #include "base/memory/weak_ptr.h" 19 #include "base/memory/weak_ptr.h"
20 #include "base/single_thread_task_runner.h" 20 #include "base/sequence_checker.h"
21 #include "base/sequenced_task_runner.h"
21 #include "base/synchronization/lock.h" 22 #include "base/synchronization/lock.h"
22 #include "base/threading/thread_checker.h"
23 #include "mojo/public/cpp/bindings/associated_group_controller.h" 23 #include "mojo/public/cpp/bindings/associated_group_controller.h"
24 #include "mojo/public/cpp/bindings/bindings_export.h" 24 #include "mojo/public/cpp/bindings/bindings_export.h"
25 #include "mojo/public/cpp/bindings/connector.h" 25 #include "mojo/public/cpp/bindings/connector.h"
26 #include "mojo/public/cpp/bindings/filter_chain.h" 26 #include "mojo/public/cpp/bindings/filter_chain.h"
27 #include "mojo/public/cpp/bindings/interface_id.h" 27 #include "mojo/public/cpp/bindings/interface_id.h"
28 #include "mojo/public/cpp/bindings/message_header_validator.h" 28 #include "mojo/public/cpp/bindings/message_header_validator.h"
29 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h" 29 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
30 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h" 30 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
31 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h" 31 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
32 #include "mojo/public/cpp/bindings/scoped_interface_endpoint_handle.h" 32 #include "mojo/public/cpp/bindings/scoped_interface_endpoint_handle.h"
33 33
34 namespace base { 34 namespace base {
35 class SingleThreadTaskRunner; 35 class SequencedTaskRunner;
36 } 36 }
37 37
38 namespace mojo { 38 namespace mojo {
39 39
40 namespace internal { 40 namespace internal {
41 41
42 // MultiplexRouter supports routing messages for multiple interfaces over a 42 // MultiplexRouter supports routing messages for multiple interfaces over a
43 // single message pipe. 43 // single message pipe.
44 // 44 //
45 // It is created on the thread where the master interface of the message pipe 45 // It is created on the thread where the master interface of the message pipe
(...skipping 22 matching lines...) Expand all
68 SINGLE_INTERFACE_WITH_SYNC_METHODS, 68 SINGLE_INTERFACE_WITH_SYNC_METHODS,
69 // There may be associated interfaces running on this router. 69 // There may be associated interfaces running on this router.
70 MULTI_INTERFACE 70 MULTI_INTERFACE
71 }; 71 };
72 72
73 // If |set_interface_id_namespace_bit| is true, the interface IDs generated by 73 // If |set_interface_id_namespace_bit| is true, the interface IDs generated by
74 // this router will have the highest bit set. 74 // this router will have the highest bit set.
75 MultiplexRouter(ScopedMessagePipeHandle message_pipe, 75 MultiplexRouter(ScopedMessagePipeHandle message_pipe,
76 Config config, 76 Config config,
77 bool set_interface_id_namespace_bit, 77 bool set_interface_id_namespace_bit,
78 scoped_refptr<base::SingleThreadTaskRunner> runner); 78 scoped_refptr<base::SequencedTaskRunner> runner);
79 79
80 // Sets the master interface name for this router. Only used when reporting 80 // Sets the master interface name for this router. Only used when reporting
81 // message header or control message validation errors. 81 // message header or control message validation errors.
82 // |name| must be a string literal. 82 // |name| must be a string literal.
83 void SetMasterInterfaceName(const char* name); 83 void SetMasterInterfaceName(const char* name);
84 84
85 // --------------------------------------------------------------------------- 85 // ---------------------------------------------------------------------------
86 // The following public methods are safe to call from any threads. 86 // The following public methods are safe to call from any threads.
87 87
88 // AssociatedGroupController implementation: 88 // AssociatedGroupController implementation:
89 void CreateEndpointHandlePair( 89 void CreateEndpointHandlePair(
90 ScopedInterfaceEndpointHandle* local_endpoint, 90 ScopedInterfaceEndpointHandle* local_endpoint,
91 ScopedInterfaceEndpointHandle* remote_endpoint) override; 91 ScopedInterfaceEndpointHandle* remote_endpoint) override;
92 ScopedInterfaceEndpointHandle CreateLocalEndpointHandle( 92 ScopedInterfaceEndpointHandle CreateLocalEndpointHandle(
93 InterfaceId id) override; 93 InterfaceId id) override;
94 void CloseEndpointHandle( 94 void CloseEndpointHandle(
95 InterfaceId id, 95 InterfaceId id,
96 bool is_local, 96 bool is_local,
97 const base::Optional<DisconnectReason>& reason) override; 97 const base::Optional<DisconnectReason>& reason) override;
98 InterfaceEndpointController* AttachEndpointClient( 98 InterfaceEndpointController* AttachEndpointClient(
99 const ScopedInterfaceEndpointHandle& handle, 99 const ScopedInterfaceEndpointHandle& handle,
100 InterfaceEndpointClient* endpoint_client, 100 InterfaceEndpointClient* endpoint_client,
101 scoped_refptr<base::SingleThreadTaskRunner> runner) override; 101 scoped_refptr<base::SequencedTaskRunner> runner) override;
102 void DetachEndpointClient( 102 void DetachEndpointClient(
103 const ScopedInterfaceEndpointHandle& handle) override; 103 const ScopedInterfaceEndpointHandle& handle) override;
104 void RaiseError() override; 104 void RaiseError() override;
105 105
106 // --------------------------------------------------------------------------- 106 // ---------------------------------------------------------------------------
107 // The following public methods are called on the creating thread. 107 // The following public methods are called on the creating thread.
108 108
109 // Please note that this method shouldn't be called unless it results from an 109 // Please note that this method shouldn't be called unless it results from an
110 // explicit request of the user of bindings (e.g., the user sets an 110 // explicit request of the user of bindings (e.g., the user sets an
111 // InterfacePtr to null or closes a Binding). 111 // InterfacePtr to null or closes a Binding).
112 void CloseMessagePipe(); 112 void CloseMessagePipe();
113 113
114 // Extracts the underlying message pipe. 114 // Extracts the underlying message pipe.
115 ScopedMessagePipeHandle PassMessagePipe() { 115 ScopedMessagePipeHandle PassMessagePipe() {
116 DCHECK(thread_checker_.CalledOnValidThread()); 116 DCHECK(sequence_checker_.CalledOnValidSequence());
117 DCHECK(!HasAssociatedEndpoints()); 117 DCHECK(!HasAssociatedEndpoints());
118 return connector_.PassMessagePipe(); 118 return connector_.PassMessagePipe();
119 } 119 }
120 120
121 // Blocks the current thread until the first incoming message, or |deadline|. 121 // Blocks the current thread until the first incoming message, or |deadline|.
122 bool WaitForIncomingMessage(MojoDeadline deadline) { 122 bool WaitForIncomingMessage(MojoDeadline deadline) {
123 DCHECK(thread_checker_.CalledOnValidThread()); 123 DCHECK(sequence_checker_.CalledOnValidSequence());
124 return connector_.WaitForIncomingMessage(deadline); 124 return connector_.WaitForIncomingMessage(deadline);
125 } 125 }
126 126
127 // See Binding for details of pause/resume. 127 // See Binding for details of pause/resume.
128 void PauseIncomingMethodCallProcessing(); 128 void PauseIncomingMethodCallProcessing();
129 void ResumeIncomingMethodCallProcessing(); 129 void ResumeIncomingMethodCallProcessing();
130 130
131 // Whether there are any associated interfaces running currently. 131 // Whether there are any associated interfaces running currently.
132 bool HasAssociatedEndpoints() const; 132 bool HasAssociatedEndpoints() const;
133 133
134 // Sets this object to testing mode. 134 // Sets this object to testing mode.
135 // In testing mode, the object doesn't disconnect the underlying message pipe 135 // In testing mode, the object doesn't disconnect the underlying message pipe
136 // when it receives unexpected or invalid messages. 136 // when it receives unexpected or invalid messages.
137 void EnableTestingMode(); 137 void EnableTestingMode();
138 138
139 // Is the router bound to a message pipe handle? 139 // Is the router bound to a message pipe handle?
140 bool is_valid() const { 140 bool is_valid() const {
141 DCHECK(thread_checker_.CalledOnValidThread()); 141 DCHECK(sequence_checker_.CalledOnValidSequence());
142 return connector_.is_valid(); 142 return connector_.is_valid();
143 } 143 }
144 144
145 // TODO(yzshen): consider removing this getter. 145 // TODO(yzshen): consider removing this getter.
146 MessagePipeHandle handle() const { 146 MessagePipeHandle handle() const {
147 DCHECK(thread_checker_.CalledOnValidThread()); 147 DCHECK(sequence_checker_.CalledOnValidSequence());
148 return connector_.handle(); 148 return connector_.handle();
149 } 149 }
150 150
151 bool SimulateReceivingMessageForTesting(Message* message) { 151 bool SimulateReceivingMessageForTesting(Message* message) {
152 return filters_.Accept(message); 152 return filters_.Accept(message);
153 } 153 }
154 154
155 private: 155 private:
156 class InterfaceEndpoint; 156 class InterfaceEndpoint;
157 struct Task; 157 struct Task;
(...skipping 27 matching lines...) Expand all
185 // Processes enqueued tasks (incoming messages and error notifications). 185 // Processes enqueued tasks (incoming messages and error notifications).
186 // |current_task_runner| is only used when |client_call_behavior| is 186 // |current_task_runner| is only used when |client_call_behavior| is
187 // ALLOW_DIRECT_CLIENT_CALLS to determine whether we are on the right task 187 // ALLOW_DIRECT_CLIENT_CALLS to determine whether we are on the right task
188 // runner to make client calls for async messages or connection error 188 // runner to make client calls for async messages or connection error
189 // notifications. 189 // notifications.
190 // 190 //
191 // Note: Because calling into InterfaceEndpointClient may lead to destruction 191 // Note: Because calling into InterfaceEndpointClient may lead to destruction
192 // of this object, if direct calls are allowed, the caller needs to hold on to 192 // of this object, if direct calls are allowed, the caller needs to hold on to
193 // a ref outside of |lock_| before calling this method. 193 // a ref outside of |lock_| before calling this method.
194 void ProcessTasks(ClientCallBehavior client_call_behavior, 194 void ProcessTasks(ClientCallBehavior client_call_behavior,
195 base::SingleThreadTaskRunner* current_task_runner); 195 base::SequencedTaskRunner* current_task_runner);
196 196
197 // Processes the first queued sync message for the endpoint corresponding to 197 // Processes the first queued sync message for the endpoint corresponding to
198 // |id|; returns whether there are more sync messages for that endpoint in the 198 // |id|; returns whether there are more sync messages for that endpoint in the
199 // queue. 199 // queue.
200 // 200 //
201 // This method is only used by enpoints during sync watching. Therefore, not 201 // This method is only used by enpoints during sync watching. Therefore, not
202 // all sync messages are handled by it. 202 // all sync messages are handled by it.
203 bool ProcessFirstSyncMessageForEndpoint(InterfaceId id); 203 bool ProcessFirstSyncMessageForEndpoint(InterfaceId id);
204 204
205 // Returns true to indicate that |task|/|message| has been processed. 205 // Returns true to indicate that |task|/|message| has been processed.
206 bool ProcessNotifyErrorTask( 206 bool ProcessNotifyErrorTask(Task* task,
207 Task* task, 207 ClientCallBehavior client_call_behavior,
208 ClientCallBehavior client_call_behavior, 208 base::SequencedTaskRunner* current_task_runner);
209 base::SingleThreadTaskRunner* current_task_runner); 209 bool ProcessIncomingMessage(Message* message,
210 bool ProcessIncomingMessage( 210 ClientCallBehavior client_call_behavior,
211 Message* message, 211 base::SequencedTaskRunner* current_task_runner);
212 ClientCallBehavior client_call_behavior,
213 base::SingleThreadTaskRunner* current_task_runner);
214 212
215 void MaybePostToProcessTasks(base::SingleThreadTaskRunner* task_runner); 213 void MaybePostToProcessTasks(base::SequencedTaskRunner* task_runner);
216 void LockAndCallProcessTasks(); 214 void LockAndCallProcessTasks();
217 215
218 // Updates the state of |endpoint|. If both the endpoint and its peer have 216 // Updates the state of |endpoint|. If both the endpoint and its peer have
219 // been closed, removes it from |endpoints_|. 217 // been closed, removes it from |endpoints_|.
220 // NOTE: The method may invalidate |endpoint|. 218 // NOTE: The method may invalidate |endpoint|.
221 enum EndpointStateUpdateType { ENDPOINT_CLOSED, PEER_ENDPOINT_CLOSED }; 219 enum EndpointStateUpdateType { ENDPOINT_CLOSED, PEER_ENDPOINT_CLOSED };
222 void UpdateEndpointStateMayRemove(InterfaceEndpoint* endpoint, 220 void UpdateEndpointStateMayRemove(InterfaceEndpoint* endpoint,
223 EndpointStateUpdateType type); 221 EndpointStateUpdateType type);
224 222
225 void RaiseErrorInNonTestingMode(); 223 void RaiseErrorInNonTestingMode();
226 224
227 InterfaceEndpoint* FindOrInsertEndpoint(InterfaceId id, bool* inserted); 225 InterfaceEndpoint* FindOrInsertEndpoint(InterfaceId id, bool* inserted);
228 226
229 void AssertLockAcquired(); 227 void AssertLockAcquired();
230 228
231 // Whether to set the namespace bit when generating interface IDs. Please see 229 // Whether to set the namespace bit when generating interface IDs. Please see
232 // comments of kInterfaceIdNamespaceMask. 230 // comments of kInterfaceIdNamespaceMask.
233 const bool set_interface_id_namespace_bit_; 231 const bool set_interface_id_namespace_bit_;
234 232
235 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 233 scoped_refptr<base::SequencedTaskRunner> task_runner_;
236 234
237 // Owned by |filters_| below. 235 // Owned by |filters_| below.
238 MessageHeaderValidator* header_validator_; 236 MessageHeaderValidator* header_validator_;
239 237
240 FilterChain filters_; 238 FilterChain filters_;
241 Connector connector_; 239 Connector connector_;
242 240
243 base::ThreadChecker thread_checker_; 241 base::SequenceChecker sequence_checker_;
244 242
245 // Protects the following members. 243 // Protects the following members.
246 // Sets to nullptr in Config::SINGLE_INTERFACE* mode. 244 // Sets to nullptr in Config::SINGLE_INTERFACE* mode.
247 std::unique_ptr<base::Lock> lock_; 245 std::unique_ptr<base::Lock> lock_;
248 PipeControlMessageHandler control_message_handler_; 246 PipeControlMessageHandler control_message_handler_;
249 247
250 // NOTE: It is unsafe to call into this object while holding |lock_|. 248 // NOTE: It is unsafe to call into this object while holding |lock_|.
251 PipeControlMessageProxy control_message_proxy_; 249 PipeControlMessageProxy control_message_proxy_;
252 250
253 std::map<InterfaceId, scoped_refptr<InterfaceEndpoint>> endpoints_; 251 std::map<InterfaceId, scoped_refptr<InterfaceEndpoint>> endpoints_;
254 uint32_t next_interface_id_value_; 252 uint32_t next_interface_id_value_;
255 253
256 std::deque<std::unique_ptr<Task>> tasks_; 254 std::deque<std::unique_ptr<Task>> tasks_;
257 // It refers to tasks in |tasks_| and doesn't own any of them. 255 // It refers to tasks in |tasks_| and doesn't own any of them.
258 std::map<InterfaceId, std::deque<Task*>> sync_message_tasks_; 256 std::map<InterfaceId, std::deque<Task*>> sync_message_tasks_;
259 257
260 bool posted_to_process_tasks_; 258 bool posted_to_process_tasks_;
261 scoped_refptr<base::SingleThreadTaskRunner> posted_to_task_runner_; 259 scoped_refptr<base::SequencedTaskRunner> posted_to_task_runner_;
262 260
263 bool encountered_error_; 261 bool encountered_error_;
264 262
265 bool paused_; 263 bool paused_;
266 264
267 bool testing_mode_; 265 bool testing_mode_;
268 266
269 DISALLOW_COPY_AND_ASSIGN(MultiplexRouter); 267 DISALLOW_COPY_AND_ASSIGN(MultiplexRouter);
270 }; 268 };
271 269
272 } // namespace internal 270 } // namespace internal
273 } // namespace mojo 271 } // namespace mojo
274 272
275 #endif // MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_ 273 #endif // MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/interface_ptr_state.h ('k') | mojo/public/cpp/bindings/lib/multiplex_router.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698