| Index: mojo/public/cpp/bindings/lib/router.cc
|
| diff --git a/mojo/public/cpp/bindings/lib/router.cc b/mojo/public/cpp/bindings/lib/router.cc
|
| index 44244aa86324a07b1bebefe3969ce498dd3b53e2..9bc9aa16ec425a7f55be34284b774bbd056b208b 100644
|
| --- a/mojo/public/cpp/bindings/lib/router.cc
|
| +++ b/mojo/public/cpp/bindings/lib/router.cc
|
| @@ -18,18 +18,17 @@ namespace {
|
|
|
| class ResponderThunk : public MessageReceiverWithStatus {
|
| public:
|
| - explicit ResponderThunk(const SharedData<Router*>& router)
|
| + explicit ResponderThunk(const base::WeakPtr<Router>& router)
|
| : router_(router), accept_was_invoked_(false) {}
|
| ~ResponderThunk() override {
|
| if (!accept_was_invoked_) {
|
| // The Mojo application handled a message that was expecting a response
|
| // but did not send a response.
|
| - Router* router = router_.value();
|
| - if (router) {
|
| + if (router_) {
|
| // We raise an error to signal the calling application that an error
|
| // condition occurred. Without this the calling application would have
|
| // no way of knowing it should stop waiting for a response.
|
| - router->RaiseError();
|
| + router_->RaiseError();
|
| }
|
| }
|
| }
|
| @@ -41,21 +40,19 @@ class ResponderThunk : public MessageReceiverWithStatus {
|
|
|
| bool result = false;
|
|
|
| - Router* router = router_.value();
|
| - if (router)
|
| - result = router->Accept(message);
|
| + if (router_)
|
| + result = router_->Accept(message);
|
|
|
| return result;
|
| }
|
|
|
| // MessageReceiverWithStatus implementation:
|
| bool IsValid() override {
|
| - Router* router = router_.value();
|
| - return router && !router->encountered_error() && router->is_valid();
|
| + return router_ && !router_->encountered_error() && router_->is_valid();
|
| }
|
|
|
| private:
|
| - SharedData<Router*> router_;
|
| + base::WeakPtr<Router> router_;
|
| bool accept_was_invoked_;
|
| };
|
|
|
| @@ -84,18 +81,20 @@ Router::Router(ScopedMessagePipeHandle message_pipe,
|
| connector_(std::move(message_pipe),
|
| Connector::SINGLE_THREADED_SEND,
|
| waiter),
|
| - weak_self_(this),
|
| incoming_receiver_(nullptr),
|
| next_request_id_(0),
|
| - testing_mode_(false) {
|
| + testing_mode_(false),
|
| + weak_factory_(this) {
|
| filters_.SetSink(&thunk_);
|
| connector_.set_incoming_receiver(filters_.GetHead());
|
| }
|
|
|
| Router::~Router() {
|
| - weak_self_.set_value(nullptr);
|
| + weak_factory_.InvalidateWeakPtrs();
|
|
|
| - for (auto& pair : responders_)
|
| + for (auto& pair : async_responders_)
|
| + delete pair.second;
|
| + for (auto& pair : sync_responders_)
|
| delete pair.second;
|
| }
|
|
|
| @@ -118,8 +117,32 @@ bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) {
|
| if (!connector_.Accept(message))
|
| return false;
|
|
|
| - // We assume ownership of |responder|.
|
| - responders_[request_id] = responder;
|
| + if (!message->has_flag(kMessageIsSync)) {
|
| + // We assume ownership of |responder|.
|
| + async_responders_[request_id] = responder;
|
| + return true;
|
| + }
|
| +
|
| + sync_responders_[request_id] = responder;
|
| + base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
|
| + for (;;) {
|
| + // TODO(yzshen): Here we should allow incoming sync requests to re-enter and
|
| + // block async messages.
|
| + bool result = WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
|
| + // The message pipe has disconnected.
|
| + if (!result)
|
| + break;
|
| +
|
| + // This instance has been destroyed.
|
| + if (!weak_self)
|
| + break;
|
| +
|
| + // The corresponding response message has arrived.
|
| + if (sync_responders_.find(request_id) == sync_responders_.end())
|
| + break;
|
| + }
|
| +
|
| + // Return true means that we take ownership of |responder|.
|
| return true;
|
| }
|
|
|
| @@ -135,21 +158,25 @@ bool Router::HandleIncomingMessage(Message* message) {
|
| if (!incoming_receiver_)
|
| return false;
|
|
|
| - MessageReceiverWithStatus* responder = new ResponderThunk(weak_self_);
|
| + MessageReceiverWithStatus* responder =
|
| + new ResponderThunk(weak_factory_.GetWeakPtr());
|
| bool ok = incoming_receiver_->AcceptWithResponder(message, responder);
|
| if (!ok)
|
| delete responder;
|
| return ok;
|
|
|
| } else if (message->has_flag(kMessageIsResponse)) {
|
| + ResponderMap& responder_map = message->has_flag(kMessageIsSync)
|
| + ? sync_responders_
|
| + : async_responders_;
|
| uint64_t request_id = message->request_id();
|
| - ResponderMap::iterator it = responders_.find(request_id);
|
| - if (it == responders_.end()) {
|
| + ResponderMap::iterator it = responder_map.find(request_id);
|
| + if (it == responder_map.end()) {
|
| DCHECK(testing_mode_);
|
| return false;
|
| }
|
| MessageReceiver* responder = it->second;
|
| - responders_.erase(it);
|
| + responder_map.erase(it);
|
| bool ok = responder->Accept(message);
|
| delete responder;
|
| return ok;
|
|
|