| Index: mojo/public/cpp/bindings/thread_safe_interface_ptr.h
|
| diff --git a/mojo/public/cpp/bindings/thread_safe_interface_ptr.h b/mojo/public/cpp/bindings/thread_safe_interface_ptr.h
|
| index 8b32b30b4fbefe1a5a07e1bb089b97e337d9be74..a11992ad1c60d54ace748e09a6aa63774153f8e0 100644
|
| --- a/mojo/public/cpp/bindings/thread_safe_interface_ptr.h
|
| +++ b/mojo/public/cpp/bindings/thread_safe_interface_ptr.h
|
| @@ -16,8 +16,14 @@
|
| #include "mojo/public/cpp/bindings/associated_interface_ptr.h"
|
| #include "mojo/public/cpp/bindings/interface_ptr.h"
|
| #include "mojo/public/cpp/bindings/message.h"
|
| +#include "mojo/public/cpp/bindings/sync_event_watcher.h"
|
|
|
| namespace mojo {
|
| +namespace {
|
| +void AssignTrue(bool* b) {
|
| + *b = true;
|
| +}
|
| +} // namespace
|
|
|
| // Instances of this class may be used from any thread to serialize |Interface|
|
| // messages and forward them elsewhere. In general you should use one of the
|
| @@ -46,7 +52,10 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder {
|
| task_runner_(task_runner),
|
| forward_(forward),
|
| forward_with_responder_(forward_with_responder),
|
| - associated_group_(associated_group) {}
|
| + associated_group_(associated_group),
|
| + sync_message_event_(base::WaitableEvent::ResetPolicy::MANUAL,
|
| + base::WaitableEvent::InitialState::NOT_SIGNALED),
|
| + weak_factory_(this) {}
|
|
|
| ~ThreadSafeForwarder() override {}
|
|
|
| @@ -75,27 +84,81 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder {
|
|
|
| bool AcceptWithResponder(
|
| Message* message,
|
| - std::unique_ptr<MessageReceiver> response_receiver) override {
|
| + std::unique_ptr<MessageReceiver> responder) override {
|
| if (!message->associated_endpoint_handles()->empty()) {
|
| // Please see comment for the DCHECK in the previous method.
|
| DCHECK(associated_group_.GetController());
|
| message->SerializeAssociatedEndpointHandles(
|
| associated_group_.GetController());
|
| }
|
| - auto responder =
|
| - base::MakeUnique<ForwardToCallingThread>(std::move(response_receiver));
|
| +
|
| + if (!message->has_flag(Message::kFlagIsSync)) {
|
| + auto reply_forwarder =
|
| + base::MakeUnique<ForwardToCallingThread>(std::move(responder));
|
| + task_runner_->PostTask(
|
| + FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message),
|
| + base::Passed(&reply_forwarder)));
|
| + return true;
|
| + }
|
| +
|
| + // Sync call on this thread.
|
| + if (task_runner_->RunsTasksOnCurrentThread()) {
|
| + forward_with_responder_.Run(std::move(*message), std::move(responder));
|
| + return true;
|
| + }
|
| +
|
| + // Sync call on a different thread.
|
| + // TODO: allow the call to be async on the other thread.
|
| + Message response;
|
| + auto reply_signaler =
|
| + base::MakeUnique<SyncReplySignaler>(&response, &sync_message_event_);
|
| task_runner_->PostTask(
|
| FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message),
|
| - base::Passed(&responder)));
|
| + base::Passed(&reply_signaler)));
|
| + bool response_received = false;
|
| + // XXX: What does it mean to have reentrant sync calls that are all waiting
|
| + // on the same sync_message_event_? Correct because only reentrant sync
|
| + // calls are allowed?
|
| + SyncEventWatcher watcher(&sync_message_event_,
|
| + base::Bind(&AssignTrue, &response_received));
|
| + watcher.AllowWokenUpBySyncWatchOnSameThread();
|
| + // XXX: Check that this handles the case that sync_message_event_ is already
|
| + // signaled.
|
| + auto weak_self = weak_factory_.GetWeakPtr();
|
| + watcher.SyncWatch(&response_received);
|
| + if (weak_self) {
|
| + sync_message_event_.Reset();
|
| + if (response_received)
|
| + ignore_result(responder->Accept(&response));
|
| + }
|
| +
|
| return true;
|
| }
|
|
|
| + class SyncReplySignaler : public MessageReceiver {
|
| + public:
|
| + SyncReplySignaler(Message* sync_response_destination,
|
| + base::WaitableEvent* sync_response_event)
|
| + : sync_response_destination_(sync_response_destination),
|
| + sync_response_event_(sync_response_event) {}
|
| +
|
| + private:
|
| + bool Accept(Message* message) {
|
| + *sync_response_destination_ = std::move(*message);
|
| + sync_response_event_->Signal();
|
| + return true;
|
| + }
|
| +
|
| + // XXX: What guarantees do we have about the lifetime of these?
|
| + Message* sync_response_destination_;
|
| + base::WaitableEvent* sync_response_event_;
|
| + };
|
| +
|
| class ForwardToCallingThread : public MessageReceiver {
|
| public:
|
| explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder)
|
| : responder_(std::move(responder)),
|
| - caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {
|
| - }
|
| + caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {}
|
|
|
| private:
|
| bool Accept(Message* message) {
|
| @@ -125,6 +188,10 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder {
|
| const ForwardMessageWithResponderCallback forward_with_responder_;
|
| AssociatedGroup associated_group_;
|
|
|
| + // An event used to signal that sync messages are available.
|
| + base::WaitableEvent sync_message_event_;
|
| + base::WeakPtrFactory<ThreadSafeForwarder> weak_factory_;
|
| +
|
| DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder);
|
| };
|
|
|
|
|