Index: mojo/public/cpp/bindings/lib/interface_endpoint_client.cc |
diff --git a/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc b/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc |
index 8094ad0728018cb46cc5216af9414953daa41969..88da186fb82b0df638a0619dd1735fe10fa252dc 100644 |
--- a/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc |
+++ b/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc |
@@ -15,6 +15,7 @@ |
#include "base/stl_util.h" |
#include "base/thread_task_runner_handle.h" |
#include "mojo/public/cpp/bindings/associated_group.h" |
+#include "mojo/public/cpp/bindings/lib/interface_endpoint_controller.h" |
#include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
namespace mojo { |
@@ -97,6 +98,13 @@ class ResponderThunk : public MessageReceiverWithStatus { |
}; |
} // namespace |
+// ---------------------------------------------------------------------------- |
Ken Rockot(use gerrit already)
2016/03/29 06:36:49
nit: vertical space above?
yzshen1
2016/03/29 16:19:01
Done.
|
+ |
+InterfaceEndpointClient::SyncResponseInfo::SyncResponseInfo( |
+ bool* in_response_received) |
+ : response_received(in_response_received) {} |
+ |
+InterfaceEndpointClient::SyncResponseInfo::~SyncResponseInfo() {} |
// ---------------------------------------------------------------------------- |
@@ -117,7 +125,8 @@ bool InterfaceEndpointClient::HandleIncomingMessageThunk::Accept( |
InterfaceEndpointClient::InterfaceEndpointClient( |
ScopedInterfaceEndpointHandle handle, |
MessageReceiverWithResponderStatus* receiver, |
- scoped_ptr<MessageFilter> payload_validator) |
+ scoped_ptr<MessageFilter> payload_validator, |
+ bool expect_sync_requests) |
: handle_(std::move(handle)), |
incoming_receiver_(receiver), |
payload_validator_(std::move(payload_validator)), |
@@ -132,14 +141,14 @@ InterfaceEndpointClient::InterfaceEndpointClient( |
// directly is a little awkward. |
payload_validator_->set_sink(&thunk_); |
- handle_.router()->AttachEndpointClient(handle_, this); |
+ controller_ = handle_.router()->AttachEndpointClient(handle_, this); |
+ if (expect_sync_requests) |
+ controller_->AllowWokenUpBySyncWatchOnSameThread(); |
} |
InterfaceEndpointClient::~InterfaceEndpointClient() { |
DCHECK(thread_checker_.CalledOnValidThread()); |
- STLDeleteValues(&responders_); |
- |
handle_.router()->DetachEndpointClient(handle_); |
} |
@@ -161,6 +170,7 @@ ScopedInterfaceEndpointHandle InterfaceEndpointClient::PassHandle() { |
if (!handle_.is_valid()) |
return ScopedInterfaceEndpointHandle(); |
+ controller_ = nullptr; |
handle_.router()->DetachEndpointClient(handle_); |
return std::move(handle_); |
@@ -174,26 +184,21 @@ void InterfaceEndpointClient::RaiseError() { |
bool InterfaceEndpointClient::Accept(Message* message) { |
DCHECK(thread_checker_.CalledOnValidThread()); |
+ DCHECK(controller_); |
DCHECK(!message->has_flag(kMessageExpectsResponse)); |
if (encountered_error_) |
return false; |
- return handle_.router()->SendMessage(handle_, message); |
+ return controller_->SendMessage(message); |
} |
bool InterfaceEndpointClient::AcceptWithResponder(Message* message, |
MessageReceiver* responder) { |
DCHECK(thread_checker_.CalledOnValidThread()); |
+ DCHECK(controller_); |
DCHECK(message->has_flag(kMessageExpectsResponse)); |
- // TODO(yzshen): Sync method call using assoicated interfaces or master |
- // interfaces that serve associated interfaces hasn't been supported yet. |
- if (message->has_flag(kMessageIsSync)) { |
- NOTIMPLEMENTED(); |
- return false; |
- } |
- |
if (encountered_error_) |
return false; |
@@ -204,11 +209,36 @@ bool InterfaceEndpointClient::AcceptWithResponder(Message* message, |
message->set_request_id(request_id); |
- if (!handle_.router()->SendMessage(handle_, message)) |
+ if (!controller_->SendMessage(message)) |
return false; |
- // We assume ownership of |responder|. |
- responders_[request_id] = responder; |
+ if (!message->has_flag(kMessageIsSync)) { |
+ // We assume ownership of |responder|. |
+ async_responders_[request_id] = make_scoped_ptr(responder); |
+ return true; |
+ } |
+ |
+ bool response_received = false; |
+ scoped_ptr<MessageReceiver> sync_responder(responder); |
+ sync_responses_.insert(std::make_pair( |
+ request_id, make_scoped_ptr(new SyncResponseInfo(&response_received)))); |
+ |
+ base::WeakPtr<InterfaceEndpointClient> weak_self = |
+ weak_ptr_factory_.GetWeakPtr(); |
+ controller_->SyncWatch(&response_received); |
+ // Make sure that this instance hasn't been destroyed. |
+ if (weak_self) { |
+ DCHECK(ContainsKey(sync_responses_, request_id)); |
+ auto iter = sync_responses_.find(request_id); |
+ DCHECK_EQ(&response_received, iter->second->response_received); |
+ if (response_received) { |
+ scoped_ptr<Message> response = std::move(iter->second->response); |
+ ignore_result(sync_responder->Accept(response.get())); |
+ } |
+ sync_responses_.erase(iter); |
+ } |
+ |
+ // Return true means that we take ownership of |responder|. |
return true; |
} |
@@ -242,14 +272,23 @@ bool InterfaceEndpointClient::HandleValidatedMessage(Message* message) { |
return ok; |
} else if (message->has_flag(kMessageIsResponse)) { |
uint64_t request_id = message->request_id(); |
- ResponderMap::iterator it = responders_.find(request_id); |
- if (it == responders_.end()) |
+ |
+ if (message->has_flag(kMessageIsSync)) { |
+ auto it = sync_responses_.find(request_id); |
+ if (it == sync_responses_.end()) |
+ return false; |
+ it->second->response.reset(new Message()); |
+ message->MoveTo(it->second->response.get()); |
+ *it->second->response_received = true; |
+ return true; |
+ } |
+ |
+ auto it = async_responders_.find(request_id); |
+ if (it == async_responders_.end()) |
return false; |
- MessageReceiver* responder = it->second; |
- responders_.erase(it); |
- bool ok = responder->Accept(message); |
- delete responder; |
- return ok; |
+ scoped_ptr<MessageReceiver> responder = std::move(it->second); |
+ async_responders_.erase(it); |
+ return responder->Accept(message); |
} else { |
if (!incoming_receiver_) |
return false; |