Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(172)

Side by Side Diff: mojo/public/cpp/bindings/lib/router.cc

Issue 2532053004: Mojo C++ bindings: remove the single-threaded Router. (Closed)
Patch Set: . Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/public/cpp/bindings/lib/router.h ('k') | mojo/public/cpp/bindings/strong_binding.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/bindings/lib/router.h"
6
7 #include <stdint.h>
8
9 #include <utility>
10
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/stl_util.h"
16 #include "mojo/public/cpp/bindings/lib/validation_util.h"
17 #include "mojo/public/cpp/bindings/sync_call_restrictions.h"
18
19 namespace mojo {
20 namespace internal {
21
22 // ----------------------------------------------------------------------------
23
24 namespace {
25
26 void DCheckIfInvalid(const base::WeakPtr<Router>& router,
27 const std::string& message) {
28 bool is_valid = router && !router->encountered_error() && router->is_valid();
29 DCHECK(!is_valid) << message;
30 }
31
32 class ResponderThunk : public MessageReceiverWithStatus {
33 public:
34 explicit ResponderThunk(const base::WeakPtr<Router>& router,
35 scoped_refptr<base::SingleThreadTaskRunner> runner)
36 : router_(router),
37 accept_was_invoked_(false),
38 task_runner_(std::move(runner)) {}
39 ~ResponderThunk() override {
40 if (!accept_was_invoked_) {
41 // The Service handled a message that was expecting a response
42 // but did not send a response.
43 // We raise an error to signal the calling application that an error
44 // condition occurred. Without this the calling application would have no
45 // way of knowing it should stop waiting for a response.
46 if (task_runner_->RunsTasksOnCurrentThread()) {
47 // Please note that even if this code is run from a different task
48 // runner on the same thread as |task_runner_|, it is okay to directly
49 // call Router::RaiseError(), because it will raise error from the
50 // correct task runner asynchronously.
51 if (router_)
52 router_->RaiseError();
53 } else {
54 task_runner_->PostTask(FROM_HERE,
55 base::Bind(&Router::RaiseError, router_));
56 }
57 }
58 }
59
60 // MessageReceiver implementation:
61 bool Accept(Message* message) override {
62 DCHECK(task_runner_->RunsTasksOnCurrentThread());
63 accept_was_invoked_ = true;
64 DCHECK(message->has_flag(Message::kFlagIsResponse));
65
66 bool result = false;
67
68 if (router_)
69 result = router_->Accept(message);
70
71 return result;
72 }
73
74 // MessageReceiverWithStatus implementation:
75 bool IsValid() override {
76 DCHECK(task_runner_->RunsTasksOnCurrentThread());
77 return router_ && !router_->encountered_error() && router_->is_valid();
78 }
79
80 void DCheckInvalid(const std::string& message) override {
81 if (task_runner_->RunsTasksOnCurrentThread()) {
82 DCheckIfInvalid(router_, message);
83 } else {
84 task_runner_->PostTask(FROM_HERE,
85 base::Bind(&DCheckIfInvalid, router_, message));
86 }
87 }
88
89 private:
90 base::WeakPtr<Router> router_;
91 bool accept_was_invoked_;
92 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
93 };
94
95 } // namespace
96
97 // ----------------------------------------------------------------------------
98
99 Router::SyncResponseInfo::SyncResponseInfo(bool* in_response_received)
100 : response_received(in_response_received) {}
101
102 Router::SyncResponseInfo::~SyncResponseInfo() {}
103
104 // ----------------------------------------------------------------------------
105
106 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router)
107 : router_(router) {
108 }
109
110 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() {
111 }
112
113 bool Router::HandleIncomingMessageThunk::Accept(Message* message) {
114 return router_->HandleIncomingMessage(message);
115 }
116
117 // ----------------------------------------------------------------------------
118
119 Router::Router(ScopedMessagePipeHandle message_pipe,
120 FilterChain filters,
121 bool expects_sync_requests,
122 scoped_refptr<base::SingleThreadTaskRunner> runner,
123 int interface_version)
124 : thunk_(this),
125 filters_(std::move(filters)),
126 connector_(std::move(message_pipe),
127 Connector::SINGLE_THREADED_SEND,
128 std::move(runner)),
129 incoming_receiver_(nullptr),
130 next_request_id_(0),
131 testing_mode_(false),
132 pending_task_for_messages_(false),
133 encountered_error_(false),
134 control_message_proxy_(this),
135 control_message_handler_(interface_version),
136 weak_factory_(this) {
137 filters_.SetSink(&thunk_);
138 if (expects_sync_requests)
139 connector_.AllowWokenUpBySyncWatchOnSameThread();
140 connector_.set_incoming_receiver(&filters_);
141 connector_.set_connection_error_handler(
142 base::Bind(&Router::OnConnectionError, base::Unretained(this)));
143 }
144
145 Router::~Router() {}
146
147 void Router::AddFilter(std::unique_ptr<MessageReceiver> filter) {
148 filters_.Append(std::move(filter));
149 }
150
151 bool Router::Accept(Message* message) {
152 DCHECK(thread_checker_.CalledOnValidThread());
153 DCHECK(!message->has_flag(Message::kFlagExpectsResponse));
154 return connector_.Accept(message);
155 }
156
157 bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) {
158 DCHECK(thread_checker_.CalledOnValidThread());
159 DCHECK(message->has_flag(Message::kFlagExpectsResponse));
160
161 // Reserve 0 in case we want it to convey special meaning in the future.
162 uint64_t request_id = next_request_id_++;
163 if (request_id == 0)
164 request_id = next_request_id_++;
165
166 bool is_sync = message->has_flag(Message::kFlagIsSync);
167 message->set_request_id(request_id);
168 if (!connector_.Accept(message))
169 return false;
170
171 if (!is_sync) {
172 // We assume ownership of |responder|.
173 async_responders_[request_id] = base::WrapUnique(responder);
174 return true;
175 }
176
177 SyncCallRestrictions::AssertSyncCallAllowed();
178
179 bool response_received = false;
180 std::unique_ptr<MessageReceiver> sync_responder(responder);
181 sync_responses_.insert(std::make_pair(
182 request_id, base::MakeUnique<SyncResponseInfo>(&response_received)));
183
184 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
185 connector_.SyncWatch(&response_received);
186 // Make sure that this instance hasn't been destroyed.
187 if (weak_self) {
188 DCHECK(base::ContainsKey(sync_responses_, request_id));
189 auto iter = sync_responses_.find(request_id);
190 DCHECK_EQ(&response_received, iter->second->response_received);
191 if (response_received)
192 ignore_result(sync_responder->Accept(&iter->second->response));
193 sync_responses_.erase(iter);
194 }
195
196 // Return true means that we take ownership of |responder|.
197 return true;
198 }
199
200 void Router::EnableTestingMode() {
201 DCHECK(thread_checker_.CalledOnValidThread());
202 testing_mode_ = true;
203 connector_.set_enforce_errors_from_incoming_receiver(false);
204 }
205
206 bool Router::HandleIncomingMessage(Message* message) {
207 DCHECK(thread_checker_.CalledOnValidThread());
208
209 const bool during_sync_call =
210 connector_.during_sync_handle_watcher_callback();
211 if (!message->has_flag(Message::kFlagIsSync) &&
212 (during_sync_call || !pending_messages_.empty())) {
213 pending_messages_.emplace(std::move(*message));
214
215 if (!pending_task_for_messages_) {
216 pending_task_for_messages_ = true;
217 connector_.task_runner()->PostTask(
218 FROM_HERE, base::Bind(&Router::HandleQueuedMessages,
219 weak_factory_.GetWeakPtr()));
220 }
221
222 return true;
223 }
224
225 return HandleMessageInternal(message);
226 }
227
228 void Router::HandleQueuedMessages() {
229 DCHECK(thread_checker_.CalledOnValidThread());
230 DCHECK(pending_task_for_messages_);
231
232 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
233 while (!pending_messages_.empty()) {
234 Message message(std::move(pending_messages_.front()));
235 pending_messages_.pop();
236
237 bool result = HandleMessageInternal(&message);
238 if (!weak_self)
239 return;
240
241 if (!result && !testing_mode_) {
242 connector_.RaiseError();
243 break;
244 }
245 }
246
247 pending_task_for_messages_ = false;
248
249 // We may have already seen a connection error from the connector, but
250 // haven't notified the user because we want to process all the queued
251 // messages first. We should do it now.
252 if (connector_.encountered_error() && !encountered_error_)
253 OnConnectionError();
254 }
255
256 bool Router::HandleMessageInternal(Message* message) {
257 DCHECK(!encountered_error_);
258
259 if (message->has_flag(Message::kFlagExpectsResponse)) {
260 MessageReceiverWithStatus* responder = new ResponderThunk(
261 weak_factory_.GetWeakPtr(), connector_.task_runner());
262 bool ok = false;
263 if (mojo::internal::ControlMessageHandler::IsControlMessage(message)) {
264 ok = control_message_handler_.AcceptWithResponder(message, responder);
265 } else {
266 ok = incoming_receiver_->AcceptWithResponder(message, responder);
267 }
268 if (!ok)
269 delete responder;
270 return ok;
271
272 } else if (message->has_flag(Message::kFlagIsResponse)) {
273 uint64_t request_id = message->request_id();
274
275 if (message->has_flag(Message::kFlagIsSync)) {
276 auto it = sync_responses_.find(request_id);
277 if (it == sync_responses_.end()) {
278 DCHECK(testing_mode_);
279 return false;
280 }
281 it->second->response = std::move(*message);
282 *it->second->response_received = true;
283 return true;
284 }
285
286 auto it = async_responders_.find(request_id);
287 if (it == async_responders_.end()) {
288 DCHECK(testing_mode_);
289 return false;
290 }
291 std::unique_ptr<MessageReceiver> responder = std::move(it->second);
292 async_responders_.erase(it);
293 return responder->Accept(message);
294 } else {
295 if (mojo::internal::ControlMessageHandler::IsControlMessage(message))
296 return control_message_handler_.Accept(message);
297
298 return incoming_receiver_->Accept(message);
299 }
300 }
301
302 void Router::OnConnectionError() {
303 if (encountered_error_)
304 return;
305
306 if (!pending_messages_.empty()) {
307 // After all the pending messages are processed, we will check whether an
308 // error has been encountered and run the user's connection error handler
309 // if necessary.
310 DCHECK(pending_task_for_messages_);
311 return;
312 }
313
314 if (connector_.during_sync_handle_watcher_callback()) {
315 // We don't want the error handler to reenter an ongoing sync call.
316 connector_.task_runner()->PostTask(
317 FROM_HERE,
318 base::Bind(&Router::OnConnectionError, weak_factory_.GetWeakPtr()));
319 return;
320 }
321
322 control_message_proxy_.OnConnectionError();
323
324 encountered_error_ = true;
325
326 // Response callbacks may hold on to resource, and there's no need to keep
327 // them alive any longer. Note that it's allowed that a pending response
328 // callback may own this endpoint, so we simply move the responders onto the
329 // stack here and let them be destroyed when the stack unwinds.
330 AsyncResponderMap responders = std::move(async_responders_);
331
332 if (!error_handler_.is_null()) {
333 error_handler_.Run();
334 } else if (!error_with_reason_handler_.is_null()) {
335 // Make a copy on the stack. If we directly pass a reference to a member of
336 // |control_message_handler_|, that reference will be invalidated as soon as
337 // the user destroys the interface endpoint.
338 std::string description = control_message_handler_.disconnect_description();
339 error_with_reason_handler_.Run(
340 control_message_handler_.disconnect_custom_reason(), description);
341 }
342 }
343
344 // ----------------------------------------------------------------------------
345
346 } // namespace internal
347 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/router.h ('k') | mojo/public/cpp/bindings/strong_binding.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698