| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/lib/router.h" | 5 #include "mojo/public/cpp/bindings/lib/router.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| (...skipping 23 matching lines...) Expand all Loading... |
| 34 : router_(router), | 34 : router_(router), |
| 35 accept_was_invoked_(false), | 35 accept_was_invoked_(false), |
| 36 task_runner_(std::move(runner)) {} | 36 task_runner_(std::move(runner)) {} |
| 37 ~ResponderThunk() override { | 37 ~ResponderThunk() override { |
| 38 if (!accept_was_invoked_) { | 38 if (!accept_was_invoked_) { |
| 39 // The Mojo application handled a message that was expecting a response | 39 // The Mojo application handled a message that was expecting a response |
| 40 // but did not send a response. | 40 // but did not send a response. |
| 41 // We raise an error to signal the calling application that an error | 41 // We raise an error to signal the calling application that an error |
| 42 // condition occurred. Without this the calling application would have no | 42 // condition occurred. Without this the calling application would have no |
| 43 // way of knowing it should stop waiting for a response. | 43 // way of knowing it should stop waiting for a response. |
| 44 Error error(Error::Type::REQUEST_CANCELED); |
| 44 if (task_runner_->RunsTasksOnCurrentThread()) { | 45 if (task_runner_->RunsTasksOnCurrentThread()) { |
| 45 // Please note that even if this code is run from a different task | 46 // Please note that even if this code is run from a different task |
| 46 // runner on the same thread as |task_runner_|, it is okay to directly | 47 // runner on the same thread as |task_runner_|, it is okay to directly |
| 47 // call Router::RaiseError(), because it will raise error from the | 48 // call Router::RaiseError(), because it will raise error from the |
| 48 // correct task runner asynchronously. | 49 // correct task runner asynchronously. |
| 49 if (router_) | 50 if (router_) |
| 50 router_->RaiseError(); | 51 router_->RaiseError(std::move(error)); |
| 51 } else { | 52 } else { |
| 52 task_runner_->PostTask(FROM_HERE, | 53 task_runner_->PostTask(FROM_HERE, |
| 53 base::Bind(&Router::RaiseError, router_)); | 54 base::Bind(&Router::RaiseError, router_, |
| 55 base::Passed(&error))); |
| 54 } | 56 } |
| 55 } | 57 } |
| 56 } | 58 } |
| 57 | 59 |
| 58 // MessageReceiver implementation: | 60 // MessageReceiver implementation: |
| 59 bool Accept(Message* message) override { | 61 bool Accept(Message* message, Error* error) override { |
| 60 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | 62 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
| 61 accept_was_invoked_ = true; | 63 accept_was_invoked_ = true; |
| 62 DCHECK(message->has_flag(kMessageIsResponse)); | 64 DCHECK(message->has_flag(kMessageIsResponse)); |
| 63 | 65 |
| 64 bool result = false; | 66 if (router_) |
| 67 return router_->Accept(message, error); |
| 65 | 68 |
| 66 if (router_) | 69 *error = Error(Error::Type::RESPONSE_DROPPED); |
| 67 result = router_->Accept(message); | 70 return false; |
| 68 | |
| 69 return result; | |
| 70 } | 71 } |
| 71 | 72 |
| 72 // MessageReceiverWithStatus implementation: | 73 // MessageReceiverWithStatus implementation: |
| 73 bool IsValid() override { | 74 bool IsValid() override { |
| 74 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | 75 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
| 75 return router_ && !router_->encountered_error() && router_->is_valid(); | 76 return router_ && !router_->encountered_error() && router_->is_valid(); |
| 76 } | 77 } |
| 77 | 78 |
| 78 void DCheckInvalid(const std::string& message) override { | 79 void DCheckInvalid(const std::string& message) override { |
| 79 if (task_runner_->RunsTasksOnCurrentThread()) { | 80 if (task_runner_->RunsTasksOnCurrentThread()) { |
| (...skipping 21 matching lines...) Expand all Loading... |
| 101 | 102 |
| 102 // ---------------------------------------------------------------------------- | 103 // ---------------------------------------------------------------------------- |
| 103 | 104 |
| 104 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) | 105 Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) |
| 105 : router_(router) { | 106 : router_(router) { |
| 106 } | 107 } |
| 107 | 108 |
| 108 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() { | 109 Router::HandleIncomingMessageThunk::~HandleIncomingMessageThunk() { |
| 109 } | 110 } |
| 110 | 111 |
| 111 bool Router::HandleIncomingMessageThunk::Accept(Message* message) { | 112 bool Router::HandleIncomingMessageThunk::Accept(Message* message, |
| 112 return router_->HandleIncomingMessage(message); | 113 Error* error) { |
| 114 return router_->HandleIncomingMessage(message, error); |
| 113 } | 115 } |
| 114 | 116 |
| 115 // ---------------------------------------------------------------------------- | 117 // ---------------------------------------------------------------------------- |
| 116 | 118 |
| 117 Router::Router(ScopedMessagePipeHandle message_pipe, | 119 Router::Router(ScopedMessagePipeHandle message_pipe, |
| 118 FilterChain filters, | 120 FilterChain filters, |
| 119 bool expects_sync_requests, | 121 bool expects_sync_requests, |
| 120 scoped_refptr<base::SingleThreadTaskRunner> runner) | 122 scoped_refptr<base::SingleThreadTaskRunner> runner) |
| 121 : thunk_(this), | 123 : thunk_(this), |
| 122 filters_(std::move(filters)), | 124 filters_(std::move(filters)), |
| 123 connector_(std::move(message_pipe), | 125 connector_(std::move(message_pipe), |
| 124 Connector::SINGLE_THREADED_SEND, | 126 Connector::SINGLE_THREADED_SEND, |
| 125 std::move(runner)), | 127 std::move(runner)), |
| 126 incoming_receiver_(nullptr), | 128 incoming_receiver_(nullptr), |
| 127 next_request_id_(0), | 129 next_request_id_(0), |
| 128 testing_mode_(false), | 130 testing_mode_(false), |
| 129 pending_task_for_messages_(false), | 131 pending_task_for_messages_(false), |
| 130 encountered_error_(false), | 132 encountered_error_(false), |
| 131 weak_factory_(this) { | 133 weak_factory_(this) { |
| 132 filters_.SetSink(&thunk_); | 134 filters_.SetSink(&thunk_); |
| 133 if (expects_sync_requests) | 135 if (expects_sync_requests) |
| 134 connector_.AllowWokenUpBySyncWatchOnSameThread(); | 136 connector_.AllowWokenUpBySyncWatchOnSameThread(); |
| 135 connector_.set_incoming_receiver(filters_.GetHead()); | 137 connector_.set_incoming_receiver(filters_.GetHead()); |
| 136 connector_.set_connection_error_handler([this]() { OnConnectionError(); }); | 138 connector_.set_connection_error_handler([this]() { OnConnectionError(); }); |
| 137 } | 139 } |
| 138 | 140 |
| 139 Router::~Router() {} | 141 Router::~Router() {} |
| 140 | 142 |
| 141 bool Router::Accept(Message* message) { | 143 bool Router::Accept(Message* message, Error* error) { |
| 142 DCHECK(thread_checker_.CalledOnValidThread()); | 144 DCHECK(thread_checker_.CalledOnValidThread()); |
| 143 DCHECK(!message->has_flag(kMessageExpectsResponse)); | 145 DCHECK(!message->has_flag(kMessageExpectsResponse)); |
| 144 return connector_.Accept(message); | 146 return connector_.Accept(message, error); |
| 145 } | 147 } |
| 146 | 148 |
| 147 bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { | 149 bool Router::AcceptWithResponder(Message* message, |
| 150 MessageReceiver* responder, |
| 151 Error* error) { |
| 148 DCHECK(thread_checker_.CalledOnValidThread()); | 152 DCHECK(thread_checker_.CalledOnValidThread()); |
| 149 DCHECK(message->has_flag(kMessageExpectsResponse)); | 153 DCHECK(message->has_flag(kMessageExpectsResponse)); |
| 150 | 154 |
| 151 // Reserve 0 in case we want it to convey special meaning in the future. | 155 // Reserve 0 in case we want it to convey special meaning in the future. |
| 152 uint64_t request_id = next_request_id_++; | 156 uint64_t request_id = next_request_id_++; |
| 153 if (request_id == 0) | 157 if (request_id == 0) |
| 154 request_id = next_request_id_++; | 158 request_id = next_request_id_++; |
| 155 | 159 |
| 156 bool is_sync = message->has_flag(kMessageIsSync); | 160 bool is_sync = message->has_flag(kMessageIsSync); |
| 157 message->set_request_id(request_id); | 161 message->set_request_id(request_id); |
| 158 if (!connector_.Accept(message)) | 162 if (!connector_.Accept(message, error)) |
| 159 return false; | 163 return false; |
| 160 | 164 |
| 161 if (!is_sync) { | 165 if (!is_sync) { |
| 162 // We assume ownership of |responder|. | 166 // We assume ownership of |responder|. |
| 163 async_responders_[request_id] = base::WrapUnique(responder); | 167 async_responders_[request_id] = base::WrapUnique(responder); |
| 164 return true; | 168 return true; |
| 165 } | 169 } |
| 166 | 170 |
| 167 bool response_received = false; | 171 bool response_received = false; |
| 168 std::unique_ptr<MessageReceiver> sync_responder(responder); | 172 std::unique_ptr<MessageReceiver> sync_responder(responder); |
| 169 sync_responses_.insert(std::make_pair( | 173 sync_responses_.insert(std::make_pair( |
| 170 request_id, base::WrapUnique(new SyncResponseInfo(&response_received)))); | 174 request_id, base::WrapUnique(new SyncResponseInfo(&response_received)))); |
| 171 | 175 |
| 172 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); | 176 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); |
| 173 connector_.SyncWatch(&response_received); | 177 connector_.SyncWatch(&response_received); |
| 174 // Make sure that this instance hasn't been destroyed. | 178 // Make sure that this instance hasn't been destroyed. |
| 175 if (weak_self) { | 179 if (weak_self) { |
| 176 DCHECK(ContainsKey(sync_responses_, request_id)); | 180 DCHECK(ContainsKey(sync_responses_, request_id)); |
| 177 auto iter = sync_responses_.find(request_id); | 181 auto iter = sync_responses_.find(request_id); |
| 178 DCHECK_EQ(&response_received, iter->second->response_received); | 182 DCHECK_EQ(&response_received, iter->second->response_received); |
| 179 if (response_received) { | 183 if (response_received) { |
| 180 std::unique_ptr<Message> response = std::move(iter->second->response); | 184 std::unique_ptr<Message> response = std::move(iter->second->response); |
| 181 ignore_result(sync_responder->Accept(response.get())); | 185 |
| 186 Error send_response_error; |
| 187 ignore_result(sync_responder->Accept(response.get(), |
| 188 &send_response_error)); |
| 182 } | 189 } |
| 183 sync_responses_.erase(iter); | 190 sync_responses_.erase(iter); |
| 184 } | 191 } |
| 185 | 192 |
| 186 // Return true means that we take ownership of |responder|. | 193 // Return true means that we take ownership of |responder|. |
| 187 return true; | 194 return true; |
| 188 } | 195 } |
| 189 | 196 |
| 190 void Router::EnableTestingMode() { | 197 void Router::EnableTestingMode() { |
| 191 DCHECK(thread_checker_.CalledOnValidThread()); | 198 DCHECK(thread_checker_.CalledOnValidThread()); |
| 192 testing_mode_ = true; | 199 testing_mode_ = true; |
| 193 connector_.set_enforce_errors_from_incoming_receiver(false); | 200 connector_.set_enforce_errors_from_incoming_receiver(false); |
| 194 } | 201 } |
| 195 | 202 |
| 196 bool Router::HandleIncomingMessage(Message* message) { | 203 bool Router::HandleIncomingMessage(Message* message, Error* error) { |
| 197 DCHECK(thread_checker_.CalledOnValidThread()); | 204 DCHECK(thread_checker_.CalledOnValidThread()); |
| 198 | 205 |
| 199 const bool during_sync_call = | 206 const bool during_sync_call = |
| 200 connector_.during_sync_handle_watcher_callback(); | 207 connector_.during_sync_handle_watcher_callback(); |
| 201 if (!message->has_flag(kMessageIsSync) && | 208 if (!message->has_flag(kMessageIsSync) && |
| 202 (during_sync_call || !pending_messages_.empty())) { | 209 (during_sync_call || !pending_messages_.empty())) { |
| 203 std::unique_ptr<Message> pending_message(new Message); | 210 std::unique_ptr<Message> pending_message(new Message); |
| 204 message->MoveTo(pending_message.get()); | 211 message->MoveTo(pending_message.get()); |
| 205 pending_messages_.push(std::move(pending_message)); | 212 pending_messages_.push(std::move(pending_message)); |
| 206 | 213 |
| 207 if (!pending_task_for_messages_) { | 214 if (!pending_task_for_messages_) { |
| 208 pending_task_for_messages_ = true; | 215 pending_task_for_messages_ = true; |
| 209 connector_.task_runner()->PostTask( | 216 connector_.task_runner()->PostTask( |
| 210 FROM_HERE, base::Bind(&Router::HandleQueuedMessages, | 217 FROM_HERE, base::Bind(&Router::HandleQueuedMessages, |
| 211 weak_factory_.GetWeakPtr())); | 218 weak_factory_.GetWeakPtr())); |
| 212 } | 219 } |
| 213 | 220 |
| 214 return true; | 221 return true; |
| 215 } | 222 } |
| 216 | 223 |
| 217 return HandleMessageInternal(message); | 224 return HandleMessageInternal(message, error); |
| 218 } | 225 } |
| 219 | 226 |
| 220 void Router::HandleQueuedMessages() { | 227 void Router::HandleQueuedMessages() { |
| 221 DCHECK(thread_checker_.CalledOnValidThread()); | 228 DCHECK(thread_checker_.CalledOnValidThread()); |
| 222 DCHECK(pending_task_for_messages_); | 229 DCHECK(pending_task_for_messages_); |
| 223 | 230 |
| 224 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); | 231 base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); |
| 225 while (!pending_messages_.empty()) { | 232 while (!pending_messages_.empty()) { |
| 226 std::unique_ptr<Message> message(std::move(pending_messages_.front())); | 233 std::unique_ptr<Message> message(std::move(pending_messages_.front())); |
| 227 pending_messages_.pop(); | 234 pending_messages_.pop(); |
| 228 | 235 |
| 229 bool result = HandleMessageInternal(message.get()); | 236 Error error; |
| 237 bool result = HandleMessageInternal(message.get(), &error); |
| 230 if (!weak_self) | 238 if (!weak_self) |
| 231 return; | 239 return; |
| 232 | 240 |
| 233 if (!result && !testing_mode_) { | 241 if (!result && !testing_mode_) { |
| 234 connector_.RaiseError(); | 242 connector_.RaiseError(std::move(error)); |
| 235 break; | 243 break; |
| 236 } | 244 } |
| 237 } | 245 } |
| 238 | 246 |
| 239 pending_task_for_messages_ = false; | 247 pending_task_for_messages_ = false; |
| 240 | 248 |
| 241 // We may have already seen a connection error from the connector, but | 249 // We may have already seen a connection error from the connector, but |
| 242 // haven't notified the user because we want to process all the queued | 250 // haven't notified the user because we want to process all the queued |
| 243 // messages first. We should do it now. | 251 // messages first. We should do it now. |
| 244 if (connector_.encountered_error() && !encountered_error_) | 252 if (connector_.encountered_error() && !encountered_error_) |
| 245 OnConnectionError(); | 253 OnConnectionError(); |
| 246 } | 254 } |
| 247 | 255 |
| 248 bool Router::HandleMessageInternal(Message* message) { | 256 bool Router::HandleMessageInternal(Message* message, Error* error) { |
| 249 if (message->has_flag(kMessageExpectsResponse)) { | 257 if (message->has_flag(kMessageExpectsResponse)) { |
| 250 if (!incoming_receiver_) | 258 if (!incoming_receiver_) { |
| 259 *error = Error(Error::Type::REQUEST_DROPPED); |
| 251 return false; | 260 return false; |
| 261 } |
| 252 | 262 |
| 253 MessageReceiverWithStatus* responder = new ResponderThunk( | 263 MessageReceiverWithStatus* responder = new ResponderThunk( |
| 254 weak_factory_.GetWeakPtr(), connector_.task_runner()); | 264 weak_factory_.GetWeakPtr(), connector_.task_runner()); |
| 255 bool ok = incoming_receiver_->AcceptWithResponder(message, responder); | 265 bool ok = incoming_receiver_->AcceptWithResponder(message, responder, |
| 266 error); |
| 256 if (!ok) | 267 if (!ok) |
| 257 delete responder; | 268 delete responder; |
| 258 return ok; | 269 return ok; |
| 259 | 270 |
| 260 } else if (message->has_flag(kMessageIsResponse)) { | 271 } else if (message->has_flag(kMessageIsResponse)) { |
| 261 uint64_t request_id = message->request_id(); | 272 uint64_t request_id = message->request_id(); |
| 262 | 273 |
| 263 if (message->has_flag(kMessageIsSync)) { | 274 if (message->has_flag(kMessageIsSync)) { |
| 264 auto it = sync_responses_.find(request_id); | 275 auto it = sync_responses_.find(request_id); |
| 265 if (it == sync_responses_.end()) { | 276 if (it == sync_responses_.end()) { |
| 266 DCHECK(testing_mode_); | 277 DCHECK(testing_mode_); |
| 278 *error = Error::ForUnexpectedResponse(interface_name_, message); |
| 267 return false; | 279 return false; |
| 268 } | 280 } |
| 269 it->second->response.reset(new Message()); | 281 it->second->response.reset(new Message()); |
| 270 message->MoveTo(it->second->response.get()); | 282 message->MoveTo(it->second->response.get()); |
| 271 *it->second->response_received = true; | 283 *it->second->response_received = true; |
| 272 return true; | 284 return true; |
| 273 } | 285 } |
| 274 | 286 |
| 275 auto it = async_responders_.find(request_id); | 287 auto it = async_responders_.find(request_id); |
| 276 if (it == async_responders_.end()) { | 288 if (it == async_responders_.end()) { |
| 277 DCHECK(testing_mode_); | 289 DCHECK(testing_mode_); |
| 290 *error = Error::ForUnexpectedResponse(interface_name_, message); |
| 278 return false; | 291 return false; |
| 279 } | 292 } |
| 280 std::unique_ptr<MessageReceiver> responder = std::move(it->second); | 293 std::unique_ptr<MessageReceiver> responder = std::move(it->second); |
| 281 async_responders_.erase(it); | 294 async_responders_.erase(it); |
| 282 return responder->Accept(message); | 295 return responder->Accept(message, error); |
| 283 } else { | 296 } else { |
| 284 if (!incoming_receiver_) | 297 if (!incoming_receiver_) { |
| 298 *error = Error(Error::Type::REQUEST_DROPPED); |
| 285 return false; | 299 return false; |
| 300 } |
| 286 | 301 |
| 287 return incoming_receiver_->Accept(message); | 302 return incoming_receiver_->Accept(message, error); |
| 288 } | 303 } |
| 289 } | 304 } |
| 290 | 305 |
| 291 void Router::OnConnectionError() { | 306 void Router::OnConnectionError() { |
| 292 if (encountered_error_) | 307 if (encountered_error_) |
| 293 return; | 308 return; |
| 294 | 309 |
| 295 if (!pending_messages_.empty()) { | 310 if (!pending_messages_.empty()) { |
| 296 // After all the pending messages are processed, we will check whether an | 311 // After all the pending messages are processed, we will check whether an |
| 297 // error has been encountered and run the user's connection error handler | 312 // error has been encountered and run the user's connection error handler |
| (...skipping 11 matching lines...) Expand all Loading... |
| 309 } | 324 } |
| 310 | 325 |
| 311 encountered_error_ = true; | 326 encountered_error_ = true; |
| 312 error_handler_.Run(); | 327 error_handler_.Run(); |
| 313 } | 328 } |
| 314 | 329 |
| 315 // ---------------------------------------------------------------------------- | 330 // ---------------------------------------------------------------------------- |
| 316 | 331 |
| 317 } // namespace internal | 332 } // namespace internal |
| 318 } // namespace mojo | 333 } // namespace mojo |
| OLD | NEW |