Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(254)

Side by Side Diff: mojo/public/cpp/bindings/lib/interface_endpoint_client.cc

Issue 2608163003: Change single-interface mojo bindings to use SequencedTaskRunner. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" 5 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <utility> 9 #include <utility>
10 10
11 #include "base/bind.h" 11 #include "base/bind.h"
12 #include "base/location.h" 12 #include "base/location.h"
13 #include "base/macros.h" 13 #include "base/macros.h"
14 #include "base/memory/ptr_util.h" 14 #include "base/memory/ptr_util.h"
15 #include "base/single_thread_task_runner.h" 15 #include "base/sequenced_task_runner.h"
16 #include "base/stl_util.h" 16 #include "base/stl_util.h"
17 #include "mojo/public/cpp/bindings/associated_group.h" 17 #include "mojo/public/cpp/bindings/associated_group.h"
18 #include "mojo/public/cpp/bindings/associated_group_controller.h" 18 #include "mojo/public/cpp/bindings/associated_group_controller.h"
19 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" 19 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
20 #include "mojo/public/cpp/bindings/lib/validation_util.h" 20 #include "mojo/public/cpp/bindings/lib/validation_util.h"
21 #include "mojo/public/cpp/bindings/sync_call_restrictions.h" 21 #include "mojo/public/cpp/bindings/sync_call_restrictions.h"
22 22
23 namespace mojo { 23 namespace mojo {
24 24
25 // ---------------------------------------------------------------------------- 25 // ----------------------------------------------------------------------------
26 26
27 namespace { 27 namespace {
28 28
29 void DCheckIfInvalid(const base::WeakPtr<InterfaceEndpointClient>& client, 29 void DCheckIfInvalid(const base::WeakPtr<InterfaceEndpointClient>& client,
30 const std::string& message) { 30 const std::string& message) {
31 bool is_valid = client && !client->encountered_error(); 31 bool is_valid = client && !client->encountered_error();
32 DCHECK(!is_valid) << message; 32 DCHECK(!is_valid) << message;
33 } 33 }
34 34
35 // When receiving an incoming message which expects a repsonse, 35 // When receiving an incoming message which expects a repsonse,
36 // InterfaceEndpointClient creates a ResponderThunk object and passes it to the 36 // InterfaceEndpointClient creates a ResponderThunk object and passes it to the
37 // incoming message receiver. When the receiver finishes processing the message, 37 // incoming message receiver. When the receiver finishes processing the message,
38 // it can provide a response using this object. 38 // it can provide a response using this object.
39 class ResponderThunk : public MessageReceiverWithStatus { 39 class ResponderThunk : public MessageReceiverWithStatus {
40 public: 40 public:
41 explicit ResponderThunk( 41 explicit ResponderThunk(
42 const base::WeakPtr<InterfaceEndpointClient>& endpoint_client, 42 const base::WeakPtr<InterfaceEndpointClient>& endpoint_client,
43 scoped_refptr<base::SingleThreadTaskRunner> runner) 43 scoped_refptr<base::SequencedTaskRunner> runner)
44 : endpoint_client_(endpoint_client), 44 : endpoint_client_(endpoint_client),
45 accept_was_invoked_(false), 45 accept_was_invoked_(false),
46 task_runner_(std::move(runner)) {} 46 task_runner_(std::move(runner)) {}
47 ~ResponderThunk() override { 47 ~ResponderThunk() override {
48 if (!accept_was_invoked_) { 48 if (!accept_was_invoked_) {
49 // The Service handled a message that was expecting a response 49 // The Service handled a message that was expecting a response
50 // but did not send a response. 50 // but did not send a response.
51 // We raise an error to signal the calling application that an error 51 // We raise an error to signal the calling application that an error
52 // condition occurred. Without this the calling application would have no 52 // condition occurred. Without this the calling application would have no
53 // way of knowing it should stop waiting for a response. 53 // way of knowing it should stop waiting for a response.
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
92 DCheckIfInvalid(endpoint_client_, message); 92 DCheckIfInvalid(endpoint_client_, message);
93 } else { 93 } else {
94 task_runner_->PostTask( 94 task_runner_->PostTask(
95 FROM_HERE, base::Bind(&DCheckIfInvalid, endpoint_client_, message)); 95 FROM_HERE, base::Bind(&DCheckIfInvalid, endpoint_client_, message));
96 } 96 }
97 } 97 }
98 98
99 private: 99 private:
100 base::WeakPtr<InterfaceEndpointClient> endpoint_client_; 100 base::WeakPtr<InterfaceEndpointClient> endpoint_client_;
101 bool accept_was_invoked_; 101 bool accept_was_invoked_;
102 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 102 scoped_refptr<base::SequencedTaskRunner> task_runner_;
103 103
104 DISALLOW_COPY_AND_ASSIGN(ResponderThunk); 104 DISALLOW_COPY_AND_ASSIGN(ResponderThunk);
105 }; 105 };
106 106
107 } // namespace 107 } // namespace
108 108
109 // ---------------------------------------------------------------------------- 109 // ----------------------------------------------------------------------------
110 110
111 InterfaceEndpointClient::SyncResponseInfo::SyncResponseInfo( 111 InterfaceEndpointClient::SyncResponseInfo::SyncResponseInfo(
112 bool* in_response_received) 112 bool* in_response_received)
(...skipping 15 matching lines...) Expand all
128 return owner_->HandleValidatedMessage(message); 128 return owner_->HandleValidatedMessage(message);
129 } 129 }
130 130
131 // ---------------------------------------------------------------------------- 131 // ----------------------------------------------------------------------------
132 132
133 InterfaceEndpointClient::InterfaceEndpointClient( 133 InterfaceEndpointClient::InterfaceEndpointClient(
134 ScopedInterfaceEndpointHandle handle, 134 ScopedInterfaceEndpointHandle handle,
135 MessageReceiverWithResponderStatus* receiver, 135 MessageReceiverWithResponderStatus* receiver,
136 std::unique_ptr<MessageReceiver> payload_validator, 136 std::unique_ptr<MessageReceiver> payload_validator,
137 bool expect_sync_requests, 137 bool expect_sync_requests,
138 scoped_refptr<base::SingleThreadTaskRunner> runner, 138 scoped_refptr<base::SequencedTaskRunner> runner,
139 uint32_t interface_version) 139 uint32_t interface_version)
140 : handle_(std::move(handle)), 140 : handle_(std::move(handle)),
141 incoming_receiver_(receiver), 141 incoming_receiver_(receiver),
142 thunk_(this), 142 thunk_(this),
143 filters_(&thunk_), 143 filters_(&thunk_),
144 next_request_id_(1), 144 next_request_id_(1),
145 encountered_error_(false), 145 encountered_error_(false),
146 task_runner_(std::move(runner)), 146 task_runner_(std::move(runner)),
147 control_message_proxy_(this), 147 control_message_proxy_(this),
148 control_message_handler_(interface_version), 148 control_message_handler_(interface_version),
149 weak_ptr_factory_(this) { 149 weak_ptr_factory_(this) {
150 DCHECK(handle_.is_valid()); 150 DCHECK(handle_.is_valid());
151 DCHECK(handle_.is_local()); 151 DCHECK(handle_.is_local());
152 152
153 // TODO(yzshen): the way to use validator (or message filter in general) 153 // TODO(yzshen): the way to use validator (or message filter in general)
154 // directly is a little awkward. 154 // directly is a little awkward.
155 if (payload_validator) 155 if (payload_validator)
156 filters_.Append(std::move(payload_validator)); 156 filters_.Append(std::move(payload_validator));
157 157
158 controller_ = handle_.group_controller()->AttachEndpointClient( 158 controller_ = handle_.group_controller()->AttachEndpointClient(
159 handle_, this, task_runner_); 159 handle_, this, task_runner_);
160 if (expect_sync_requests) 160 if (expect_sync_requests)
161 controller_->AllowWokenUpBySyncWatchOnSameThread(); 161 controller_->AllowWokenUpBySyncWatchOnSameThread();
162 } 162 }
163 163
164 InterfaceEndpointClient::~InterfaceEndpointClient() { 164 InterfaceEndpointClient::~InterfaceEndpointClient() {
165 DCHECK(thread_checker_.CalledOnValidThread()); 165 DCHECK(sequence_checker_.CalledOnValidSequence());
166 166
167 if (handle_.is_valid()) 167 if (handle_.is_valid())
168 handle_.group_controller()->DetachEndpointClient(handle_); 168 handle_.group_controller()->DetachEndpointClient(handle_);
169 } 169 }
170 170
171 AssociatedGroup* InterfaceEndpointClient::associated_group() { 171 AssociatedGroup* InterfaceEndpointClient::associated_group() {
172 if (!associated_group_) 172 if (!associated_group_)
173 associated_group_ = handle_.group_controller()->CreateAssociatedGroup(); 173 associated_group_ = handle_.group_controller()->CreateAssociatedGroup();
174 return associated_group_.get(); 174 return associated_group_.get();
175 } 175 }
176 176
177 uint32_t InterfaceEndpointClient::interface_id() const { 177 uint32_t InterfaceEndpointClient::interface_id() const {
178 DCHECK(thread_checker_.CalledOnValidThread()); 178 DCHECK(sequence_checker_.CalledOnValidSequence());
179 return handle_.id(); 179 return handle_.id();
180 } 180 }
181 181
182 ScopedInterfaceEndpointHandle InterfaceEndpointClient::PassHandle() { 182 ScopedInterfaceEndpointHandle InterfaceEndpointClient::PassHandle() {
183 DCHECK(thread_checker_.CalledOnValidThread()); 183 DCHECK(sequence_checker_.CalledOnValidSequence());
184 DCHECK(!has_pending_responders()); 184 DCHECK(!has_pending_responders());
185 185
186 if (!handle_.is_valid()) 186 if (!handle_.is_valid())
187 return ScopedInterfaceEndpointHandle(); 187 return ScopedInterfaceEndpointHandle();
188 188
189 controller_ = nullptr; 189 controller_ = nullptr;
190 handle_.group_controller()->DetachEndpointClient(handle_); 190 handle_.group_controller()->DetachEndpointClient(handle_);
191 191
192 return std::move(handle_); 192 return std::move(handle_);
193 } 193 }
194 194
195 void InterfaceEndpointClient::AddFilter( 195 void InterfaceEndpointClient::AddFilter(
196 std::unique_ptr<MessageReceiver> filter) { 196 std::unique_ptr<MessageReceiver> filter) {
197 filters_.Append(std::move(filter)); 197 filters_.Append(std::move(filter));
198 } 198 }
199 199
200 void InterfaceEndpointClient::RaiseError() { 200 void InterfaceEndpointClient::RaiseError() {
201 DCHECK(thread_checker_.CalledOnValidThread()); 201 DCHECK(sequence_checker_.CalledOnValidSequence());
202 202
203 handle_.group_controller()->RaiseError(); 203 handle_.group_controller()->RaiseError();
204 } 204 }
205 205
206 void InterfaceEndpointClient::CloseWithReason(uint32_t custom_reason, 206 void InterfaceEndpointClient::CloseWithReason(uint32_t custom_reason,
207 const std::string& description) { 207 const std::string& description) {
208 DCHECK(thread_checker_.CalledOnValidThread()); 208 DCHECK(sequence_checker_.CalledOnValidSequence());
209 209
210 auto handle = PassHandle(); 210 auto handle = PassHandle();
211 handle.ResetWithReason(custom_reason, description); 211 handle.ResetWithReason(custom_reason, description);
212 } 212 }
213 213
214 bool InterfaceEndpointClient::Accept(Message* message) { 214 bool InterfaceEndpointClient::Accept(Message* message) {
215 DCHECK(thread_checker_.CalledOnValidThread()); 215 DCHECK(sequence_checker_.CalledOnValidSequence());
216 DCHECK(controller_); 216 DCHECK(controller_);
217 DCHECK(!message->has_flag(Message::kFlagExpectsResponse)); 217 DCHECK(!message->has_flag(Message::kFlagExpectsResponse));
218 218
219 if (encountered_error_) 219 if (encountered_error_)
220 return false; 220 return false;
221 221
222 return controller_->SendMessage(message); 222 return controller_->SendMessage(message);
223 } 223 }
224 224
225 bool InterfaceEndpointClient::AcceptWithResponder(Message* message, 225 bool InterfaceEndpointClient::AcceptWithResponder(Message* message,
226 MessageReceiver* responder) { 226 MessageReceiver* responder) {
227 DCHECK(thread_checker_.CalledOnValidThread()); 227 DCHECK(sequence_checker_.CalledOnValidSequence());
228 DCHECK(controller_); 228 DCHECK(controller_);
229 DCHECK(message->has_flag(Message::kFlagExpectsResponse)); 229 DCHECK(message->has_flag(Message::kFlagExpectsResponse));
230 230
231 if (encountered_error_) 231 if (encountered_error_)
232 return false; 232 return false;
233 233
234 // Reserve 0 in case we want it to convey special meaning in the future. 234 // Reserve 0 in case we want it to convey special meaning in the future.
235 uint64_t request_id = next_request_id_++; 235 uint64_t request_id = next_request_id_++;
236 if (request_id == 0) 236 if (request_id == 0)
237 request_id = next_request_id_++; 237 request_id = next_request_id_++;
(...skipping 28 matching lines...) Expand all
266 if (response_received) 266 if (response_received)
267 ignore_result(sync_responder->Accept(&iter->second->response)); 267 ignore_result(sync_responder->Accept(&iter->second->response));
268 sync_responses_.erase(iter); 268 sync_responses_.erase(iter);
269 } 269 }
270 270
271 // Return true means that we take ownership of |responder|. 271 // Return true means that we take ownership of |responder|.
272 return true; 272 return true;
273 } 273 }
274 274
275 bool InterfaceEndpointClient::HandleIncomingMessage(Message* message) { 275 bool InterfaceEndpointClient::HandleIncomingMessage(Message* message) {
276 DCHECK(thread_checker_.CalledOnValidThread()); 276 DCHECK(sequence_checker_.CalledOnValidSequence());
277 return filters_.Accept(message); 277 return filters_.Accept(message);
278 } 278 }
279 279
280 void InterfaceEndpointClient::NotifyError( 280 void InterfaceEndpointClient::NotifyError(
281 const base::Optional<DisconnectReason>& reason) { 281 const base::Optional<DisconnectReason>& reason) {
282 DCHECK(thread_checker_.CalledOnValidThread()); 282 DCHECK(sequence_checker_.CalledOnValidSequence());
283 283
284 if (encountered_error_) 284 if (encountered_error_)
285 return; 285 return;
286 encountered_error_ = true; 286 encountered_error_ = true;
287 287
288 // Response callbacks may hold on to resource, and there's no need to keep 288 // Response callbacks may hold on to resource, and there's no need to keep
289 // them alive any longer. Note that it's allowed that a pending response 289 // them alive any longer. Note that it's allowed that a pending response
290 // callback may own this endpoint, so we simply move the responders onto the 290 // callback may own this endpoint, so we simply move the responders onto the
291 // stack here and let them be destroyed when the stack unwinds. 291 // stack here and let them be destroyed when the stack unwinds.
292 AsyncResponderMap responders = std::move(async_responders_); 292 AsyncResponderMap responders = std::move(async_responders_);
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
341 return responder->Accept(message); 341 return responder->Accept(message);
342 } else { 342 } else {
343 if (mojo::internal::ControlMessageHandler::IsControlMessage(message)) 343 if (mojo::internal::ControlMessageHandler::IsControlMessage(message))
344 return control_message_handler_.Accept(message); 344 return control_message_handler_.Accept(message);
345 345
346 return incoming_receiver_->Accept(message); 346 return incoming_receiver_->Accept(message);
347 } 347 }
348 } 348 }
349 349
350 } // namespace mojo 350 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/connector.cc ('k') | mojo/public/cpp/bindings/lib/interface_ptr_state.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698