Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(115)

Side by Side Diff: mojo/public/cpp/bindings/lib/router.cc

Issue 2064903002: Mojo: Report bindings validation errors via MojoNotifyBadMessage (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/router.h" 5 #include "mojo/public/cpp/bindings/lib/router.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <utility> 9 #include <utility>
10 10
(...skipping 23 matching lines...) Expand all
34 : router_(router), 34 : router_(router),
35 accept_was_invoked_(false), 35 accept_was_invoked_(false),
36 task_runner_(std::move(runner)) {} 36 task_runner_(std::move(runner)) {}
37 ~ResponderThunk() override { 37 ~ResponderThunk() override {
38 if (!accept_was_invoked_) { 38 if (!accept_was_invoked_) {
39 // The Mojo application handled a message that was expecting a response 39 // The Mojo application handled a message that was expecting a response
40 // but did not send a response. 40 // but did not send a response.
41 // We raise an error to signal the calling application that an error 41 // We raise an error to signal the calling application that an error
42 // condition occurred. Without this the calling application would have no 42 // condition occurred. Without this the calling application would have no
43 // way of knowing it should stop waiting for a response. 43 // way of knowing it should stop waiting for a response.
44 Error error(Error::Type::REQUEST_CANCELED);
44 if (task_runner_->RunsTasksOnCurrentThread()) { 45 if (task_runner_->RunsTasksOnCurrentThread()) {
45 // Please note that even if this code is run from a different task 46 // Please note that even if this code is run from a different task
46 // runner on the same thread as |task_runner_|, it is okay to directly 47 // runner on the same thread as |task_runner_|, it is okay to directly
47 // call Router::RaiseError(), because it will raise error from the 48 // call Router::RaiseError(), because it will raise error from the
48 // correct task runner asynchronously. 49 // correct task runner asynchronously.
49 if (router_) 50 if (router_)
50 router_->RaiseError(); 51 router_->RaiseError(std::move(error));
51 } else { 52 } else {
52 task_runner_->PostTask(FROM_HERE, 53 task_runner_->PostTask(FROM_HERE,
53 base::Bind(&Router::RaiseError, router_)); 54 base::Bind(&Router::RaiseError, router_,
55 base::Passed(&error)));
54 } 56 }
55 } 57 }
56 } 58 }
57 59
58 // MessageReceiver implementation: 60 // MessageReceiver implementation:
59 bool Accept(Message* message) override { 61 bool Accept(Message* message, Error* error) override {
60 DCHECK(task_runner_->RunsTasksOnCurrentThread()); 62 DCHECK(task_runner_->RunsTasksOnCurrentThread());
61 accept_was_invoked_ = true; 63 accept_was_invoked_ = true;
62 DCHECK(message->has_flag(kMessageIsResponse)); 64 DCHECK(message->has_flag(kMessageIsResponse));
63 65
64 bool result = false; 66 if (router_)
67 return router_->Accept(message, error);
65 68
66 if (router_) 69 *error = Error(Error::Type::RESPONSE_DROPPED);
67 result = router_->Accept(message); 70 return false;
68
69 return result;
70 } 71 }
71 72
72 // MessageReceiverWithStatus implementation: 73 // MessageReceiverWithStatus implementation:
73 bool IsValid() override { 74 bool IsValid() override {
74 DCHECK(task_runner_->RunsTasksOnCurrentThread()); 75 DCHECK(task_runner_->RunsTasksOnCurrentThread());
75 return router_ && !router_->encountered_error() && router_->is_valid(); 76 return router_ && !router_->encountered_error() && router_->is_valid();
76 } 77 }
77 78
78 void DCheckInvalid(const std::string& message) override { 79 void DCheckInvalid(const std::string& message) override {
79 if (task_runner_->RunsTasksOnCurrentThread()) { 80 if (task_runner_->RunsTasksOnCurrentThread()) {
(...skipping 21 matching lines...) Expand all
101 102
102 // ---------------------------------------------------------------------------- 103 // ----------------------------------------------------------------------------
103 104
104 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) 105 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router)
105 : router_(router) { 106 : router_(router) {
106 } 107 }
107 108
108 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() { 109 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() {
109 } 110 }
110 111
111 bool Router::HandleIncomingMessageThunk::Accept(Message* message) { 112 bool Router::HandleIncomingMessageThunk::Accept(Message* message,
112 return router_->HandleIncomingMessage(message); 113 Error* error) {
114 return router_->HandleIncomingMessage(message, error);
113 } 115 }
114 116
115 // ---------------------------------------------------------------------------- 117 // ----------------------------------------------------------------------------
116 118
117 Router::Router(ScopedMessagePipeHandle message_pipe, 119 Router::Router(ScopedMessagePipeHandle message_pipe,
118 FilterChain filters, 120 FilterChain filters,
119 bool expects_sync_requests, 121 bool expects_sync_requests,
120 scoped_refptr<base::SingleThreadTaskRunner> runner) 122 scoped_refptr<base::SingleThreadTaskRunner> runner)
121 : thunk_(this), 123 : thunk_(this),
122 filters_(std::move(filters)), 124 filters_(std::move(filters)),
123 connector_(std::move(message_pipe), 125 connector_(std::move(message_pipe),
124 Connector::SINGLE_THREADED_SEND, 126 Connector::SINGLE_THREADED_SEND,
125 std::move(runner)), 127 std::move(runner)),
126 incoming_receiver_(nullptr), 128 incoming_receiver_(nullptr),
127 next_request_id_(0), 129 next_request_id_(0),
128 testing_mode_(false), 130 testing_mode_(false),
129 pending_task_for_messages_(false), 131 pending_task_for_messages_(false),
130 encountered_error_(false), 132 encountered_error_(false),
131 weak_factory_(this) { 133 weak_factory_(this) {
132 filters_.SetSink(&thunk_); 134 filters_.SetSink(&thunk_);
133 if (expects_sync_requests) 135 if (expects_sync_requests)
134 connector_.AllowWokenUpBySyncWatchOnSameThread(); 136 connector_.AllowWokenUpBySyncWatchOnSameThread();
135 connector_.set_incoming_receiver(filters_.GetHead()); 137 connector_.set_incoming_receiver(filters_.GetHead());
136 connector_.set_connection_error_handler([this]() { OnConnectionError(); }); 138 connector_.set_connection_error_handler([this]() { OnConnectionError(); });
137 } 139 }
138 140
139 Router::~Router() {} 141 Router::~Router() {}
140 142
141 bool Router::Accept(Message* message) { 143 bool Router::Accept(Message* message, Error* error) {
142 DCHECK(thread_checker_.CalledOnValidThread()); 144 DCHECK(thread_checker_.CalledOnValidThread());
143 DCHECK(!message->has_flag(kMessageExpectsResponse)); 145 DCHECK(!message->has_flag(kMessageExpectsResponse));
144 return connector_.Accept(message); 146 return connector_.Accept(message, error);
145 } 147 }
146 148
147 bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { 149 bool Router::AcceptWithResponder(Message* message,
150 MessageReceiver* responder,
151 Error* error) {
148 DCHECK(thread_checker_.CalledOnValidThread()); 152 DCHECK(thread_checker_.CalledOnValidThread());
149 DCHECK(message->has_flag(kMessageExpectsResponse)); 153 DCHECK(message->has_flag(kMessageExpectsResponse));
150 154
151 // Reserve 0 in case we want it to convey special meaning in the future. 155 // Reserve 0 in case we want it to convey special meaning in the future.
152 uint64_t request_id = next_request_id_++; 156 uint64_t request_id = next_request_id_++;
153 if (request_id == 0) 157 if (request_id == 0)
154 request_id = next_request_id_++; 158 request_id = next_request_id_++;
155 159
156 bool is_sync = message->has_flag(kMessageIsSync); 160 bool is_sync = message->has_flag(kMessageIsSync);
157 message->set_request_id(request_id); 161 message->set_request_id(request_id);
158 if (!connector_.Accept(message)) 162 if (!connector_.Accept(message, error))
159 return false; 163 return false;
160 164
161 if (!is_sync) { 165 if (!is_sync) {
162 // We assume ownership of |responder|. 166 // We assume ownership of |responder|.
163 async_responders_[request_id] = base::WrapUnique(responder); 167 async_responders_[request_id] = base::WrapUnique(responder);
164 return true; 168 return true;
165 } 169 }
166 170
167 bool response_received = false; 171 bool response_received = false;
168 std::unique_ptr<MessageReceiver> sync_responder(responder); 172 std::unique_ptr<MessageReceiver> sync_responder(responder);
169 sync_responses_.insert(std::make_pair( 173 sync_responses_.insert(std::make_pair(
170 request_id, base::WrapUnique(new SyncResponseInfo(&response_received)))); 174 request_id, base::WrapUnique(new SyncResponseInfo(&response_received))));
171 175
172 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); 176 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
173 connector_.SyncWatch(&response_received); 177 connector_.SyncWatch(&response_received);
174 // Make sure that this instance hasn't been destroyed. 178 // Make sure that this instance hasn't been destroyed.
175 if (weak_self) { 179 if (weak_self) {
176 DCHECK(ContainsKey(sync_responses_, request_id)); 180 DCHECK(ContainsKey(sync_responses_, request_id));
177 auto iter = sync_responses_.find(request_id); 181 auto iter = sync_responses_.find(request_id);
178 DCHECK_EQ(&response_received, iter->second->response_received); 182 DCHECK_EQ(&response_received, iter->second->response_received);
179 if (response_received) { 183 if (response_received) {
180 std::unique_ptr<Message> response = std::move(iter->second->response); 184 std::unique_ptr<Message> response = std::move(iter->second->response);
181 ignore_result(sync_responder->Accept(response.get())); 185
186 Error send_response_error;
187 ignore_result(sync_responder->Accept(response.get(),
188 &send_response_error));
182 } 189 }
183 sync_responses_.erase(iter); 190 sync_responses_.erase(iter);
184 } 191 }
185 192
186 // Return true means that we take ownership of |responder|. 193 // Return true means that we take ownership of |responder|.
187 return true; 194 return true;
188 } 195 }
189 196
190 void Router::EnableTestingMode() { 197 void Router::EnableTestingMode() {
191 DCHECK(thread_checker_.CalledOnValidThread()); 198 DCHECK(thread_checker_.CalledOnValidThread());
192 testing_mode_ = true; 199 testing_mode_ = true;
193 connector_.set_enforce_errors_from_incoming_receiver(false); 200 connector_.set_enforce_errors_from_incoming_receiver(false);
194 } 201 }
195 202
196 bool Router::HandleIncomingMessage(Message* message) { 203 bool Router::HandleIncomingMessage(Message* message, Error* error) {
197 DCHECK(thread_checker_.CalledOnValidThread()); 204 DCHECK(thread_checker_.CalledOnValidThread());
198 205
199 const bool during_sync_call = 206 const bool during_sync_call =
200 connector_.during_sync_handle_watcher_callback(); 207 connector_.during_sync_handle_watcher_callback();
201 if (!message->has_flag(kMessageIsSync) && 208 if (!message->has_flag(kMessageIsSync) &&
202 (during_sync_call || !pending_messages_.empty())) { 209 (during_sync_call || !pending_messages_.empty())) {
203 std::unique_ptr<Message> pending_message(new Message); 210 std::unique_ptr<Message> pending_message(new Message);
204 message->MoveTo(pending_message.get()); 211 message->MoveTo(pending_message.get());
205 pending_messages_.push(std::move(pending_message)); 212 pending_messages_.push(std::move(pending_message));
206 213
207 if (!pending_task_for_messages_) { 214 if (!pending_task_for_messages_) {
208 pending_task_for_messages_ = true; 215 pending_task_for_messages_ = true;
209 connector_.task_runner()->PostTask( 216 connector_.task_runner()->PostTask(
210 FROM_HERE, base::Bind(&Router::HandleQueuedMessages, 217 FROM_HERE, base::Bind(&Router::HandleQueuedMessages,
211 weak_factory_.GetWeakPtr())); 218 weak_factory_.GetWeakPtr()));
212 } 219 }
213 220
214 return true; 221 return true;
215 } 222 }
216 223
217 return HandleMessageInternal(message); 224 return HandleMessageInternal(message, error);
218 } 225 }
219 226
220 void Router::HandleQueuedMessages() { 227 void Router::HandleQueuedMessages() {
221 DCHECK(thread_checker_.CalledOnValidThread()); 228 DCHECK(thread_checker_.CalledOnValidThread());
222 DCHECK(pending_task_for_messages_); 229 DCHECK(pending_task_for_messages_);
223 230
224 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); 231 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
225 while (!pending_messages_.empty()) { 232 while (!pending_messages_.empty()) {
226 std::unique_ptr<Message> message(std::move(pending_messages_.front())); 233 std::unique_ptr<Message> message(std::move(pending_messages_.front()));
227 pending_messages_.pop(); 234 pending_messages_.pop();
228 235
229 bool result = HandleMessageInternal(message.get()); 236 Error error;
237 bool result = HandleMessageInternal(message.get(), &error);
230 if (!weak_self) 238 if (!weak_self)
231 return; 239 return;
232 240
233 if (!result && !testing_mode_) { 241 if (!result && !testing_mode_) {
234 connector_.RaiseError(); 242 connector_.RaiseError(std::move(error));
235 break; 243 break;
236 } 244 }
237 } 245 }
238 246
239 pending_task_for_messages_ = false; 247 pending_task_for_messages_ = false;
240 248
241 // We may have already seen a connection error from the connector, but 249 // We may have already seen a connection error from the connector, but
242 // haven't notified the user because we want to process all the queued 250 // haven't notified the user because we want to process all the queued
243 // messages first. We should do it now. 251 // messages first. We should do it now.
244 if (connector_.encountered_error() && !encountered_error_) 252 if (connector_.encountered_error() && !encountered_error_)
245 OnConnectionError(); 253 OnConnectionError();
246 } 254 }
247 255
248 bool Router::HandleMessageInternal(Message* message) { 256 bool Router::HandleMessageInternal(Message* message, Error* error) {
249 if (message->has_flag(kMessageExpectsResponse)) { 257 if (message->has_flag(kMessageExpectsResponse)) {
250 if (!incoming_receiver_) 258 if (!incoming_receiver_) {
259 *error = Error(Error::Type::REQUEST_DROPPED);
251 return false; 260 return false;
261 }
252 262
253 MessageReceiverWithStatus* responder = new ResponderThunk( 263 MessageReceiverWithStatus* responder = new ResponderThunk(
254 weak_factory_.GetWeakPtr(), connector_.task_runner()); 264 weak_factory_.GetWeakPtr(), connector_.task_runner());
255 bool ok = incoming_receiver_->AcceptWithResponder(message, responder); 265 bool ok = incoming_receiver_->AcceptWithResponder(message, responder,
266 error);
256 if (!ok) 267 if (!ok)
257 delete responder; 268 delete responder;
258 return ok; 269 return ok;
259 270
260 } else if (message->has_flag(kMessageIsResponse)) { 271 } else if (message->has_flag(kMessageIsResponse)) {
261 uint64_t request_id = message->request_id(); 272 uint64_t request_id = message->request_id();
262 273
263 if (message->has_flag(kMessageIsSync)) { 274 if (message->has_flag(kMessageIsSync)) {
264 auto it = sync_responses_.find(request_id); 275 auto it = sync_responses_.find(request_id);
265 if (it == sync_responses_.end()) { 276 if (it == sync_responses_.end()) {
266 DCHECK(testing_mode_); 277 DCHECK(testing_mode_);
278 *error = Error::ForUnexpectedResponse(interface_name_, message);
267 return false; 279 return false;
268 } 280 }
269 it->second->response.reset(new Message()); 281 it->second->response.reset(new Message());
270 message->MoveTo(it->second->response.get()); 282 message->MoveTo(it->second->response.get());
271 *it->second->response_received = true; 283 *it->second->response_received = true;
272 return true; 284 return true;
273 } 285 }
274 286
275 auto it = async_responders_.find(request_id); 287 auto it = async_responders_.find(request_id);
276 if (it == async_responders_.end()) { 288 if (it == async_responders_.end()) {
277 DCHECK(testing_mode_); 289 DCHECK(testing_mode_);
290 *error = Error::ForUnexpectedResponse(interface_name_, message);
278 return false; 291 return false;
279 } 292 }
280 std::unique_ptr<MessageReceiver> responder = std::move(it->second); 293 std::unique_ptr<MessageReceiver> responder = std::move(it->second);
281 async_responders_.erase(it); 294 async_responders_.erase(it);
282 return responder->Accept(message); 295 return responder->Accept(message, error);
283 } else { 296 } else {
284 if (!incoming_receiver_) 297 if (!incoming_receiver_) {
298 *error = Error(Error::Type::REQUEST_DROPPED);
285 return false; 299 return false;
300 }
286 301
287 return incoming_receiver_->Accept(message); 302 return incoming_receiver_->Accept(message, error);
288 } 303 }
289 } 304 }
290 305
291 void Router::OnConnectionError() { 306 void Router::OnConnectionError() {
292 if (encountered_error_) 307 if (encountered_error_)
293 return; 308 return;
294 309
295 if (!pending_messages_.empty()) { 310 if (!pending_messages_.empty()) {
296 // After all the pending messages are processed, we will check whether an 311 // After all the pending messages are processed, we will check whether an
297 // error has been encountered and run the user's connection error handler 312 // error has been encountered and run the user's connection error handler
(...skipping 11 matching lines...) Expand all
309 } 324 }
310 325
311 encountered_error_ = true; 326 encountered_error_ = true;
312 error_handler_.Run(); 327 error_handler_.Run();
313 } 328 }
314 329
315 // ---------------------------------------------------------------------------- 330 // ----------------------------------------------------------------------------
316 331
317 } // namespace internal 332 } // namespace internal
318 } // namespace mojo 333 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698