Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(280)

Unified Diff: mojo/public/cpp/bindings/lib/router.cc

Issue 1713203002: Mojo C++ bindings: support sync methods - part 2 (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/public/cpp/bindings/lib/router.h ('k') | mojo/public/cpp/bindings/lib/sync_handle_watcher.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/public/cpp/bindings/lib/router.cc
diff --git a/mojo/public/cpp/bindings/lib/router.cc b/mojo/public/cpp/bindings/lib/router.cc
index 9bc9aa16ec425a7f55be34284b774bbd056b208b..a78cdc706b168357b317d409234cd185f888fa12 100644
--- a/mojo/public/cpp/bindings/lib/router.cc
+++ b/mojo/public/cpp/bindings/lib/router.cc
@@ -7,7 +7,10 @@
#include <stdint.h>
#include <utility>
+#include "base/bind.h"
#include "base/logging.h"
+#include "base/message_loop/message_loop.h"
+#include "base/stl_util.h"
namespace mojo {
namespace internal {
@@ -60,6 +63,13 @@ class ResponderThunk : public MessageReceiverWithStatus {
// ----------------------------------------------------------------------------
+Router::SyncResponseInfo::SyncResponseInfo(bool* in_response_received)
+ : response_received(in_response_received) {}
+
+Router::SyncResponseInfo::~SyncResponseInfo() {}
+
+// ----------------------------------------------------------------------------
+
Router::HandleIncomingMessageThunk::HandleIncomingMessageThunk(Router* router)
: router_(router) {
}
@@ -75,6 +85,7 @@ bool Router::HandleIncomingMessageThunk::Accept(Message* message) {
Router::Router(ScopedMessagePipeHandle message_pipe,
FilterChain filters,
+ bool expects_sync_requests,
const MojoAsyncWaiter* waiter)
: thunk_(this),
filters_(std::move(filters)),
@@ -84,19 +95,15 @@ Router::Router(ScopedMessagePipeHandle message_pipe,
incoming_receiver_(nullptr),
next_request_id_(0),
testing_mode_(false),
+ pending_task_for_messages_(false),
weak_factory_(this) {
filters_.SetSink(&thunk_);
+ if (expects_sync_requests)
+ connector_.RegisterSyncHandleWatch();
connector_.set_incoming_receiver(filters_.GetHead());
}
-Router::~Router() {
- weak_factory_.InvalidateWeakPtrs();
-
- for (auto& pair : async_responders_)
- delete pair.second;
- for (auto& pair : sync_responders_)
- delete pair.second;
-}
+Router::~Router() {}
bool Router::Accept(Message* message) {
DCHECK(thread_checker_.CalledOnValidThread());
@@ -119,17 +126,21 @@ bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) {
if (!message->has_flag(kMessageIsSync)) {
// We assume ownership of |responder|.
- async_responders_[request_id] = responder;
+ async_responders_[request_id] = make_scoped_ptr(responder);
return true;
}
- sync_responders_[request_id] = responder;
+ if (!connector_.RegisterSyncHandleWatch())
+ return false;
+
+ bool response_received = false;
+ scoped_ptr<MessageReceiver> sync_responder(responder);
+ sync_responses_.insert(std::make_pair(
+ request_id, make_scoped_ptr(new SyncResponseInfo(&response_received))));
+
base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
- for (;;) {
- // TODO(yzshen): Here we should allow incoming sync requests to re-enter and
- // block async messages.
- bool result = WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
- // The message pipe has disconnected.
+ do {
+ bool result = connector_.RunSyncHandleWatch(&response_received);
if (!result)
break;
@@ -138,9 +149,17 @@ bool Router::AcceptWithResponder(Message* message, MessageReceiver* responder) {
break;
// The corresponding response message has arrived.
- if (sync_responders_.find(request_id) == sync_responders_.end())
- break;
- }
+ DCHECK(response_received);
+ DCHECK(ContainsKey(sync_responses_, request_id));
+ auto iter = sync_responses_.find(request_id);
+ DCHECK_EQ(&response_received, iter->second->response_received);
+ scoped_ptr<Message> response = std::move(iter->second->response);
+ sync_responses_.erase(iter);
+ ignore_result(sync_responder->Accept(response.get()));
+ } while (false);
+
+ if (weak_self)
+ connector_.UnregisterSyncHandleWatch();
// Return true means that we take ownership of |responder|.
return true;
@@ -154,6 +173,51 @@ void Router::EnableTestingMode() {
bool Router::HandleIncomingMessage(Message* message) {
DCHECK(thread_checker_.CalledOnValidThread());
+
+ const bool during_sync_call =
+ connector_.during_sync_handle_watcher_callback();
+ if (!message->has_flag(kMessageIsSync) &&
+ (during_sync_call || !pending_messages_.empty())) {
+ scoped_ptr<Message> pending_message(new Message);
+ message->MoveTo(pending_message.get());
+ pending_messages_.push(std::move(pending_message));
+
+ if (!pending_task_for_messages_) {
+ pending_task_for_messages_ = true;
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE, base::Bind(&Router::HandleQueuedMessages,
+ weak_factory_.GetWeakPtr()));
+ }
+
+ return true;
+ }
+
+ return HandleMessageInternal(message);
+}
+
+void Router::HandleQueuedMessages() {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ DCHECK(pending_task_for_messages_);
+
+ base::WeakPtr<Router> weak_self = weak_factory_.GetWeakPtr();
+ while (!pending_messages_.empty()) {
+ scoped_ptr<Message> message(std::move(pending_messages_.front()));
+ pending_messages_.pop();
+
+ bool result = HandleMessageInternal(message.get());
+ if (!weak_self)
+ return;
+
+ if (!result && !testing_mode_) {
+ connector_.RaiseError();
+ break;
+ }
+ }
+
+ pending_task_for_messages_ = false;
+}
+
+bool Router::HandleMessageInternal(Message* message) {
if (message->has_flag(kMessageExpectsResponse)) {
if (!incoming_receiver_)
return false;
@@ -166,20 +230,28 @@ bool Router::HandleIncomingMessage(Message* message) {
return ok;
} else if (message->has_flag(kMessageIsResponse)) {
- ResponderMap& responder_map = message->has_flag(kMessageIsSync)
- ? sync_responders_
- : async_responders_;
uint64_t request_id = message->request_id();
- ResponderMap::iterator it = responder_map.find(request_id);
- if (it == responder_map.end()) {
+
+ if (message->has_flag(kMessageIsSync)) {
+ auto it = sync_responses_.find(request_id);
+ if (it == sync_responses_.end()) {
+ DCHECK(testing_mode_);
+ return false;
+ }
+ it->second->response.reset(new Message());
+ message->MoveTo(it->second->response.get());
+ *it->second->response_received = true;
+ return true;
+ }
+
+ auto it = async_responders_.find(request_id);
+ if (it == async_responders_.end()) {
DCHECK(testing_mode_);
return false;
}
- MessageReceiver* responder = it->second;
- responder_map.erase(it);
- bool ok = responder->Accept(message);
- delete responder;
- return ok;
+ scoped_ptr<MessageReceiver> responder = std::move(it->second);
+ async_responders_.erase(it);
+ return responder->Accept(message);
} else {
if (!incoming_receiver_)
return false;
« no previous file with comments | « mojo/public/cpp/bindings/lib/router.h ('k') | mojo/public/cpp/bindings/lib/sync_handle_watcher.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698