Index: mojo/public/cpp/bindings/lib/router.cc |
diff --git a/mojo/public/cpp/bindings/lib/router.cc b/mojo/public/cpp/bindings/lib/router.cc |
index a78cdc706b168357b317d409234cd185f888fa12..9bc9aa16ec425a7f55be34284b774bbd056b208b 100644 |
--- a/mojo/public/cpp/bindings/lib/router.cc |
+++ b/mojo/public/cpp/bindings/lib/router.cc |
@@ -7,10 +7,7 @@ |
#include <stdint.h> |
#include <utility> |
-#include "base/bind.h" |
#include "base/logging.h" |
-#include "base/message_loop/message_loop.h" |
-#include "base/stl_util.h" |
namespace mojo { |
namespace internal { |
@@ -63,13 +60,6 @@ |
// ---------------------------------------------------------------------------- |
-Router::SyncResponseInfo::SyncResponseInfo(bool* in_response_received) |
- : response_received(in_response_received) {} |
- |
-Router::SyncResponseInfo::~SyncResponseInfo() {} |
- |
-// ---------------------------------------------------------------------------- |
- |
Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router) |
: router_(router) { |
} |
@@ -85,7 +75,6 @@ |
Router::Router(ScopedMessagePipeHandle message_pipe, |
FilterChain filters, |
- bool expects_sync_requests, |
const MojoAsyncWaiter* waiter) |
: thunk_(this), |
filters_(std::move(filters)), |
@@ -95,15 +84,19 @@ |
incoming_receiver_(nullptr), |
next_request_id_(0), |
testing_mode_(false), |
- pending_task_for_messages_(false), |
weak_factory_(this) { |
filters_.SetSink(&thunk_); |
- if (expects_sync_requests) |
- connector_.RegisterSyncHandleWatch(); |
connector_.set_incoming_receiver(filters_.GetHead()); |
} |
-Router::~Router() {} |
+Router::~Router() { |
+ weak_factory_.InvalidateWeakPtrs(); |
+ |
+ for (auto& pair : async_responders_) |
+ delete pair.second; |
+ for (auto& pair : sync_responders_) |
+ delete pair.second; |
+} |
bool Router::Accept(Message* message) { |
DCHECK(thread_checker_.CalledOnValidThread()); |
@@ -126,21 +119,17 @@ |
if (!message->has_flag(kMessageIsSync)) { |
// We assume ownership of |responder|. |
- async_responders_[request_id] = make_scoped_ptr(responder); |
+ async_responders_[request_id] = responder; |
return true; |
} |
- if (!connector_.RegisterSyncHandleWatch()) |
- return false; |
- |
- bool response_received = false; |
- scoped_ptr<MessageReceiver> sync_responder(responder); |
- sync_responses_.insert(std::make_pair( |
- request_id, make_scoped_ptr(new SyncResponseInfo(&response_received)))); |
- |
+ sync_responders_[request_id] = responder; |
base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); |
- do { |
- bool result = connector_.RunSyncHandleWatch(&response_received); |
+ for (;;) { |
+ // TODO(yzshen): Here we should allow incoming sync requests to re-enter and |
+ // block async messages. |
+ bool result = WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE); |
+ // The message pipe has disconnected. |
if (!result) |
break; |
@@ -149,17 +138,9 @@ |
break; |
// The corresponding response message has arrived. |
- DCHECK(response_received); |
- DCHECK(ContainsKey(sync_responses_, request_id)); |
- auto iter = sync_responses_.find(request_id); |
- DCHECK_EQ(&response_received, iter->second->response_received); |
- scoped_ptr<Message> response = std::move(iter->second->response); |
- sync_responses_.erase(iter); |
- ignore_result(sync_responder->Accept(response.get())); |
- } while (false); |
- |
- if (weak_self) |
- connector_.UnregisterSyncHandleWatch(); |
+ if (sync_responders_.find(request_id) == sync_responders_.end()) |
+ break; |
+ } |
// Return true means that we take ownership of |responder|. |
return true; |
@@ -173,51 +154,6 @@ |
bool Router::HandleIncomingMessage(Message* message) { |
DCHECK(thread_checker_.CalledOnValidThread()); |
- |
- const bool during_sync_call = |
- connector_.during_sync_handle_watcher_callback(); |
- if (!message->has_flag(kMessageIsSync) && |
- (during_sync_call || !pending_messages_.empty())) { |
- scoped_ptr<Message> pending_message(new Message); |
- message->MoveTo(pending_message.get()); |
- pending_messages_.push(std::move(pending_message)); |
- |
- if (!pending_task_for_messages_) { |
- pending_task_for_messages_ = true; |
- base::MessageLoop::current()->PostTask( |
- FROM_HERE, base::Bind(&Router::HandleQueuedMessages, |
- weak_factory_.GetWeakPtr())); |
- } |
- |
- return true; |
- } |
- |
- return HandleMessageInternal(message); |
-} |
- |
-void Router::HandleQueuedMessages() { |
- DCHECK(thread_checker_.CalledOnValidThread()); |
- DCHECK(pending_task_for_messages_); |
- |
- base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr(); |
- while (!pending_messages_.empty()) { |
- scoped_ptr<Message> message(std::move(pending_messages_.front())); |
- pending_messages_.pop(); |
- |
- bool result = HandleMessageInternal(message.get()); |
- if (!weak_self) |
- return; |
- |
- if (!result && !testing_mode_) { |
- connector_.RaiseError(); |
- break; |
- } |
- } |
- |
- pending_task_for_messages_ = false; |
-} |
- |
-bool Router::HandleMessageInternal(Message* message) { |
if (message->has_flag(kMessageExpectsResponse)) { |
if (!incoming_receiver_) |
return false; |
@@ -230,28 +166,20 @@ |
return ok; |
} else if (message->has_flag(kMessageIsResponse)) { |
+ ResponderMap& responder_map = message->has_flag(kMessageIsSync) |
+ ? sync_responders_ |
+ : async_responders_; |
uint64_t request_id = message->request_id(); |
- |
- if (message->has_flag(kMessageIsSync)) { |
- auto it = sync_responses_.find(request_id); |
- if (it == sync_responses_.end()) { |
- DCHECK(testing_mode_); |
- return false; |
- } |
- it->second->response.reset(new Message()); |
- message->MoveTo(it->second->response.get()); |
- *it->second->response_received = true; |
- return true; |
- } |
- |
- auto it = async_responders_.find(request_id); |
- if (it == async_responders_.end()) { |
+ ResponderMap::iterator it = responder_map.find(request_id); |
+ if (it == responder_map.end()) { |
DCHECK(testing_mode_); |
return false; |
} |
- scoped_ptr<MessageReceiver> responder = std::move(it->second); |
- async_responders_.erase(it); |
- return responder->Accept(message); |
+ MessageReceiver* responder = it->second; |
+ responder_map.erase(it); |
+ bool ok = responder->Accept(message); |
+ delete responder; |
+ return ok; |
} else { |
if (!incoming_receiver_) |
return false; |