| Index: mojo/public/cpp/bindings/lib/router.cc
|
| diff --git a/mojo/public/cpp/bindings/lib/router.cc b/mojo/public/cpp/bindings/lib/router.cc
|
| index a78cdc706b168357b317d409234cd185f888fa12..9bc9aa16ec425a7f55be34284b774bbd056b208b 100644
|
| --- a/mojo/public/cpp/bindings/lib/router.cc
|
| +++ b/mojo/public/cpp/bindings/lib/router.cc
|
| @@ -7,10 +7,7 @@
|
| #include <stdint.h>
|
| #include <utility>
|
|
|
| -#include "base/bind.h"
|
| #include "base/logging.h"
|
| -#include "base/message_loop/message_loop.h"
|
| -#include "base/stl_util.h"
|
|
|
| namespace mojo {
|
| namespace internal {
|
| @@ -63,13 +60,6 @@
|
|
|
| // ----------------------------------------------------------------------------
|
|
|
| -Router::SyncResponseInfo::SyncResponseInfo(bool* in_response_received)
|
| - : response_received(in_response_received) {}
|
| -
|
| -Router::SyncResponseInfo::~SyncResponseInfo() {}
|
| -
|
| -// ----------------------------------------------------------------------------
|
| -
|
| Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router)
|
| : router_(router) {
|
| }
|
| @@ -85,7 +75,6 @@
|
|
|
| Router::Router(ScopedMessagePipeHandle message_pipe,
|
| FilterChain filters,
|
| - bool expects_sync_requests,
|
| const MojoAsyncWaiter* waiter)
|
| : thunk_(this),
|
| filters_(std::move(filters)),
|
| @@ -95,15 +84,19 @@
|
| incoming_receiver_(nullptr),
|
| next_request_id_(0),
|
| testing_mode_(false),
|
| - pending_task_for_messages_(false),
|
| weak_factory_(this) {
|
| filters_.SetSink(&thunk_);
|
| - if (expects_sync_requests)
|
| - connector_.RegisterSyncHandleWatch();
|
| connector_.set_incoming_receiver(filters_.GetHead());
|
| }
|
|
|
| -Router::~Router() {}
|
| +Router::~Router() {
|
| + weak_factory_.InvalidateWeakPtrs();
|
| +
|
| + for (auto& pair : async_responders_)
|
| + delete pair.second;
|
| + for (auto& pair : sync_responders_)
|
| + delete pair.second;
|
| +}
|
|
|
| bool Router::Accept(Message* message) {
|
| DCHECK(thread_checker_.CalledOnValidThread());
|
| @@ -126,21 +119,17 @@
|
|
|
| if (!message->has_flag(kMessageIsSync)) {
|
| // We assume ownership of |responder|.
|
| - async_responders_[request_id] = make_scoped_ptr(responder);
|
| + async_responders_[request_id] = responder;
|
| return true;
|
| }
|
|
|
| - if (!connector_.RegisterSyncHandleWatch())
|
| - return false;
|
| -
|
| - bool response_received = false;
|
| - scoped_ptr<MessageReceiver> sync_responder(responder);
|
| - sync_responses_.insert(std::make_pair(
|
| - request_id, make_scoped_ptr(new SyncResponseInfo(&response_received))));
|
| -
|
| + sync_responders_[request_id] = responder;
|
| base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
|
| - do {
|
| - bool result = connector_.RunSyncHandleWatch(&response_received);
|
| + for (;;) {
|
| + // TODO(yzshen): Here we should allow incoming sync requests to re-enter and
|
| + // block async messages.
|
| + bool result = WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
|
| + // The message pipe has disconnected.
|
| if (!result)
|
| break;
|
|
|
| @@ -149,17 +138,9 @@
|
| break;
|
|
|
| // The corresponding response message has arrived.
|
| - DCHECK(response_received);
|
| - DCHECK(ContainsKey(sync_responses_, request_id));
|
| - auto iter = sync_responses_.find(request_id);
|
| - DCHECK_EQ(&response_received, iter->second->response_received);
|
| - scoped_ptr<Message> response = std::move(iter->second->response);
|
| - sync_responses_.erase(iter);
|
| - ignore_result(sync_responder->Accept(response.get()));
|
| - } while (false);
|
| -
|
| - if (weak_self)
|
| - connector_.UnregisterSyncHandleWatch();
|
| + if (sync_responders_.find(request_id) == sync_responders_.end())
|
| + break;
|
| + }
|
|
|
| // Return true means that we take ownership of |responder|.
|
| return true;
|
| @@ -173,51 +154,6 @@
|
|
|
| bool Router::HandleIncomingMessage(Message* message) {
|
| DCHECK(thread_checker_.CalledOnValidThread());
|
| -
|
| - const bool during_sync_call =
|
| - connector_.during_sync_handle_watcher_callback();
|
| - if (!message->has_flag(kMessageIsSync) &&
|
| - (during_sync_call || !pending_messages_.empty())) {
|
| - scoped_ptr<Message> pending_message(new Message);
|
| - message->MoveTo(pending_message.get());
|
| - pending_messages_.push(std::move(pending_message));
|
| -
|
| - if (!pending_task_for_messages_) {
|
| - pending_task_for_messages_ = true;
|
| - base::MessageLoop::current()->PostTask(
|
| - FROM_HERE, base::Bind(&Router::HandleQueuedMessages,
|
| - weak_factory_.GetWeakPtr()));
|
| - }
|
| -
|
| - return true;
|
| - }
|
| -
|
| - return HandleMessageInternal(message);
|
| -}
|
| -
|
| -void Router::HandleQueuedMessages() {
|
| - DCHECK(thread_checker_.CalledOnValidThread());
|
| - DCHECK(pending_task_for_messages_);
|
| -
|
| - base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
|
| - while (!pending_messages_.empty()) {
|
| - scoped_ptr<Message> message(std::move(pending_messages_.front()));
|
| - pending_messages_.pop();
|
| -
|
| - bool result = HandleMessageInternal(message.get());
|
| - if (!weak_self)
|
| - return;
|
| -
|
| - if (!result && !testing_mode_) {
|
| - connector_.RaiseError();
|
| - break;
|
| - }
|
| - }
|
| -
|
| - pending_task_for_messages_ = false;
|
| -}
|
| -
|
| -bool Router::HandleMessageInternal(Message* message) {
|
| if (message->has_flag(kMessageExpectsResponse)) {
|
| if (!incoming_receiver_)
|
| return false;
|
| @@ -230,28 +166,20 @@
|
| return ok;
|
|
|
| } else if (message->has_flag(kMessageIsResponse)) {
|
| + ResponderMap& responder_map = message->has_flag(kMessageIsSync)
|
| + ? sync_responders_
|
| + : async_responders_;
|
| uint64_t request_id = message->request_id();
|
| -
|
| - if (message->has_flag(kMessageIsSync)) {
|
| - auto it = sync_responses_.find(request_id);
|
| - if (it == sync_responses_.end()) {
|
| - DCHECK(testing_mode_);
|
| - return false;
|
| - }
|
| - it->second->response.reset(new Message());
|
| - message->MoveTo(it->second->response.get());
|
| - *it->second->response_received = true;
|
| - return true;
|
| - }
|
| -
|
| - auto it = async_responders_.find(request_id);
|
| - if (it == async_responders_.end()) {
|
| + ResponderMap::iterator it = responder_map.find(request_id);
|
| + if (it == responder_map.end()) {
|
| DCHECK(testing_mode_);
|
| return false;
|
| }
|
| - scoped_ptr<MessageReceiver> responder = std::move(it->second);
|
| - async_responders_.erase(it);
|
| - return responder->Accept(message);
|
| + MessageReceiver* responder = it->second;
|
| + responder_map.erase(it);
|
| + bool ok = responder->Accept(message);
|
| + delete responder;
|
| + return ok;
|
| } else {
|
| if (!incoming_receiver_)
|
| return false;
|
|
|