Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(289)

Side by Side Diff: mojo/public/cpp/bindings/lib/router.cc

Issue 2064903002: Mojo: Report bindings validation errors via MojoNotifyBadMessage (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/router.h" 5 #include "mojo/public/cpp/bindings/lib/router.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <utility> 9 #include <utility>
10 10
(...skipping 23 matching lines...) Expand all
34 : router_(router), 34 : router_(router),
35 accept_was_invoked_(false), 35 accept_was_invoked_(false),
36 task_runner_(std::move(runner)) {} 36 task_runner_(std::move(runner)) {}
37 ~ResponderThunk() override { 37 ~ResponderThunk() override {
38 if (!accept_was_invoked_) { 38 if (!accept_was_invoked_) {
39 // The Mojo application handled a message that was expecting a response 39 // The Mojo application handled a message that was expecting a response
40 // but did not send a response. 40 // but did not send a response.
41 // We raise an error to signal the calling application that an error 41 // We raise an error to signal the calling application that an error
42 // condition occurred. Without this the calling application would have no 42 // condition occurred. Without this the calling application would have no
43 // way of knowing it should stop waiting for a response. 43 // way of knowing it should stop waiting for a response.
44 Result error(Result::Type::RESPONSE_DROPPED);
44 if (task_runner_->RunsTasksOnCurrentThread()) { 45 if (task_runner_->RunsTasksOnCurrentThread()) {
45 // Please note that even if this code is run from a different task 46 // Please note that even if this code is run from a different task
46 // runner on the same thread as |task_runner_|, it is okay to directly 47 // runner on the same thread as |task_runner_|, it is okay to directly
47 // call Router::RaiseError(), because it will raise error from the 48 // call Router::RaiseError(), because it will raise error from the
48 // correct task runner asynchronously. 49 // correct task runner asynchronously.
49 if (router_) 50 if (router_)
50 router_->RaiseError(); 51 router_->RaiseError(std::move(error));
51 } else { 52 } else {
52 task_runner_->PostTask(FROM_HERE, 53 task_runner_->PostTask(FROM_HERE,
53 base::Bind(&Router::RaiseError, router_)); 54 base::Bind(&Router::RaiseError, router_,
55 base::Passed(&error)));
54 } 56 }
55 } 57 }
56 } 58 }
57 59
58 // MessageReceiver implementation: 60 // MessageReceiver implementation:
59 bool Accept(Message* message) override { 61 Result Accept(Message* message) override {
60 DCHECK(task_runner_->RunsTasksOnCurrentThread()); 62 DCHECK(task_runner_->RunsTasksOnCurrentThread());
61 accept_was_invoked_ = true; 63 accept_was_invoked_ = true;
62 DCHECK(message->has_flag(kMessageIsResponse)); 64 DCHECK(message->has_flag(kMessageIsResponse));
63 65
64 bool result = false; 66 if (!router_)
67 return Result(Result::Type::RESPONSE_DROPPED);
65 68
66 if (router_) 69 return router_->Accept(message);
67 result = router_->Accept(message);
68
69 return result;
70 } 70 }
71 71
72 // MessageReceiverWithStatus implementation: 72 // MessageReceiverWithStatus implementation:
73 bool IsValid() override { 73 bool IsValid() override {
74 DCHECK(task_runner_->RunsTasksOnCurrentThread()); 74 DCHECK(task_runner_->RunsTasksOnCurrentThread());
75 return router_ && !router_->encountered_error() && router_->is_valid(); 75 return router_ && !router_->encountered_error() && router_->is_valid();
76 } 76 }
77 77
78 void DCheckInvalid(const std::string& message) override { 78 void DCheckInvalid(const std::string& message) override {
79 if (task_runner_->RunsTasksOnCurrentThread()) { 79 if (task_runner_->RunsTasksOnCurrentThread()) {
(...skipping 21 matching lines...) Expand all
101 101
102 // ---------------------------------------------------------------------------- 102 // ----------------------------------------------------------------------------
103 103
104 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) 104 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router)
105 : router_(router) { 105 : router_(router) {
106 } 106 }
107 107
108 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() { 108 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() {
109 } 109 }
110 110
111 bool Router::HandleIncomingMessageThunk::Accept(Message* message) { 111 MessageReceiver::Result Router::HandleIncomingMessageThunk::Accept(
112 Message* message) {
112 return router_->HandleIncomingMessage(message); 113 return router_->HandleIncomingMessage(message);
113 } 114 }
114 115
115 // ---------------------------------------------------------------------------- 116 // ----------------------------------------------------------------------------
116 117
117 Router::Router(ScopedMessagePipeHandle message_pipe, 118 Router::Router(ScopedMessagePipeHandle message_pipe,
118 FilterChain filters, 119 FilterChain filters,
119 bool expects_sync_requests, 120 bool expects_sync_requests,
120 scoped_refptr<base::SingleThreadTaskRunner> runner) 121 scoped_refptr<base::SingleThreadTaskRunner> runner)
121 : thunk_(this), 122 : thunk_(this),
122 filters_(std::move(filters)), 123 filters_(std::move(filters)),
123 connector_(std::move(message_pipe), 124 connector_(std::move(message_pipe),
124 Connector::SINGLE_THREADED_SEND, 125 Connector::SINGLE_THREADED_SEND,
125 std::move(runner)), 126 std::move(runner)),
126 incoming_receiver_(nullptr), 127 incoming_receiver_(nullptr),
127 next_request_id_(0), 128 next_request_id_(0),
128 testing_mode_(false), 129 testing_mode_(false),
129 pending_task_for_messages_(false), 130 pending_task_for_messages_(false),
130 encountered_error_(false), 131 encountered_error_(false),
131 weak_factory_(this) { 132 weak_factory_(this) {
132 filters_.SetSink(&thunk_); 133 filters_.SetSink(&thunk_);
133 if (expects_sync_requests) 134 if (expects_sync_requests)
134 connector_.AllowWokenUpBySyncWatchOnSameThread(); 135 connector_.AllowWokenUpBySyncWatchOnSameThread();
135 connector_.set_incoming_receiver(filters_.GetHead()); 136 connector_.set_incoming_receiver(filters_.GetHead());
136 connector_.set_connection_error_handler([this]() { OnConnectionError(); }); 137 connector_.set_connection_error_handler([this]() { OnConnectionError(); });
137 } 138 }
138 139
139 Router::~Router() {} 140 Router::~Router() {}
140 141
141 bool Router::Accept(Message* message) { 142 MessageReceiver::Result Router::Accept(Message* message) {
142 DCHECK(thread_checker_.CalledOnValidThread()); 143 DCHECK(thread_checker_.CalledOnValidThread());
143 DCHECK(!message->has_flag(kMessageExpectsResponse)); 144 DCHECK(!message->has_flag(kMessageExpectsResponse));
144 return connector_.Accept(message); 145 return connector_.Accept(message);
145 } 146 }
146 147
147 bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { 148 MessageReceiver::Result Router::AcceptWithResponder(
149 Message* message,
150 MessageReceiver* responder) {
148 DCHECK(thread_checker_.CalledOnValidThread()); 151 DCHECK(thread_checker_.CalledOnValidThread());
149 DCHECK(message->has_flag(kMessageExpectsResponse)); 152 DCHECK(message->has_flag(kMessageExpectsResponse));
150 153
151 // Reserve 0 in case we want it to convey special meaning in the future. 154 // Reserve 0 in case we want it to convey special meaning in the future.
152 uint64_t request_id = next_request_id_++; 155 uint64_t request_id = next_request_id_++;
153 if (request_id == 0) 156 if (request_id == 0)
154 request_id = next_request_id_++; 157 request_id = next_request_id_++;
155 158
156 bool is_sync = message->has_flag(kMessageIsSync); 159 bool is_sync = message->has_flag(kMessageIsSync);
157 message->set_request_id(request_id); 160 message->set_request_id(request_id);
158 if (!connector_.Accept(message)) 161 if (!connector_.Accept(message).Succeeded())
159 return false; 162 return Result(Result::Type::SEND_FAILED);
160 163
161 if (!is_sync) { 164 if (!is_sync) {
162 // We assume ownership of |responder|. 165 // We assume ownership of |responder|.
163 async_responders_[request_id] = base::WrapUnique(responder); 166 async_responders_[request_id] = base::WrapUnique(responder);
164 return true; 167 return Result::ForSuccess();
165 } 168 }
166 169
167 bool response_received = false; 170 bool response_received = false;
168 std::unique_ptr<MessageReceiver> sync_responder(responder); 171 std::unique_ptr<MessageReceiver> sync_responder(responder);
169 sync_responses_.insert(std::make_pair( 172 sync_responses_.insert(std::make_pair(
170 request_id, base::WrapUnique(new SyncResponseInfo(&response_received)))); 173 request_id, base::WrapUnique(new SyncResponseInfo(&response_received))));
171 174
172 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); 175 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
173 connector_.SyncWatch(&response_received); 176 connector_.SyncWatch(&response_received);
174 // Make sure that this instance hasn't been destroyed. 177 // Make sure that this instance hasn't been destroyed.
175 if (weak_self) { 178 if (weak_self) {
176 DCHECK(ContainsKey(sync_responses_, request_id)); 179 DCHECK(ContainsKey(sync_responses_, request_id));
177 auto iter = sync_responses_.find(request_id); 180 auto iter = sync_responses_.find(request_id);
178 DCHECK_EQ(&response_received, iter->second->response_received); 181 DCHECK_EQ(&response_received, iter->second->response_received);
179 if (response_received) { 182 if (response_received) {
180 std::unique_ptr<Message> response = std::move(iter->second->response); 183 std::unique_ptr<Message> response = std::move(iter->second->response);
181 ignore_result(sync_responder->Accept(response.get())); 184 ignore_result(sync_responder->Accept(response.get()));
182 } 185 }
183 sync_responses_.erase(iter); 186 sync_responses_.erase(iter);
184 } 187 }
185 188
186 // Return true means that we take ownership of |responder|. 189 // Success means that we take ownership of |responder|.
187 return true; 190 return Result::ForSuccess();
188 } 191 }
189 192
190 void Router::EnableTestingMode() { 193 void Router::EnableTestingMode() {
191 DCHECK(thread_checker_.CalledOnValidThread()); 194 DCHECK(thread_checker_.CalledOnValidThread());
192 testing_mode_ = true; 195 testing_mode_ = true;
193 connector_.set_enforce_errors_from_incoming_receiver(false); 196 connector_.set_enforce_errors_from_incoming_receiver(false);
194 } 197 }
195 198
196 bool Router::HandleIncomingMessage(Message* message) { 199 MessageReceiver::Result Router::HandleIncomingMessage(Message* message) {
197 DCHECK(thread_checker_.CalledOnValidThread()); 200 DCHECK(thread_checker_.CalledOnValidThread());
198 201
199 const bool during_sync_call = 202 const bool during_sync_call =
200 connector_.during_sync_handle_watcher_callback(); 203 connector_.during_sync_handle_watcher_callback();
201 if (!message->has_flag(kMessageIsSync) && 204 if (!message->has_flag(kMessageIsSync) &&
202 (during_sync_call || !pending_messages_.empty())) { 205 (during_sync_call || !pending_messages_.empty())) {
203 std::unique_ptr<Message> pending_message(new Message); 206 std::unique_ptr<Message> pending_message(new Message);
204 message->MoveTo(pending_message.get()); 207 message->MoveTo(pending_message.get());
205 pending_messages_.push(std::move(pending_message)); 208 pending_messages_.push(std::move(pending_message));
206 209
207 if (!pending_task_for_messages_) { 210 if (!pending_task_for_messages_) {
208 pending_task_for_messages_ = true; 211 pending_task_for_messages_ = true;
209 connector_.task_runner()->PostTask( 212 connector_.task_runner()->PostTask(
210 FROM_HERE, base::Bind(&Router::HandleQueuedMessages, 213 FROM_HERE, base::Bind(&Router::HandleQueuedMessages,
211 weak_factory_.GetWeakPtr())); 214 weak_factory_.GetWeakPtr()));
212 } 215 }
213 216
214 return true; 217 return Result::ForSuccess();
215 } 218 }
216 219
217 return HandleMessageInternal(message); 220 return HandleMessageInternal(message);
218 } 221 }
219 222
220 void Router::HandleQueuedMessages() { 223 void Router::HandleQueuedMessages() {
221 DCHECK(thread_checker_.CalledOnValidThread()); 224 DCHECK(thread_checker_.CalledOnValidThread());
222 DCHECK(pending_task_for_messages_); 225 DCHECK(pending_task_for_messages_);
223 226
224 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); 227 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
225 while (!pending_messages_.empty()) { 228 while (!pending_messages_.empty()) {
226 std::unique_ptr<Message> message(std::move(pending_messages_.front())); 229 std::unique_ptr<Message> message(std::move(pending_messages_.front()));
227 pending_messages_.pop(); 230 pending_messages_.pop();
228 231
229 bool result = HandleMessageInternal(message.get()); 232 Result result = HandleMessageInternal(message.get());
230 if (!weak_self) 233 if (!weak_self)
231 return; 234 return;
232 235
233 if (!result && !testing_mode_) { 236 if (!result.Succeeded() && !testing_mode_) {
234 connector_.RaiseError(); 237 connector_.RaiseError(std::move(result));
235 break; 238 break;
236 } 239 }
237 } 240 }
238 241
239 pending_task_for_messages_ = false; 242 pending_task_for_messages_ = false;
240 243
241 // We may have already seen a connection error from the connector, but 244 // We may have already seen a connection error from the connector, but
242 // haven't notified the user because we want to process all the queued 245 // haven't notified the user because we want to process all the queued
243 // messages first. We should do it now. 246 // messages first. We should do it now.
244 if (connector_.encountered_error() && !encountered_error_) 247 if (connector_.encountered_error() && !encountered_error_)
245 OnConnectionError(); 248 OnConnectionError();
246 } 249 }
247 250
248 bool Router::HandleMessageInternal(Message* message) { 251 MessageReceiver::Result Router::HandleMessageInternal(Message* message) {
249 if (message->has_flag(kMessageExpectsResponse)) { 252 if (message->has_flag(kMessageExpectsResponse)) {
250 if (!incoming_receiver_) 253 if (!incoming_receiver_)
251 return false; 254 return Result(Result::Type::REQUEST_DROPPED);
252 255
253 MessageReceiverWithStatus* responder = new ResponderThunk( 256 MessageReceiverWithStatus* responder = new ResponderThunk(
254 weak_factory_.GetWeakPtr(), connector_.task_runner()); 257 weak_factory_.GetWeakPtr(), connector_.task_runner());
255 bool ok = incoming_receiver_->AcceptWithResponder(message, responder); 258 Result result = incoming_receiver_->AcceptWithResponder(message, responder);
256 if (!ok) 259 if (!result.Succeeded())
257 delete responder; 260 delete responder;
258 return ok; 261 return result;
259 262
260 } else if (message->has_flag(kMessageIsResponse)) { 263 } else if (message->has_flag(kMessageIsResponse)) {
261 uint64_t request_id = message->request_id(); 264 uint64_t request_id = message->request_id();
262 265
263 if (message->has_flag(kMessageIsSync)) { 266 if (message->has_flag(kMessageIsSync)) {
264 auto it = sync_responses_.find(request_id); 267 auto it = sync_responses_.find(request_id);
265 if (it == sync_responses_.end()) { 268 if (it == sync_responses_.end()) {
266 DCHECK(testing_mode_); 269 DCHECK(testing_mode_);
267 return false; 270 return Result::ForUnexpectedResponse(interface_name_, message);
268 } 271 }
269 it->second->response.reset(new Message()); 272 it->second->response.reset(new Message());
270 message->MoveTo(it->second->response.get()); 273 message->MoveTo(it->second->response.get());
271 *it->second->response_received = true; 274 *it->second->response_received = true;
272 return true; 275 return Result::ForSuccess();
273 } 276 }
274 277
275 auto it = async_responders_.find(request_id); 278 auto it = async_responders_.find(request_id);
276 if (it == async_responders_.end()) { 279 if (it == async_responders_.end()) {
277 DCHECK(testing_mode_); 280 DCHECK(testing_mode_);
278 return false; 281 return Result::ForUnexpectedResponse(interface_name_, message);
279 } 282 }
280 std::unique_ptr<MessageReceiver> responder = std::move(it->second); 283 std::unique_ptr<MessageReceiver> responder = std::move(it->second);
281 async_responders_.erase(it); 284 async_responders_.erase(it);
282 return responder->Accept(message); 285 return responder->Accept(message);
283 } else { 286 } else {
284 if (!incoming_receiver_) 287 if (!incoming_receiver_)
285 return false; 288 return Result(Result::Type::REQUEST_DROPPED);
286 289
287 return incoming_receiver_->Accept(message); 290 return incoming_receiver_->Accept(message);
288 } 291 }
289 } 292 }
290 293
291 void Router::OnConnectionError() { 294 void Router::OnConnectionError() {
292 if (encountered_error_) 295 if (encountered_error_)
293 return; 296 return;
294 297
295 if (!pending_messages_.empty()) { 298 if (!pending_messages_.empty()) {
(...skipping 13 matching lines...) Expand all
309 } 312 }
310 313
311 encountered_error_ = true; 314 encountered_error_ = true;
312 error_handler_.Run(); 315 error_handler_.Run();
313 } 316 }
314 317
315 // ---------------------------------------------------------------------------- 318 // ----------------------------------------------------------------------------
316 319
317 } // namespace internal 320 } // namespace internal
318 } // namespace mojo 321 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698