Index: mojo/public/cpp/bindings/lib/router.cc |
diff --git a/mojo/public/cpp/bindings/lib/router.cc b/mojo/public/cpp/bindings/lib/router.cc |
index 9bc9aa16ec425a7f55be34284b774bbd056b208b..a78cdc706b168357b317d409234cd185f888fa12 100644 |
--- a/mojo/public/cpp/bindings/lib/router.cc |
+++ b/mojo/public/cpp/bindings/lib/router.cc |
@@ -7,7 +7,10 @@ |
#include <stdint.h> |
#include <utility> |
+#include "base/bind.h" |
#include "base/logging.h" |
+#include "base/message_loop/message_loop.h" |
+#include "base/stl_util.h" |
namespace mojo { |
namespace internal { |
@@ -60,6 +63,13 @@ class ResponderThunk : public MessageReceiverWithStatus { |
// ---------------------------------------------------------------------------- |
+Router::SyncResponseInfo::SyncResponseInfo(bool* in_response_received) |
+ : response_received(in_response_received) {} |
+ |
+Router::SyncResponseInfo::~SyncResponseInfo() {} |
+ |
+// ---------------------------------------------------------------------------- |
+ |
Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) |
: router_(router) { |
} |
@@ -75,6 +85,7 @@ bool Router::HandleIncomingMessageThunk::Accept(Message* message) { |
Router::Router(ScopedMessagePipeHandle message_pipe, |
FilterChain filters, |
+ bool expects_sync_requests, |
const MojoAsyncWaiter* waiter) |
: thunk_(this), |
filters_(std::move(filters)), |
@@ -84,19 +95,15 @@ Router::Router(ScopedMessagePipeHandle message_pipe, |
incoming_receiver_(nullptr), |
next_request_id_(0), |
testing_mode_(false), |
+ pending_task_for_messages_(false), |
weak_factory_(this) { |
filters_.SetSink(&thunk_); |
+ if (expects_sync_requests) |
+ connector_.RegisterSyncHandleWatch(); |
connector_.set_incoming_receiver(filters_.GetHead()); |
} |
-Router::~Router() { |
- weak_factory_.InvalidateWeakPtrs(); |
- |
- for (auto& pair : async_responders_) |
- delete pair.second; |
- for (auto& pair : sync_responders_) |
- delete pair.second; |
-} |
+Router::~Router() {} |
bool Router::Accept(Message* message) { |
DCHECK(thread_checker_.CalledOnValidThread()); |
@@ -119,17 +126,21 @@ bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { |
if (!message->has_flag(kMessageIsSync)) { |
// We assume ownership of |responder|. |
- async_responders_[request_id] = responder; |
+ async_responders_[request_id] = make_scoped_ptr(responder); |
return true; |
} |
- sync_responders_[request_id] = responder; |
+ if (!connector_.RegisterSyncHandleWatch()) |
+ return false; |
+ |
+ bool response_received = false; |
+ scoped_ptr<MessageReceiver> sync_responder(responder); |
+ sync_responses_.insert(std::make_pair( |
+ request_id, make_scoped_ptr(new SyncResponseInfo(&response_received)))); |
+ |
base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); |
- for (;;) { |
- // TODO(yzshen): Here we should allow incoming sync requests to re-enter and |
- // block async messages. |
- bool result = WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); |
- // The message pipe has disconnected. |
+ do { |
+ bool result = connector_.RunSyncHandleWatch(&response_received); |
if (!result) |
break; |
@@ -138,9 +149,17 @@ bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) { |
break; |
// The corresponding response message has arrived. |
- if (sync_responders_.find(request_id) == sync_responders_.end()) |
- break; |
- } |
+ DCHECK(response_received); |
+ DCHECK(ContainsKey(sync_responses_, request_id)); |
+ auto iter = sync_responses_.find(request_id); |
+ DCHECK_EQ(&response_received, iter->second->response_received); |
+ scoped_ptr<Message> response = std::move(iter->second->response); |
+ sync_responses_.erase(iter); |
+ ignore_result(sync_responder->Accept(response.get())); |
+ } while (false); |
+ |
+ if (weak_self) |
+ connector_.UnregisterSyncHandleWatch(); |
// Return true means that we take ownership of |responder|. |
return true; |
@@ -154,6 +173,51 @@ void Router::EnableTestingMode() { |
bool Router::HandleIncomingMessage(Message* message) { |
DCHECK(thread_checker_.CalledOnValidThread()); |
+ |
+ const bool during_sync_call = |
+ connector_.during_sync_handle_watcher_callback(); |
+ if (!message->has_flag(kMessageIsSync) && |
+ (during_sync_call || !pending_messages_.empty())) { |
+ scoped_ptr<Message> pending_message(new Message); |
+ message->MoveTo(pending_message.get()); |
+ pending_messages_.push(std::move(pending_message)); |
+ |
+ if (!pending_task_for_messages_) { |
+ pending_task_for_messages_ = true; |
+ base::MessageLoop::current()->PostTask( |
+ FROM_HERE, base::Bind(&Router::HandleQueuedMessages, |
+ weak_factory_.GetWeakPtr())); |
+ } |
+ |
+ return true; |
+ } |
+ |
+ return HandleMessageInternal(message); |
+} |
+ |
+void Router::HandleQueuedMessages() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ DCHECK(pending_task_for_messages_); |
+ |
+ base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); |
+ while (!pending_messages_.empty()) { |
+ scoped_ptr<Message> message(std::move(pending_messages_.front())); |
+ pending_messages_.pop(); |
+ |
+ bool result = HandleMessageInternal(message.get()); |
+ if (!weak_self) |
+ return; |
+ |
+ if (!result && !testing_mode_) { |
+ connector_.RaiseError(); |
+ break; |
+ } |
+ } |
+ |
+ pending_task_for_messages_ = false; |
+} |
+ |
+bool Router::HandleMessageInternal(Message* message) { |
if (message->has_flag(kMessageExpectsResponse)) { |
if (!incoming_receiver_) |
return false; |
@@ -166,20 +230,28 @@ bool Router::HandleIncomingMessage(Message* message) { |
return ok; |
} else if (message->has_flag(kMessageIsResponse)) { |
- ResponderMap& responder_map = message->has_flag(kMessageIsSync) |
- ? sync_responders_ |
- : async_responders_; |
uint64_t request_id = message->request_id(); |
- ResponderMap::iterator it = responder_map.find(request_id); |
- if (it == responder_map.end()) { |
+ |
+ if (message->has_flag(kMessageIsSync)) { |
+ auto it = sync_responses_.find(request_id); |
+ if (it == sync_responses_.end()) { |
+ DCHECK(testing_mode_); |
+ return false; |
+ } |
+ it->second->response.reset(new Message()); |
+ message->MoveTo(it->second->response.get()); |
+ *it->second->response_received = true; |
+ return true; |
+ } |
+ |
+ auto it = async_responders_.find(request_id); |
+ if (it == async_responders_.end()) { |
DCHECK(testing_mode_); |
return false; |
} |
- MessageReceiver* responder = it->second; |
- responder_map.erase(it); |
- bool ok = responder->Accept(message); |
- delete responder; |
- return ok; |
+ scoped_ptr<MessageReceiver> responder = std::move(it->second); |
+ async_responders_.erase(it); |
+ return responder->Accept(message); |
} else { |
if (!incoming_receiver_) |
return false; |