| Index: mojo/public/cpp/bindings/lib/router.cc
|
| diff --git a/mojo/public/cpp/bindings/lib/router.cc b/mojo/public/cpp/bindings/lib/router.cc
|
| index 9bc9aa16ec425a7f55be34284b774bbd056b208b..a78cdc706b168357b317d409234cd185f888fa12 100644
|
| --- a/mojo/public/cpp/bindings/lib/router.cc
|
| +++ b/mojo/public/cpp/bindings/lib/router.cc
|
| @@ -7,7 +7,10 @@
|
| #include <stdint.h>
|
| #include <utility>
|
|
|
| +#include "base/bind.h"
|
| #include "base/logging.h"
|
| +#include "base/message_loop/message_loop.h"
|
| +#include "base/stl_util.h"
|
|
|
| namespace mojo {
|
| namespace internal {
|
| @@ -60,6 +63,13 @@ class ResponderThunk : public MessageReceiverWithStatus {
|
|
|
| // ----------------------------------------------------------------------------
|
|
|
| +Router::SyncResponseInfo::SyncResponseInfo(bool* in_response_received)
|
| + : response_received(in_response_received) {}
|
| +
|
| +Router::SyncResponseInfo::~SyncResponseInfo() {}
|
| +
|
| +// ----------------------------------------------------------------------------
|
| +
|
| Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router)
|
| : router_(router) {
|
| }
|
| @@ -75,6 +85,7 @@ bool Router::HandleIncomingMessageThunk::Accept(Message* message) {
|
|
|
| Router::Router(ScopedMessagePipeHandle message_pipe,
|
| FilterChain filters,
|
| + bool expects_sync_requests,
|
| const MojoAsyncWaiter* waiter)
|
| : thunk_(this),
|
| filters_(std::move(filters)),
|
| @@ -84,19 +95,15 @@ Router::Router(ScopedMessagePipeHandle message_pipe,
|
| incoming_receiver_(nullptr),
|
| next_request_id_(0),
|
| testing_mode_(false),
|
| + pending_task_for_messages_(false),
|
| weak_factory_(this) {
|
| filters_.SetSink(&thunk_);
|
| + if (expects_sync_requests)
|
| + connector_.RegisterSyncHandleWatch();
|
| connector_.set_incoming_receiver(filters_.GetHead());
|
| }
|
|
|
| -Router::~Router() {
|
| - weak_factory_.InvalidateWeakPtrs();
|
| -
|
| - for (auto& pair : async_responders_)
|
| - delete pair.second;
|
| - for (auto& pair : sync_responders_)
|
| - delete pair.second;
|
| -}
|
| +Router::~Router() {}
|
|
|
| bool Router::Accept(Message* message) {
|
| DCHECK(thread_checker_.CalledOnValidThread());
|
| @@ -119,17 +126,21 @@ bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) {
|
|
|
| if (!message->has_flag(kMessageIsSync)) {
|
| // We assume ownership of |responder|.
|
| - async_responders_[request_id] = responder;
|
| + async_responders_[request_id] = make_scoped_ptr(responder);
|
| return true;
|
| }
|
|
|
| - sync_responders_[request_id] = responder;
|
| + if (!connector_.RegisterSyncHandleWatch())
|
| + return false;
|
| +
|
| + bool response_received = false;
|
| + scoped_ptr<MessageReceiver> sync_responder(responder);
|
| + sync_responses_.insert(std::make_pair(
|
| + request_id, make_scoped_ptr(new SyncResponseInfo(&response_received))));
|
| +
|
| base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
|
| - for (;;) {
|
| - // TODO(yzshen): Here we should allow incoming sync requests to re-enter and
|
| - // block async messages.
|
| - bool result = WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
|
| - // The message pipe has disconnected.
|
| + do {
|
| + bool result = connector_.RunSyncHandleWatch(&response_received);
|
| if (!result)
|
| break;
|
|
|
| @@ -138,9 +149,17 @@ bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) {
|
| break;
|
|
|
| // The corresponding response message has arrived.
|
| - if (sync_responders_.find(request_id) == sync_responders_.end())
|
| - break;
|
| - }
|
| + DCHECK(response_received);
|
| + DCHECK(ContainsKey(sync_responses_, request_id));
|
| + auto iter = sync_responses_.find(request_id);
|
| + DCHECK_EQ(&response_received, iter->second->response_received);
|
| + scoped_ptr<Message> response = std::move(iter->second->response);
|
| + sync_responses_.erase(iter);
|
| + ignore_result(sync_responder->Accept(response.get()));
|
| + } while (false);
|
| +
|
| + if (weak_self)
|
| + connector_.UnregisterSyncHandleWatch();
|
|
|
| // Return true means that we take ownership of |responder|.
|
| return true;
|
| @@ -154,6 +173,51 @@ void Router::EnableTestingMode() {
|
|
|
| bool Router::HandleIncomingMessage(Message* message) {
|
| DCHECK(thread_checker_.CalledOnValidThread());
|
| +
|
| + const bool during_sync_call =
|
| + connector_.during_sync_handle_watcher_callback();
|
| + if (!message->has_flag(kMessageIsSync) &&
|
| + (during_sync_call || !pending_messages_.empty())) {
|
| + scoped_ptr<Message> pending_message(new Message);
|
| + message->MoveTo(pending_message.get());
|
| + pending_messages_.push(std::move(pending_message));
|
| +
|
| + if (!pending_task_for_messages_) {
|
| + pending_task_for_messages_ = true;
|
| + base::MessageLoop::current()->PostTask(
|
| + FROM_HERE, base::Bind(&Router::HandleQueuedMessages,
|
| + weak_factory_.GetWeakPtr()));
|
| + }
|
| +
|
| + return true;
|
| + }
|
| +
|
| + return HandleMessageInternal(message);
|
| +}
|
| +
|
| +void Router::HandleQueuedMessages() {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| + DCHECK(pending_task_for_messages_);
|
| +
|
| + base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
|
| + while (!pending_messages_.empty()) {
|
| + scoped_ptr<Message> message(std::move(pending_messages_.front()));
|
| + pending_messages_.pop();
|
| +
|
| + bool result = HandleMessageInternal(message.get());
|
| + if (!weak_self)
|
| + return;
|
| +
|
| + if (!result && !testing_mode_) {
|
| + connector_.RaiseError();
|
| + break;
|
| + }
|
| + }
|
| +
|
| + pending_task_for_messages_ = false;
|
| +}
|
| +
|
| +bool Router::HandleMessageInternal(Message* message) {
|
| if (message->has_flag(kMessageExpectsResponse)) {
|
| if (!incoming_receiver_)
|
| return false;
|
| @@ -166,20 +230,28 @@ bool Router::HandleIncomingMessage(Message* message) {
|
| return ok;
|
|
|
| } else if (message->has_flag(kMessageIsResponse)) {
|
| - ResponderMap& responder_map = message->has_flag(kMessageIsSync)
|
| - ? sync_responders_
|
| - : async_responders_;
|
| uint64_t request_id = message->request_id();
|
| - ResponderMap::iterator it = responder_map.find(request_id);
|
| - if (it == responder_map.end()) {
|
| +
|
| + if (message->has_flag(kMessageIsSync)) {
|
| + auto it = sync_responses_.find(request_id);
|
| + if (it == sync_responses_.end()) {
|
| + DCHECK(testing_mode_);
|
| + return false;
|
| + }
|
| + it->second->response.reset(new Message());
|
| + message->MoveTo(it->second->response.get());
|
| + *it->second->response_received = true;
|
| + return true;
|
| + }
|
| +
|
| + auto it = async_responders_.find(request_id);
|
| + if (it == async_responders_.end()) {
|
| DCHECK(testing_mode_);
|
| return false;
|
| }
|
| - MessageReceiver* responder = it->second;
|
| - responder_map.erase(it);
|
| - bool ok = responder->Accept(message);
|
| - delete responder;
|
| - return ok;
|
| + scoped_ptr<MessageReceiver> responder = std::move(it->second);
|
| + async_responders_.erase(it);
|
| + return responder->Accept(message);
|
| } else {
|
| if (!incoming_receiver_)
|
| return false;
|
|
|