| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" | 5 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 43 : endpoint_client_(endpoint_client), | 43 : endpoint_client_(endpoint_client), |
| 44 accept_was_invoked_(false), | 44 accept_was_invoked_(false), |
| 45 task_runner_(std::move(runner)) {} | 45 task_runner_(std::move(runner)) {} |
| 46 ~ResponderThunk() override { | 46 ~ResponderThunk() override { |
| 47 if (!accept_was_invoked_) { | 47 if (!accept_was_invoked_) { |
| 48 // The Mojo application handled a message that was expecting a response | 48 // The Mojo application handled a message that was expecting a response |
| 49 // but did not send a response. | 49 // but did not send a response. |
| 50 // We raise an error to signal the calling application that an error | 50 // We raise an error to signal the calling application that an error |
| 51 // condition occurred. Without this the calling application would have no | 51 // condition occurred. Without this the calling application would have no |
| 52 // way of knowing it should stop waiting for a response. | 52 // way of knowing it should stop waiting for a response. |
| 53 Error error(Error::Type::REQUEST_CANCELED); |
| 53 if (task_runner_->RunsTasksOnCurrentThread()) { | 54 if (task_runner_->RunsTasksOnCurrentThread()) { |
| 54 // Please note that even if this code is run from a different task | 55 // Please note that even if this code is run from a different task |
| 55 // runner on the same thread as |task_runner_|, it is okay to directly | 56 // runner on the same thread as |task_runner_|, it is okay to directly |
| 56 // call InterfaceEndpointClient::RaiseError(), because it will raise | 57 // call InterfaceEndpointClient::RaiseError(), because it will raise |
| 57 // error from the correct task runner asynchronously. | 58 // error from the correct task runner asynchronously. |
| 58 if (endpoint_client_) { | 59 if (endpoint_client_) |
| 59 endpoint_client_->RaiseError(); | 60 endpoint_client_->RaiseError(std::move(error)); |
| 60 } | |
| 61 } else { | 61 } else { |
| 62 task_runner_->PostTask( | 62 task_runner_->PostTask( |
| 63 FROM_HERE, | 63 FROM_HERE, |
| 64 base::Bind(&InterfaceEndpointClient::RaiseError, endpoint_client_)); | 64 base::Bind(&InterfaceEndpointClient::RaiseError, endpoint_client_, |
| 65 base::Passed(&error))); |
| 65 } | 66 } |
| 66 } | 67 } |
| 67 } | 68 } |
| 68 | 69 |
| 69 // MessageReceiver implementation: | 70 // MessageReceiver implementation: |
| 70 bool Accept(Message* message) override { | 71 bool Accept(Message* message, Error* error) override { |
| 71 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | 72 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
| 72 accept_was_invoked_ = true; | 73 accept_was_invoked_ = true; |
| 73 DCHECK(message->has_flag(kMessageIsResponse)); | 74 DCHECK(message->has_flag(kMessageIsResponse)); |
| 74 | 75 |
| 75 bool result = false; | 76 if (endpoint_client_) |
| 77 return endpoint_client_->Accept(message, error); |
| 76 | 78 |
| 77 if (endpoint_client_) | 79 *error = Error(Error::Type::RESPONSE_DROPPED); |
| 78 result = endpoint_client_->Accept(message); | 80 return false; |
| 79 | |
| 80 return result; | |
| 81 } | 81 } |
| 82 | 82 |
| 83 // MessageReceiverWithStatus implementation: | 83 // MessageReceiverWithStatus implementation: |
| 84 bool IsValid() override { | 84 bool IsValid() override { |
| 85 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | 85 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
| 86 return endpoint_client_ && !endpoint_client_->encountered_error(); | 86 return endpoint_client_ && !endpoint_client_->encountered_error(); |
| 87 } | 87 } |
| 88 | 88 |
| 89 void DCheckInvalid(const std::string& message) override { | 89 void DCheckInvalid(const std::string& message) override { |
| 90 if (task_runner_->RunsTasksOnCurrentThread()) { | 90 if (task_runner_->RunsTasksOnCurrentThread()) { |
| (...skipping 25 matching lines...) Expand all Loading... |
| 116 // ---------------------------------------------------------------------------- | 116 // ---------------------------------------------------------------------------- |
| 117 | 117 |
| 118 InterfaceEndpointClient::HandleIncomingMessageThunk::HandleIncomingMessageThunk( | 118 InterfaceEndpointClient::HandleIncomingMessageThunk::HandleIncomingMessageThunk( |
| 119 InterfaceEndpointClient* owner) | 119 InterfaceEndpointClient* owner) |
| 120 : owner_(owner) {} | 120 : owner_(owner) {} |
| 121 | 121 |
| 122 InterfaceEndpointClient::HandleIncomingMessageThunk:: | 122 InterfaceEndpointClient::HandleIncomingMessageThunk:: |
| 123 ~HandleIncomingMessageThunk() {} | 123 ~HandleIncomingMessageThunk() {} |
| 124 | 124 |
| 125 bool InterfaceEndpointClient::HandleIncomingMessageThunk::Accept( | 125 bool InterfaceEndpointClient::HandleIncomingMessageThunk::Accept( |
| 126 Message* message) { | 126 Message* message, |
| 127 return owner_->HandleValidatedMessage(message); | 127 Error* error) { |
| 128 return owner_->HandleValidatedMessage(message, error); |
| 128 } | 129 } |
| 129 | 130 |
| 130 // ---------------------------------------------------------------------------- | 131 // ---------------------------------------------------------------------------- |
| 131 | 132 |
| 132 InterfaceEndpointClient::InterfaceEndpointClient( | 133 InterfaceEndpointClient::InterfaceEndpointClient( |
| 133 ScopedInterfaceEndpointHandle handle, | 134 ScopedInterfaceEndpointHandle handle, |
| 134 MessageReceiverWithResponderStatus* receiver, | 135 MessageReceiverWithResponderStatus* receiver, |
| 135 std::unique_ptr<MessageFilter> payload_validator, | 136 std::unique_ptr<MessageFilter> payload_validator, |
| 136 bool expect_sync_requests, | 137 bool expect_sync_requests, |
| 137 scoped_refptr<base::SingleThreadTaskRunner> runner) | 138 scoped_refptr<base::SingleThreadTaskRunner> runner) |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 179 | 180 |
| 180 if (!handle_.is_valid()) | 181 if (!handle_.is_valid()) |
| 181 return ScopedInterfaceEndpointHandle(); | 182 return ScopedInterfaceEndpointHandle(); |
| 182 | 183 |
| 183 controller_ = nullptr; | 184 controller_ = nullptr; |
| 184 handle_.router()->DetachEndpointClient(handle_); | 185 handle_.router()->DetachEndpointClient(handle_); |
| 185 | 186 |
| 186 return std::move(handle_); | 187 return std::move(handle_); |
| 187 } | 188 } |
| 188 | 189 |
| 189 void InterfaceEndpointClient::RaiseError() { | 190 void InterfaceEndpointClient::RaiseError(Error error) { |
| 190 DCHECK(thread_checker_.CalledOnValidThread()); | 191 DCHECK(thread_checker_.CalledOnValidThread()); |
| 191 | 192 |
| 192 handle_.router()->RaiseError(); | 193 handle_.router()->RaiseError(std::move(error)); |
| 193 } | 194 } |
| 194 | 195 |
| 195 bool InterfaceEndpointClient::Accept(Message* message) { | 196 bool InterfaceEndpointClient::Accept(Message* message, Error* error) { |
| 196 DCHECK(thread_checker_.CalledOnValidThread()); | 197 DCHECK(thread_checker_.CalledOnValidThread()); |
| 197 DCHECK(controller_); | 198 DCHECK(controller_); |
| 198 DCHECK(!message->has_flag(kMessageExpectsResponse)); | 199 DCHECK(!message->has_flag(kMessageExpectsResponse)); |
| 199 | 200 |
| 200 if (encountered_error_) | 201 if (encountered_error_ || !controller_->SendMessage(message)) { |
| 202 *error = Error(Error::Type::SEND_FAILED); |
| 201 return false; | 203 return false; |
| 204 } |
| 202 | 205 |
| 203 return controller_->SendMessage(message); | 206 return true; |
| 204 } | 207 } |
| 205 | 208 |
| 206 bool InterfaceEndpointClient::AcceptWithResponder(Message* message, | 209 bool InterfaceEndpointClient::AcceptWithResponder(Message* message, |
| 207 MessageReceiver* responder) { | 210 MessageReceiver* responder, |
| 211 Error* error) { |
| 208 DCHECK(thread_checker_.CalledOnValidThread()); | 212 DCHECK(thread_checker_.CalledOnValidThread()); |
| 209 DCHECK(controller_); | 213 DCHECK(controller_); |
| 210 DCHECK(message->has_flag(kMessageExpectsResponse)); | 214 DCHECK(message->has_flag(kMessageExpectsResponse)); |
| 211 | 215 |
| 212 if (encountered_error_) | 216 if (encountered_error_) { |
| 217 *error = Error(Error::Type::SEND_FAILED); |
| 213 return false; | 218 return false; |
| 219 } |
| 214 | 220 |
| 215 // Reserve 0 in case we want it to convey special meaning in the future. | 221 // Reserve 0 in case we want it to convey special meaning in the future. |
| 216 uint64_t request_id = next_request_id_++; | 222 uint64_t request_id = next_request_id_++; |
| 217 if (request_id == 0) | 223 if (request_id == 0) |
| 218 request_id = next_request_id_++; | 224 request_id = next_request_id_++; |
| 219 | 225 |
| 220 message->set_request_id(request_id); | 226 message->set_request_id(request_id); |
| 221 | 227 |
| 222 bool is_sync = message->has_flag(kMessageIsSync); | 228 bool is_sync = message->has_flag(kMessageIsSync); |
| 223 if (!controller_->SendMessage(message)) | 229 if (!controller_->SendMessage(message)) { |
| 230 *error = Error(Error::Type::SEND_FAILED); |
| 224 return false; | 231 return false; |
| 232 } |
| 225 | 233 |
| 226 if (!is_sync) { | 234 if (!is_sync) { |
| 227 // We assume ownership of |responder|. | 235 // We assume ownership of |responder|. |
| 228 async_responders_[request_id] = base::WrapUnique(responder); | 236 async_responders_[request_id] = base::WrapUnique(responder); |
| 229 return true; | 237 return true; |
| 230 } | 238 } |
| 231 | 239 |
| 232 bool response_received = false; | 240 bool response_received = false; |
| 233 std::unique_ptr<MessageReceiver> sync_responder(responder); | 241 std::unique_ptr<MessageReceiver> sync_responder(responder); |
| 234 sync_responses_.insert(std::make_pair( | 242 sync_responses_.insert(std::make_pair( |
| 235 request_id, base::WrapUnique(new SyncResponseInfo(&response_received)))); | 243 request_id, base::WrapUnique(new SyncResponseInfo(&response_received)))); |
| 236 | 244 |
| 237 base::WeakPtr<InterfaceEndpointClient> weak_self = | 245 base::WeakPtr<InterfaceEndpointClient> weak_self = |
| 238 weak_ptr_factory_.GetWeakPtr(); | 246 weak_ptr_factory_.GetWeakPtr(); |
| 239 controller_->SyncWatch(&response_received); | 247 controller_->SyncWatch(&response_received); |
| 240 // Make sure that this instance hasn't been destroyed. | 248 // Make sure that this instance hasn't been destroyed. |
| 241 if (weak_self) { | 249 if (weak_self) { |
| 242 DCHECK(ContainsKey(sync_responses_, request_id)); | 250 DCHECK(ContainsKey(sync_responses_, request_id)); |
| 243 auto iter = sync_responses_.find(request_id); | 251 auto iter = sync_responses_.find(request_id); |
| 244 DCHECK_EQ(&response_received, iter->second->response_received); | 252 DCHECK_EQ(&response_received, iter->second->response_received); |
| 245 if (response_received) { | 253 if (response_received) { |
| 254 Error send_response_error; |
| 246 std::unique_ptr<Message> response = std::move(iter->second->response); | 255 std::unique_ptr<Message> response = std::move(iter->second->response); |
| 247 ignore_result(sync_responder->Accept(response.get())); | 256 ignore_result(sync_responder->Accept(response.get(), |
| 257 &send_response_error)); |
| 248 } | 258 } |
| 249 sync_responses_.erase(iter); | 259 sync_responses_.erase(iter); |
| 250 } | 260 } |
| 251 | 261 |
| 252 // Return true means that we take ownership of |responder|. | 262 // Return true means that we take ownership of |responder|. |
| 253 return true; | 263 return true; |
| 254 } | 264 } |
| 255 | 265 |
| 256 bool InterfaceEndpointClient::HandleIncomingMessage(Message* message) { | 266 bool InterfaceEndpointClient::HandleIncomingMessage(Message* message, |
| 267 Error* error) { |
| 257 DCHECK(thread_checker_.CalledOnValidThread()); | 268 DCHECK(thread_checker_.CalledOnValidThread()); |
| 258 | 269 |
| 259 return payload_validator_->Accept(message); | 270 return payload_validator_->Accept(message, error); |
| 260 } | 271 } |
| 261 | 272 |
| 262 void InterfaceEndpointClient::NotifyError() { | 273 void InterfaceEndpointClient::NotifyError() { |
| 263 DCHECK(thread_checker_.CalledOnValidThread()); | 274 DCHECK(thread_checker_.CalledOnValidThread()); |
| 264 | 275 |
| 265 if (encountered_error_) | 276 if (encountered_error_) |
| 266 return; | 277 return; |
| 267 encountered_error_ = true; | 278 encountered_error_ = true; |
| 268 error_handler_.Run(); | 279 error_handler_.Run(); |
| 269 } | 280 } |
| 270 | 281 |
| 271 bool InterfaceEndpointClient::HandleValidatedMessage(Message* message) { | 282 bool InterfaceEndpointClient::HandleValidatedMessage(Message* message, |
| 283 Error* error) { |
| 272 DCHECK_EQ(handle_.id(), message->interface_id()); | 284 DCHECK_EQ(handle_.id(), message->interface_id()); |
| 273 | 285 |
| 274 if (message->has_flag(kMessageExpectsResponse)) { | 286 if (message->has_flag(kMessageExpectsResponse)) { |
| 275 if (!incoming_receiver_) | 287 if (!incoming_receiver_) { |
| 288 *error = Error(Error::Type::REQUEST_DROPPED); |
| 276 return false; | 289 return false; |
| 290 } |
| 277 | 291 |
| 278 MessageReceiverWithStatus* responder = | 292 MessageReceiverWithStatus* responder = |
| 279 new ResponderThunk(weak_ptr_factory_.GetWeakPtr(), task_runner_); | 293 new ResponderThunk(weak_ptr_factory_.GetWeakPtr(), task_runner_); |
| 280 bool ok = incoming_receiver_->AcceptWithResponder(message, responder); | 294 bool ok = incoming_receiver_->AcceptWithResponder(message, responder, |
| 295 error); |
| 281 if (!ok) | 296 if (!ok) |
| 282 delete responder; | 297 delete responder; |
| 283 return ok; | 298 return ok; |
| 284 } else if (message->has_flag(kMessageIsResponse)) { | 299 } else if (message->has_flag(kMessageIsResponse)) { |
| 285 uint64_t request_id = message->request_id(); | 300 uint64_t request_id = message->request_id(); |
| 286 | 301 |
| 287 if (message->has_flag(kMessageIsSync)) { | 302 if (message->has_flag(kMessageIsSync)) { |
| 288 auto it = sync_responses_.find(request_id); | 303 auto it = sync_responses_.find(request_id); |
| 289 if (it == sync_responses_.end()) | 304 if (it == sync_responses_.end()) { |
| 305 *error = Error::ForUnexpectedResponse(interface_name_, message); |
| 290 return false; | 306 return false; |
| 307 } |
| 291 it->second->response.reset(new Message()); | 308 it->second->response.reset(new Message()); |
| 292 message->MoveTo(it->second->response.get()); | 309 message->MoveTo(it->second->response.get()); |
| 293 *it->second->response_received = true; | 310 *it->second->response_received = true; |
| 294 return true; | 311 return true; |
| 295 } | 312 } |
| 296 | 313 |
| 297 auto it = async_responders_.find(request_id); | 314 auto it = async_responders_.find(request_id); |
| 298 if (it == async_responders_.end()) | 315 if (it == async_responders_.end()) { |
| 316 *error = Error::ForUnexpectedResponse(interface_name_, message); |
| 299 return false; | 317 return false; |
| 318 } |
| 300 std::unique_ptr<MessageReceiver> responder = std::move(it->second); | 319 std::unique_ptr<MessageReceiver> responder = std::move(it->second); |
| 301 async_responders_.erase(it); | 320 async_responders_.erase(it); |
| 302 return responder->Accept(message); | 321 return responder->Accept(message, error); |
| 303 } else { | 322 } else { |
| 304 if (!incoming_receiver_) | 323 if (!incoming_receiver_) { |
| 324 *error = Error(Error::Type::REQUEST_DROPPED); |
| 305 return false; | 325 return false; |
| 326 } |
| 306 | 327 |
| 307 return incoming_receiver_->Accept(message); | 328 return incoming_receiver_->Accept(message, error); |
| 308 } | 329 } |
| 309 } | 330 } |
| 310 | 331 |
| 311 } // namespace internal | 332 } // namespace internal |
| 312 } // namespace mojo | 333 } // namespace mojo |
| OLD | NEW |