| OLD | NEW |
| (Empty) |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "mojo/public/cpp/bindings/lib/router.h" | |
| 6 | |
| 7 #include <stdint.h> | |
| 8 | |
| 9 #include <utility> | |
| 10 | |
| 11 #include "base/bind.h" | |
| 12 #include "base/location.h" | |
| 13 #include "base/logging.h" | |
| 14 #include "base/memory/ptr_util.h" | |
| 15 #include "base/stl_util.h" | |
| 16 #include "mojo/public/cpp/bindings/lib/validation_util.h" | |
| 17 #include "mojo/public/cpp/bindings/sync_call_restrictions.h" | |
| 18 | |
| 19 namespace mojo { | |
| 20 namespace internal { | |
| 21 | |
| 22 // ---------------------------------------------------------------------------- | |
| 23 | |
| 24 namespace { | |
| 25 | |
| 26 void DCheckIfInvalid(const base::WeakPtr<Router>& router, | |
| 27 const std::string& message) { | |
| 28 bool is_valid = router && !router->encountered_error() && router->is_valid(); | |
| 29 DCHECK(!is_valid) << message; | |
| 30 } | |
| 31 | |
| 32 class ResponderThunk : public MessageReceiverWithStatus { | |
| 33 public: | |
| 34 explicit ResponderThunk(const base::WeakPtr<Router>& router, | |
| 35 scoped_refptr<base::SingleThreadTaskRunner> runner) | |
| 36 : router_(router), | |
| 37 accept_was_invoked_(false), | |
| 38 task_runner_(std::move(runner)) {} | |
| 39 ~ResponderThunk() override { | |
| 40 if (!accept_was_invoked_) { | |
| 41 // The Service handled a message that was expecting a response | |
| 42 // but did not send a response. | |
| 43 // We raise an error to signal the calling application that an error | |
| 44 // condition occurred. Without this the calling application would have no | |
| 45 // way of knowing it should stop waiting for a response. | |
| 46 if (task_runner_->RunsTasksOnCurrentThread()) { | |
| 47 // Please note that even if this code is run from a different task | |
| 48 // runner on the same thread as |task_runner_|, it is okay to directly | |
| 49 // call Router::RaiseError(), because it will raise error from the | |
| 50 // correct task runner asynchronously. | |
| 51 if (router_) | |
| 52 router_->RaiseError(); | |
| 53 } else { | |
| 54 task_runner_->PostTask(FROM_HERE, | |
| 55 base::Bind(&Router::RaiseError, router_)); | |
| 56 } | |
| 57 } | |
| 58 } | |
| 59 | |
| 60 // MessageReceiver implementation: | |
| 61 bool Accept(Message* message) override { | |
| 62 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | |
| 63 accept_was_invoked_ = true; | |
| 64 DCHECK(message->has_flag(Message::kFlagIsResponse)); | |
| 65 | |
| 66 bool result = false; | |
| 67 | |
| 68 if (router_) | |
| 69 result = router_->Accept(message); | |
| 70 | |
| 71 return result; | |
| 72 } | |
| 73 | |
| 74 // MessageReceiverWithStatus implementation: | |
| 75 bool IsValid() override { | |
| 76 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | |
| 77 return router_ && !router_->encountered_error() && router_->is_valid(); | |
| 78 } | |
| 79 | |
| 80 void DCheckInvalid(const std::string& message) override { | |
| 81 if (task_runner_->RunsTasksOnCurrentThread()) { | |
| 82 DCheckIfInvalid(router_, message); | |
| 83 } else { | |
| 84 task_runner_->PostTask(FROM_HERE, | |
| 85 base::Bind(&DCheckIfInvalid, router_, message)); | |
| 86 } | |
| 87 } | |
| 88 | |
| 89 private: | |
| 90 base::WeakPtr<Router> router_; | |
| 91 bool accept_was_invoked_; | |
| 92 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | |
| 93 }; | |
| 94 | |
| 95 } // namespace | |
| 96 | |
| 97 // ---------------------------------------------------------------------------- | |
| 98 | |
| 99 Router::SyncResponseInfo::SyncResponseInfo(bool* in_response_received) | |
| 100 : response_received(in_response_received) {} | |
| 101 | |
| 102 Router::SyncResponseInfo::~SyncResponseInfo() {} | |
| 103 | |
| 104 // ---------------------------------------------------------------------------- | |
| 105 | |
| 106 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) | |
| 107 : router_(router) { | |
| 108 } | |
| 109 | |
| 110 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() { | |
| 111 } | |
| 112 | |
| 113 bool Router::HandleIncomingMessageThunk::Accept(Message* message) { | |
| 114 return router_->HandleIncomingMessage(message); | |
| 115 } | |
| 116 | |
| 117 // ---------------------------------------------------------------------------- | |
| 118 | |
| 119 Router::Router(ScopedMessagePipeHandle message_pipe, | |
| 120 FilterChain filters, | |
| 121 bool expects_sync_requests, | |
| 122 scoped_refptr<base::SingleThreadTaskRunner> runner, | |
| 123 int interface_version) | |
| 124 : thunk_(this), | |
| 125 filters_(std::move(filters)), | |
| 126 connector_(std::move(message_pipe), | |
| 127 Connector::SINGLE_THREADED_SEND, | |
| 128 std::move(runner)), | |
| 129 incoming_receiver_(nullptr), | |
| 130 next_request_id_(0), | |
| 131 testing_mode_(false), | |
| 132 pending_task_for_messages_(false), | |
| 133 encountered_error_(false), | |
| 134 control_message_proxy_(this), | |
| 135 control_message_handler_(interface_version), | |
| 136 weak_factory_(this) { | |
| 137 filters_.SetSink(&thunk_); | |
| 138 if (expects_sync_requests) | |
| 139 connector_.AllowWokenUpBySyncWatchOnSameThread(); | |
| 140 connector_.set_incoming_receiver(&filters_); | |
| 141 connector_.set_connection_error_handler( | |
| 142 base::Bind(&Router::OnConnectionError, base::Unretained(this))); | |
| 143 } | |
| 144 | |
| 145 Router::~Router() {} | |
| 146 | |
| 147 void Router::AddFilter(std::unique_ptr<MessageReceiver> filter) { | |
| 148 filters_.Append(std::move(filter)); | |
| 149 } | |
| 150 | |
| 151 bool Router::Accept(Message* message) { | |
| 152 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 153 DCHECK(!message->has_flag(Message::kFlagExpectsResponse)); | |
| 154 return connector_.Accept(message); | |
| 155 } | |
| 156 | |
| 157 bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { | |
| 158 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 159 DCHECK(message->has_flag(Message::kFlagExpectsResponse)); | |
| 160 | |
| 161 // Reserve 0 in case we want it to convey special meaning in the future. | |
| 162 uint64_t request_id = next_request_id_++; | |
| 163 if (request_id == 0) | |
| 164 request_id = next_request_id_++; | |
| 165 | |
| 166 bool is_sync = message->has_flag(Message::kFlagIsSync); | |
| 167 message->set_request_id(request_id); | |
| 168 if (!connector_.Accept(message)) | |
| 169 return false; | |
| 170 | |
| 171 if (!is_sync) { | |
| 172 // We assume ownership of |responder|. | |
| 173 async_responders_[request_id] = base::WrapUnique(responder); | |
| 174 return true; | |
| 175 } | |
| 176 | |
| 177 SyncCallRestrictions::AssertSyncCallAllowed(); | |
| 178 | |
| 179 bool response_received = false; | |
| 180 std::unique_ptr<MessageReceiver> sync_responder(responder); | |
| 181 sync_responses_.insert(std::make_pair( | |
| 182 request_id, base::MakeUnique<SyncResponseInfo>(&response_received))); | |
| 183 | |
| 184 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); | |
| 185 connector_.SyncWatch(&response_received); | |
| 186 // Make sure that this instance hasn't been destroyed. | |
| 187 if (weak_self) { | |
| 188 DCHECK(base::ContainsKey(sync_responses_, request_id)); | |
| 189 auto iter = sync_responses_.find(request_id); | |
| 190 DCHECK_EQ(&response_received, iter->second->response_received); | |
| 191 if (response_received) | |
| 192 ignore_result(sync_responder->Accept(&iter->second->response)); | |
| 193 sync_responses_.erase(iter); | |
| 194 } | |
| 195 | |
| 196 // Return true means that we take ownership of |responder|. | |
| 197 return true; | |
| 198 } | |
| 199 | |
| 200 void Router::EnableTestingMode() { | |
| 201 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 202 testing_mode_ = true; | |
| 203 connector_.set_enforce_errors_from_incoming_receiver(false); | |
| 204 } | |
| 205 | |
| 206 bool Router::HandleIncomingMessage(Message* message) { | |
| 207 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 208 | |
| 209 const bool during_sync_call = | |
| 210 connector_.during_sync_handle_watcher_callback(); | |
| 211 if (!message->has_flag(Message::kFlagIsSync) && | |
| 212 (during_sync_call || !pending_messages_.empty())) { | |
| 213 pending_messages_.emplace(std::move(*message)); | |
| 214 | |
| 215 if (!pending_task_for_messages_) { | |
| 216 pending_task_for_messages_ = true; | |
| 217 connector_.task_runner()->PostTask( | |
| 218 FROM_HERE, base::Bind(&Router::HandleQueuedMessages, | |
| 219 weak_factory_.GetWeakPtr())); | |
| 220 } | |
| 221 | |
| 222 return true; | |
| 223 } | |
| 224 | |
| 225 return HandleMessageInternal(message); | |
| 226 } | |
| 227 | |
| 228 void Router::HandleQueuedMessages() { | |
| 229 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 230 DCHECK(pending_task_for_messages_); | |
| 231 | |
| 232 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); | |
| 233 while (!pending_messages_.empty()) { | |
| 234 Message message(std::move(pending_messages_.front())); | |
| 235 pending_messages_.pop(); | |
| 236 | |
| 237 bool result = HandleMessageInternal(&message); | |
| 238 if (!weak_self) | |
| 239 return; | |
| 240 | |
| 241 if (!result && !testing_mode_) { | |
| 242 connector_.RaiseError(); | |
| 243 break; | |
| 244 } | |
| 245 } | |
| 246 | |
| 247 pending_task_for_messages_ = false; | |
| 248 | |
| 249 // We may have already seen a connection error from the connector, but | |
| 250 // haven't notified the user because we want to process all the queued | |
| 251 // messages first. We should do it now. | |
| 252 if (connector_.encountered_error() && !encountered_error_) | |
| 253 OnConnectionError(); | |
| 254 } | |
| 255 | |
| 256 bool Router::HandleMessageInternal(Message* message) { | |
| 257 DCHECK(!encountered_error_); | |
| 258 | |
| 259 if (message->has_flag(Message::kFlagExpectsResponse)) { | |
| 260 MessageReceiverWithStatus* responder = new ResponderThunk( | |
| 261 weak_factory_.GetWeakPtr(), connector_.task_runner()); | |
| 262 bool ok = false; | |
| 263 if (mojo::internal::ControlMessageHandler::IsControlMessage(message)) { | |
| 264 ok = control_message_handler_.AcceptWithResponder(message, responder); | |
| 265 } else { | |
| 266 ok = incoming_receiver_->AcceptWithResponder(message, responder); | |
| 267 } | |
| 268 if (!ok) | |
| 269 delete responder; | |
| 270 return ok; | |
| 271 | |
| 272 } else if (message->has_flag(Message::kFlagIsResponse)) { | |
| 273 uint64_t request_id = message->request_id(); | |
| 274 | |
| 275 if (message->has_flag(Message::kFlagIsSync)) { | |
| 276 auto it = sync_responses_.find(request_id); | |
| 277 if (it == sync_responses_.end()) { | |
| 278 DCHECK(testing_mode_); | |
| 279 return false; | |
| 280 } | |
| 281 it->second->response = std::move(*message); | |
| 282 *it->second->response_received = true; | |
| 283 return true; | |
| 284 } | |
| 285 | |
| 286 auto it = async_responders_.find(request_id); | |
| 287 if (it == async_responders_.end()) { | |
| 288 DCHECK(testing_mode_); | |
| 289 return false; | |
| 290 } | |
| 291 std::unique_ptr<MessageReceiver> responder = std::move(it->second); | |
| 292 async_responders_.erase(it); | |
| 293 return responder->Accept(message); | |
| 294 } else { | |
| 295 if (mojo::internal::ControlMessageHandler::IsControlMessage(message)) | |
| 296 return control_message_handler_.Accept(message); | |
| 297 | |
| 298 return incoming_receiver_->Accept(message); | |
| 299 } | |
| 300 } | |
| 301 | |
| 302 void Router::OnConnectionError() { | |
| 303 if (encountered_error_) | |
| 304 return; | |
| 305 | |
| 306 if (!pending_messages_.empty()) { | |
| 307 // After all the pending messages are processed, we will check whether an | |
| 308 // error has been encountered and run the user's connection error handler | |
| 309 // if necessary. | |
| 310 DCHECK(pending_task_for_messages_); | |
| 311 return; | |
| 312 } | |
| 313 | |
| 314 if (connector_.during_sync_handle_watcher_callback()) { | |
| 315 // We don't want the error handler to reenter an ongoing sync call. | |
| 316 connector_.task_runner()->PostTask( | |
| 317 FROM_HERE, | |
| 318 base::Bind(&Router::OnConnectionError, weak_factory_.GetWeakPtr())); | |
| 319 return; | |
| 320 } | |
| 321 | |
| 322 control_message_proxy_.OnConnectionError(); | |
| 323 | |
| 324 encountered_error_ = true; | |
| 325 | |
| 326 // Response callbacks may hold on to resource, and there's no need to keep | |
| 327 // them alive any longer. Note that it's allowed that a pending response | |
| 328 // callback may own this endpoint, so we simply move the responders onto the | |
| 329 // stack here and let them be destroyed when the stack unwinds. | |
| 330 AsyncResponderMap responders = std::move(async_responders_); | |
| 331 | |
| 332 if (!error_handler_.is_null()) { | |
| 333 error_handler_.Run(); | |
| 334 } else if (!error_with_reason_handler_.is_null()) { | |
| 335 // Make a copy on the stack. If we directly pass a reference to a member of | |
| 336 // |control_message_handler_|, that reference will be invalidated as soon as | |
| 337 // the user destroys the interface endpoint. | |
| 338 std::string description = control_message_handler_.disconnect_description(); | |
| 339 error_with_reason_handler_.Run( | |
| 340 control_message_handler_.disconnect_custom_reason(), description); | |
| 341 } | |
| 342 } | |
| 343 | |
| 344 // ---------------------------------------------------------------------------- | |
| 345 | |
| 346 } // namespace internal | |
| 347 } // namespace mojo | |
| OLD | NEW |