Index: mojo/public/cpp/bindings/lib/router.cc |
diff --git a/mojo/public/cpp/bindings/lib/router.cc b/mojo/public/cpp/bindings/lib/router.cc |
index 44244aa86324a07b1bebefe3969ce498dd3b53e2..9bc9aa16ec425a7f55be34284b774bbd056b208b 100644 |
--- a/mojo/public/cpp/bindings/lib/router.cc |
+++ b/mojo/public/cpp/bindings/lib/router.cc |
@@ -18,18 +18,17 @@ namespace { |
class ResponderThunk : public MessageReceiverWithStatus { |
public: |
- explicit ResponderThunk(const SharedData<Router*>& router) |
+ explicit ResponderThunk(const base::WeakPtr<Router>& router) |
: router_(router), accept_was_invoked_(false) {} |
~ResponderThunk() override { |
if (!accept_was_invoked_) { |
// The Mojo application handled a message that was expecting a response |
// but did not send a response. |
- Router* router = router_.value(); |
- if (router) { |
+ if (router_) { |
// We raise an error to signal the calling application that an error |
// condition occurred. Without this the calling application would have |
// no way of knowing it should stop waiting for a response. |
- router->RaiseError(); |
+ router_->RaiseError(); |
} |
} |
} |
@@ -41,21 +40,19 @@ class ResponderThunk : public MessageReceiverWithStatus { |
bool result = false; |
- Router* router = router_.value(); |
- if (router) |
- result = router->Accept(message); |
+ if (router_) |
+ result = router_->Accept(message); |
return result; |
} |
// MessageReceiverWithStatus implementation: |
bool IsValid() override { |
- Router* router = router_.value(); |
- return router && !router->encountered_error() && router->is_valid(); |
+ return router_ && !router_->encountered_error() && router_->is_valid(); |
} |
private: |
- SharedData<Router*> router_; |
+ base::WeakPtr<Router> router_; |
bool accept_was_invoked_; |
}; |
@@ -84,18 +81,20 @@ Router::Router(ScopedMessagePipeHandle message_pipe, |
connector_(std::move(message_pipe), |
Connector::SINGLE_THREADED_SEND, |
waiter), |
- weak_self_(this), |
incoming_receiver_(nullptr), |
next_request_id_(0), |
- testing_mode_(false) { |
+ testing_mode_(false), |
+ weak_factory_(this) { |
filters_.SetSink(&thunk_); |
connector_.set_incoming_receiver(filters_.GetHead()); |
} |
Router::~Router() { |
- weak_self_.set_value(nullptr); |
+ weak_factory_.InvalidateWeakPtrs(); |
- for (auto& pair : responders_) |
+ for (auto& pair : async_responders_) |
+ delete pair.second; |
+ for (auto& pair : sync_responders_) |
delete pair.second; |
} |
@@ -118,8 +117,32 @@ bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { |
if (!connector_.Accept(message)) |
return false; |
- // We assume ownership of |responder|. |
- responders_[request_id] = responder; |
+ if (!message->has_flag(kMessageIsSync)) { |
+ // We assume ownership of |responder|. |
+ async_responders_[request_id] = responder; |
+ return true; |
+ } |
+ |
+ sync_responders_[request_id] = responder; |
+ base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); |
+ for (;;) { |
+ // TODO(yzshen): Here we should allow incoming sync requests to re-enter and |
+ // block async messages. |
+ bool result = WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); |
+ // The message pipe has disconnected. |
+ if (!result) |
+ break; |
+ |
+ // This instance has been destroyed. |
+ if (!weak_self) |
+ break; |
+ |
+ // The corresponding response message has arrived. |
+ if (sync_responders_.find(request_id) == sync_responders_.end()) |
+ break; |
+ } |
+ |
+ // Return true means that we take ownership of |responder|. |
return true; |
} |
@@ -135,21 +158,25 @@ bool Router::HandleIncomingMessage(Message* message) { |
if (!incoming_receiver_) |
return false; |
- MessageReceiverWithStatus* responder = new ResponderThunk(weak_self_); |
+ MessageReceiverWithStatus* responder = |
+ new ResponderThunk(weak_factory_.GetWeakPtr()); |
bool ok = incoming_receiver_->AcceptWithResponder(message, responder); |
if (!ok) |
delete responder; |
return ok; |
} else if (message->has_flag(kMessageIsResponse)) { |
+ ResponderMap& responder_map = message->has_flag(kMessageIsSync) |
+ ? sync_responders_ |
+ : async_responders_; |
uint64_t request_id = message->request_id(); |
- ResponderMap::iterator it = responders_.find(request_id); |
- if (it == responders_.end()) { |
+ ResponderMap::iterator it = responder_map.find(request_id); |
+ if (it == responder_map.end()) { |
DCHECK(testing_mode_); |
return false; |
} |
MessageReceiver* responder = it->second; |
- responders_.erase(it); |
+ responder_map.erase(it); |
bool ok = responder->Accept(message); |
delete responder; |
return ok; |