Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(445)

Unified Diff: mojo/public/cpp/bindings/thread_safe_interface_ptr.h

Issue 2770153003: mojo: Support sync calls through ThreadSafeInterfacePtr (Closed)
Patch Set: Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/public/cpp/bindings/thread_safe_interface_ptr.h
diff --git a/mojo/public/cpp/bindings/thread_safe_interface_ptr.h b/mojo/public/cpp/bindings/thread_safe_interface_ptr.h
index 8b32b30b4fbefe1a5a07e1bb089b97e337d9be74..a11992ad1c60d54ace748e09a6aa63774153f8e0 100644
--- a/mojo/public/cpp/bindings/thread_safe_interface_ptr.h
+++ b/mojo/public/cpp/bindings/thread_safe_interface_ptr.h
@@ -16,8 +16,14 @@
#include "mojo/public/cpp/bindings/associated_interface_ptr.h"
#include "mojo/public/cpp/bindings/interface_ptr.h"
#include "mojo/public/cpp/bindings/message.h"
+#include "mojo/public/cpp/bindings/sync_event_watcher.h"
namespace mojo {
+namespace {
+void AssignTrue(bool* b) {
+ *b = true;
+}
+} // namespace
// Instances of this class may be used from any thread to serialize |Interface|
// messages and forward them elsewhere. In general you should use one of the
@@ -46,7 +52,10 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder {
task_runner_(task_runner),
forward_(forward),
forward_with_responder_(forward_with_responder),
- associated_group_(associated_group) {}
+ associated_group_(associated_group),
+ sync_message_event_(base::WaitableEvent::ResetPolicy::MANUAL,
+ base::WaitableEvent::InitialState::NOT_SIGNALED),
+ weak_factory_(this) {}
~ThreadSafeForwarder() override {}
@@ -75,27 +84,81 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder {
bool AcceptWithResponder(
Message* message,
- std::unique_ptr<MessageReceiver> response_receiver) override {
+ std::unique_ptr<MessageReceiver> responder) override {
if (!message->associated_endpoint_handles()->empty()) {
// Please see comment for the DCHECK in the previous method.
DCHECK(associated_group_.GetController());
message->SerializeAssociatedEndpointHandles(
associated_group_.GetController());
}
- auto responder =
- base::MakeUnique<ForwardToCallingThread>(std::move(response_receiver));
+
+ if (!message->has_flag(Message::kFlagIsSync)) {
+ auto reply_forwarder =
+ base::MakeUnique<ForwardToCallingThread>(std::move(responder));
+ task_runner_->PostTask(
+ FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message),
+ base::Passed(&reply_forwarder)));
+ return true;
+ }
+
+ // Sync call on this thread.
+ if (task_runner_->RunsTasksOnCurrentThread()) {
+ forward_with_responder_.Run(std::move(*message), std::move(responder));
+ return true;
+ }
+
+ // Sync call on a different thread.
+ // TODO: allow the call to be async on the other thread.
+ Message response;
+ auto reply_signaler =
+ base::MakeUnique<SyncReplySignaler>(&response, &sync_message_event_);
task_runner_->PostTask(
FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message),
- base::Passed(&responder)));
+ base::Passed(&reply_signaler)));
+ bool response_received = false;
+ // XXX: What does it mean to have reentrant sync calls that are all waiting
+ // on the same sync_message_event_? Correct because only reentrant sync
+ // calls are allowed?
+ SyncEventWatcher watcher(&sync_message_event_,
+ base::Bind(&AssignTrue, &response_received));
+ watcher.AllowWokenUpBySyncWatchOnSameThread();
+ // XXX: Check that this handles the case that sync_message_event_ is already
+ // signaled.
+ auto weak_self = weak_factory_.GetWeakPtr();
+ watcher.SyncWatch(&response_received);
+ if (weak_self) {
+ sync_message_event_.Reset();
+ if (response_received)
+ ignore_result(responder->Accept(&response));
+ }
+
return true;
}
+ class SyncReplySignaler : public MessageReceiver {
+ public:
+ SyncReplySignaler(Message* sync_response_destination,
+ base::WaitableEvent* sync_response_event)
+ : sync_response_destination_(sync_response_destination),
+ sync_response_event_(sync_response_event) {}
+
+ private:
+ bool Accept(Message* message) {
+ *sync_response_destination_ = std::move(*message);
+ sync_response_event_->Signal();
+ return true;
+ }
+
+ // XXX: What guarantees do we have about the lifetime of these?
+ Message* sync_response_destination_;
+ base::WaitableEvent* sync_response_event_;
+ };
+
class ForwardToCallingThread : public MessageReceiver {
public:
explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder)
: responder_(std::move(responder)),
- caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {
- }
+ caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {}
private:
bool Accept(Message* message) {
@@ -125,6 +188,10 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder {
const ForwardMessageWithResponderCallback forward_with_responder_;
AssociatedGroup associated_group_;
+ // An event used to signal that sync messages are available.
+ base::WaitableEvent sync_message_event_;
+ base::WeakPtrFactory<ThreadSafeForwarder> weak_factory_;
+
DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder);
};

Powered by Google App Engine
This is Rietveld 408576698