| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/lib/router.h" | 5 #include "mojo/public/cpp/bindings/lib/router.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| 11 #include "base/bind.h" | 11 #include "base/bind.h" |
| 12 #include "base/location.h" | 12 #include "base/location.h" |
| 13 #include "base/logging.h" | 13 #include "base/logging.h" |
| 14 #include "base/memory/ptr_util.h" | 14 #include "base/memory/ptr_util.h" |
| 15 #include "base/stl_util.h" | 15 #include "base/stl_util.h" |
| 16 #include "mojo/public/cpp/bindings/lib/validation_util.h" |
| 16 #include "mojo/public/cpp/bindings/sync_call_restrictions.h" | 17 #include "mojo/public/cpp/bindings/sync_call_restrictions.h" |
| 17 | 18 |
| 18 namespace mojo { | 19 namespace mojo { |
| 19 namespace internal { | 20 namespace internal { |
| 20 | 21 |
| 21 // ---------------------------------------------------------------------------- | 22 // ---------------------------------------------------------------------------- |
| 22 | 23 |
| 23 namespace { | 24 namespace { |
| 24 | 25 |
| 25 void DCheckIfInvalid(const base::WeakPtr<Router>& router, | 26 void DCheckIfInvalid(const base::WeakPtr<Router>& router, |
| (...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 111 | 112 |
| 112 bool Router::HandleIncomingMessageThunk::Accept(Message* message) { | 113 bool Router::HandleIncomingMessageThunk::Accept(Message* message) { |
| 113 return router_->HandleIncomingMessage(message); | 114 return router_->HandleIncomingMessage(message); |
| 114 } | 115 } |
| 115 | 116 |
| 116 // ---------------------------------------------------------------------------- | 117 // ---------------------------------------------------------------------------- |
| 117 | 118 |
| 118 Router::Router(ScopedMessagePipeHandle message_pipe, | 119 Router::Router(ScopedMessagePipeHandle message_pipe, |
| 119 FilterChain filters, | 120 FilterChain filters, |
| 120 bool expects_sync_requests, | 121 bool expects_sync_requests, |
| 121 scoped_refptr<base::SingleThreadTaskRunner> runner) | 122 scoped_refptr<base::SingleThreadTaskRunner> runner, |
| 123 int interface_version) |
| 122 : thunk_(this), | 124 : thunk_(this), |
| 123 filters_(std::move(filters)), | 125 filters_(std::move(filters)), |
| 124 connector_(std::move(message_pipe), | 126 connector_(std::move(message_pipe), |
| 125 Connector::SINGLE_THREADED_SEND, | 127 Connector::SINGLE_THREADED_SEND, |
| 126 std::move(runner)), | 128 std::move(runner)), |
| 127 incoming_receiver_(nullptr), | 129 incoming_receiver_(nullptr), |
| 128 next_request_id_(0), | 130 next_request_id_(0), |
| 129 testing_mode_(false), | 131 testing_mode_(false), |
| 130 pending_task_for_messages_(false), | 132 pending_task_for_messages_(false), |
| 131 encountered_error_(false), | 133 encountered_error_(false), |
| 134 control_message_proxy_(this), |
| 135 control_message_handler_(interface_version), |
| 132 weak_factory_(this) { | 136 weak_factory_(this) { |
| 133 filters_.SetSink(&thunk_); | 137 filters_.SetSink(&thunk_); |
| 134 if (expects_sync_requests) | 138 if (expects_sync_requests) |
| 135 connector_.AllowWokenUpBySyncWatchOnSameThread(); | 139 connector_.AllowWokenUpBySyncWatchOnSameThread(); |
| 136 connector_.set_incoming_receiver(&filters_); | 140 connector_.set_incoming_receiver(&filters_); |
| 137 connector_.set_connection_error_handler( | 141 connector_.set_connection_error_handler( |
| 138 base::Bind(&Router::OnConnectionError, base::Unretained(this))); | 142 base::Bind(&Router::OnConnectionError, base::Unretained(this))); |
| 139 } | 143 } |
| 140 | 144 |
| 141 Router::~Router() {} | 145 Router::~Router() {} |
| (...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 246 // haven't notified the user because we want to process all the queued | 250 // haven't notified the user because we want to process all the queued |
| 247 // messages first. We should do it now. | 251 // messages first. We should do it now. |
| 248 if (connector_.encountered_error() && !encountered_error_) | 252 if (connector_.encountered_error() && !encountered_error_) |
| 249 OnConnectionError(); | 253 OnConnectionError(); |
| 250 } | 254 } |
| 251 | 255 |
| 252 bool Router::HandleMessageInternal(Message* message) { | 256 bool Router::HandleMessageInternal(Message* message) { |
| 253 DCHECK(!encountered_error_); | 257 DCHECK(!encountered_error_); |
| 254 | 258 |
| 255 if (message->has_flag(Message::kFlagExpectsResponse)) { | 259 if (message->has_flag(Message::kFlagExpectsResponse)) { |
| 256 if (!incoming_receiver_) | |
| 257 return false; | |
| 258 | |
| 259 MessageReceiverWithStatus* responder = new ResponderThunk( | 260 MessageReceiverWithStatus* responder = new ResponderThunk( |
| 260 weak_factory_.GetWeakPtr(), connector_.task_runner()); | 261 weak_factory_.GetWeakPtr(), connector_.task_runner()); |
| 261 bool ok = incoming_receiver_->AcceptWithResponder(message, responder); | 262 bool ok = false; |
| 263 if (mojo::internal::ControlMessageHandler::IsControlMessage(message)) { |
| 264 ok = control_message_handler_.AcceptWithResponder(message, responder); |
| 265 } else { |
| 266 ok = incoming_receiver_->AcceptWithResponder(message, responder); |
| 267 } |
| 262 if (!ok) | 268 if (!ok) |
| 263 delete responder; | 269 delete responder; |
| 264 return ok; | 270 return ok; |
| 265 | 271 |
| 266 } else if (message->has_flag(Message::kFlagIsResponse)) { | 272 } else if (message->has_flag(Message::kFlagIsResponse)) { |
| 267 uint64_t request_id = message->request_id(); | 273 uint64_t request_id = message->request_id(); |
| 268 | 274 |
| 269 if (message->has_flag(Message::kFlagIsSync)) { | 275 if (message->has_flag(Message::kFlagIsSync)) { |
| 270 auto it = sync_responses_.find(request_id); | 276 auto it = sync_responses_.find(request_id); |
| 271 if (it == sync_responses_.end()) { | 277 if (it == sync_responses_.end()) { |
| 272 DCHECK(testing_mode_); | 278 DCHECK(testing_mode_); |
| 273 return false; | 279 return false; |
| 274 } | 280 } |
| 275 it->second->response = std::move(*message); | 281 it->second->response = std::move(*message); |
| 276 *it->second->response_received = true; | 282 *it->second->response_received = true; |
| 277 return true; | 283 return true; |
| 278 } | 284 } |
| 279 | 285 |
| 280 auto it = async_responders_.find(request_id); | 286 auto it = async_responders_.find(request_id); |
| 281 if (it == async_responders_.end()) { | 287 if (it == async_responders_.end()) { |
| 282 DCHECK(testing_mode_); | 288 DCHECK(testing_mode_); |
| 283 return false; | 289 return false; |
| 284 } | 290 } |
| 285 std::unique_ptr<MessageReceiver> responder = std::move(it->second); | 291 std::unique_ptr<MessageReceiver> responder = std::move(it->second); |
| 286 async_responders_.erase(it); | 292 async_responders_.erase(it); |
| 287 return responder->Accept(message); | 293 return responder->Accept(message); |
| 288 } else { | 294 } else { |
| 289 if (!incoming_receiver_) | 295 if (mojo::internal::ControlMessageHandler::IsControlMessage(message)) |
| 290 return false; | 296 return control_message_handler_.Accept(message); |
| 291 | 297 |
| 292 return incoming_receiver_->Accept(message); | 298 return incoming_receiver_->Accept(message); |
| 293 } | 299 } |
| 294 } | 300 } |
| 295 | 301 |
| 296 void Router::OnConnectionError() { | 302 void Router::OnConnectionError() { |
| 297 if (encountered_error_) | 303 if (encountered_error_) |
| 298 return; | 304 return; |
| 299 | 305 |
| 300 if (!pending_messages_.empty()) { | 306 if (!pending_messages_.empty()) { |
| 301 // After all the pending messages are processed, we will check whether an | 307 // After all the pending messages are processed, we will check whether an |
| 302 // error has been encountered and run the user's connection error handler | 308 // error has been encountered and run the user's connection error handler |
| 303 // if necessary. | 309 // if necessary. |
| 304 DCHECK(pending_task_for_messages_); | 310 DCHECK(pending_task_for_messages_); |
| 305 return; | 311 return; |
| 306 } | 312 } |
| 307 | 313 |
| 308 if (connector_.during_sync_handle_watcher_callback()) { | 314 if (connector_.during_sync_handle_watcher_callback()) { |
| 309 // We don't want the error handler to reenter an ongoing sync call. | 315 // We don't want the error handler to reenter an ongoing sync call. |
| 310 connector_.task_runner()->PostTask( | 316 connector_.task_runner()->PostTask( |
| 311 FROM_HERE, | 317 FROM_HERE, |
| 312 base::Bind(&Router::OnConnectionError, weak_factory_.GetWeakPtr())); | 318 base::Bind(&Router::OnConnectionError, weak_factory_.GetWeakPtr())); |
| 313 return; | 319 return; |
| 314 } | 320 } |
| 315 | 321 |
| 322 control_message_proxy_.OnConnectionError(); |
| 323 |
| 316 encountered_error_ = true; | 324 encountered_error_ = true; |
| 317 | 325 |
| 318 // The callbacks may hold on to resources. There is no need to keep them any | 326 // The callbacks may hold on to resources. There is no need to keep them any |
| 319 // longer. | 327 // longer. |
| 320 async_responders_.clear(); | 328 async_responders_.clear(); |
| 321 | 329 |
| 322 if (!error_handler_.is_null()) | 330 if (!error_handler_.is_null()) |
| 323 error_handler_.Run(); | 331 error_handler_.Run(); |
| 324 } | 332 } |
| 325 | 333 |
| 326 // ---------------------------------------------------------------------------- | 334 // ---------------------------------------------------------------------------- |
| 327 | 335 |
| 328 } // namespace internal | 336 } // namespace internal |
| 329 } // namespace mojo | 337 } // namespace mojo |
| OLD | NEW |