| Index: mojo/public/cpp/bindings/thread_safe_interface_ptr.h
|
| diff --git a/mojo/public/cpp/bindings/thread_safe_interface_ptr.h b/mojo/public/cpp/bindings/thread_safe_interface_ptr.h
|
| index 8b32b30b4fbefe1a5a07e1bb089b97e337d9be74..740687f37984f4864f9ddebf445e897bb346b8f1 100644
|
| --- a/mojo/public/cpp/bindings/thread_safe_interface_ptr.h
|
| +++ b/mojo/public/cpp/bindings/thread_safe_interface_ptr.h
|
| @@ -10,12 +10,26 @@
|
| #include "base/macros.h"
|
| #include "base/memory/ptr_util.h"
|
| #include "base/memory/ref_counted.h"
|
| +#include "base/stl_util.h"
|
| +#include "base/synchronization/waitable_event.h"
|
| #include "base/task_runner.h"
|
| #include "base/threading/thread_task_runner_handle.h"
|
| #include "mojo/public/cpp/bindings/associated_group.h"
|
| #include "mojo/public/cpp/bindings/associated_interface_ptr.h"
|
| #include "mojo/public/cpp/bindings/interface_ptr.h"
|
| #include "mojo/public/cpp/bindings/message.h"
|
| +#include "mojo/public/cpp/bindings/sync_call_restrictions.h"
|
| +#include "mojo/public/cpp/bindings/sync_event_watcher.h"
|
| +
|
| +// ThreadSafeInterfacePtr wraps a non-thread-safe InterfacePtr and proxies
|
| +// messages to it. Async calls are posted to the thread that the InteracePtr is
|
| +// bound to, and the responses are posted back. Sync calls are dispatched
|
| +// directly if the call is made on the thread that the wrapped InterfacePtr is
|
| +// bound to, or posted otherwise. It's important to be aware that sync calls
|
| +// block both the calling thread and the InterfacePtr thread. That means that
|
| +// you cannot make sync calls through a ThreadSafeInterfacePtr if the
|
| +// underlying InterfacePtr is bound to a thread that cannot block, like the IO
|
| +// thread.
|
|
|
| namespace mojo {
|
|
|
| @@ -46,9 +60,15 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder {
|
| task_runner_(task_runner),
|
| forward_(forward),
|
| forward_with_responder_(forward_with_responder),
|
| - associated_group_(associated_group) {}
|
| + associated_group_(associated_group),
|
| + sync_calls_(new InProgressSyncCalls()) {}
|
|
|
| - ~ThreadSafeForwarder() override {}
|
| + ~ThreadSafeForwarder() override {
|
| + // If there are ongoing sync calls signal their completion now.
|
| + base::AutoLock l(sync_calls_->lock);
|
| + for (const auto& pending_response : sync_calls_->pending_responses)
|
| + pending_response->event.Signal();
|
| + }
|
|
|
| ProxyType& proxy() { return proxy_; }
|
|
|
| @@ -75,34 +95,128 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder {
|
|
|
| bool AcceptWithResponder(
|
| Message* message,
|
| - std::unique_ptr<MessageReceiver> response_receiver) override {
|
| + std::unique_ptr<MessageReceiver> responder) override {
|
| if (!message->associated_endpoint_handles()->empty()) {
|
| // Please see comment for the DCHECK in the previous method.
|
| DCHECK(associated_group_.GetController());
|
| message->SerializeAssociatedEndpointHandles(
|
| associated_group_.GetController());
|
| }
|
| - auto responder =
|
| - base::MakeUnique<ForwardToCallingThread>(std::move(response_receiver));
|
| +
|
| + // Async messages are always posted (even if |task_runner_| runs tasks on
|
| + // this thread) to guarantee that two async calls can't be reordered.
|
| + if (!message->has_flag(Message::kFlagIsSync)) {
|
| + auto reply_forwarder =
|
| + base::MakeUnique<ForwardToCallingThread>(std::move(responder));
|
| + task_runner_->PostTask(
|
| + FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message),
|
| + base::Passed(&reply_forwarder)));
|
| + return true;
|
| + }
|
| +
|
| + SyncCallRestrictions::AssertSyncCallAllowed();
|
| +
|
| + // If the InterfacePtr is bound to this thread, dispatch it directly.
|
| + if (task_runner_->RunsTasksOnCurrentThread()) {
|
| + forward_with_responder_.Run(std::move(*message), std::move(responder));
|
| + return true;
|
| + }
|
| +
|
| + // If the InterfacePtr is bound on another thread, post the call.
|
| + // TODO(yzshen, watk): We block both this thread and the InterfacePtr
|
| + // thread. Ideally only this thread would block.
|
| + auto response = make_scoped_refptr(new SyncResponseInfo());
|
| + auto response_signaler = base::MakeUnique<SyncResponseSignaler>(response);
|
| task_runner_->PostTask(
|
| FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message),
|
| - base::Passed(&responder)));
|
| + base::Passed(&response_signaler)));
|
| +
|
| + // Save the pending SyncResponseInfo so that if the sync call deletes
|
| + // |this|, we can signal the completion of the call to return from
|
| + // SyncWatch().
|
| + auto sync_calls = sync_calls_;
|
| + {
|
| + base::AutoLock l(sync_calls->lock);
|
| + sync_calls->pending_responses.push_back(response.get());
|
| + }
|
| +
|
| + auto assign_true = [](bool* b) { *b = true; };
|
| + bool event_signaled = false;
|
| + SyncEventWatcher watcher(&response->event,
|
| + base::Bind(assign_true, &event_signaled));
|
| + watcher.SyncWatch(&event_signaled);
|
| +
|
| + {
|
| + base::AutoLock l(sync_calls->lock);
|
| + base::Erase(sync_calls->pending_responses, response.get());
|
| + }
|
| +
|
| + if (response->received)
|
| + ignore_result(responder->Accept(&response->message));
|
| +
|
| return true;
|
| }
|
|
|
| + // Data that we need to share between the threads involved in a sync call.
|
| + struct SyncResponseInfo
|
| + : public base::RefCountedThreadSafe<SyncResponseInfo> {
|
| + Message message;
|
| + bool received = false;
|
| + base::WaitableEvent event{base::WaitableEvent::ResetPolicy::MANUAL,
|
| + base::WaitableEvent::InitialState::NOT_SIGNALED};
|
| +
|
| + private:
|
| + friend class base::RefCountedThreadSafe<SyncResponseInfo>;
|
| + };
|
| +
|
| + // A MessageReceiver that signals |response| when it either accepts the
|
| + // response message, or is destructed.
|
| + class SyncResponseSignaler : public MessageReceiver {
|
| + public:
|
| + explicit SyncResponseSignaler(scoped_refptr<SyncResponseInfo> response)
|
| + : response_(response) {}
|
| +
|
| + ~SyncResponseSignaler() override {
|
| + // If Accept() was not called we must still notify the waiter that the
|
| + // sync call is finished.
|
| + if (response_)
|
| + response_->event.Signal();
|
| + }
|
| +
|
| + bool Accept(Message* message) {
|
| + response_->message = std::move(*message);
|
| + response_->received = true;
|
| + response_->event.Signal();
|
| + response_ = nullptr;
|
| + return true;
|
| + }
|
| +
|
| + private:
|
| + scoped_refptr<SyncResponseInfo> response_;
|
| + };
|
| +
|
| + // A record of the pending sync responses for canceling pending sync calls
|
| + // when the owning ThreadSafeForwarder is destructed.
|
| + struct InProgressSyncCalls
|
| + : public base::RefCountedThreadSafe<InProgressSyncCalls> {
|
| + // |lock| protects access to |pending_responses|.
|
| + base::Lock lock;
|
| + std::vector<SyncResponseInfo*> pending_responses;
|
| + };
|
| +
|
| class ForwardToCallingThread : public MessageReceiver {
|
| public:
|
| explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder)
|
| : responder_(std::move(responder)),
|
| - caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {
|
| - }
|
| + caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {}
|
|
|
| private:
|
| bool Accept(Message* message) {
|
| // The current instance will be deleted when this method returns, so we
|
| // have to relinquish the responder's ownership so it does not get
|
| // deleted.
|
| - caller_task_runner_->PostTask(FROM_HERE,
|
| + caller_task_runner_->PostTask(
|
| + FROM_HERE,
|
| base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder,
|
| base::Passed(std::move(responder_)),
|
| base::Passed(std::move(*message))));
|
| @@ -124,6 +238,7 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder {
|
| const ForwardMessageCallback forward_;
|
| const ForwardMessageWithResponderCallback forward_with_responder_;
|
| AssociatedGroup associated_group_;
|
| + scoped_refptr<InProgressSyncCalls> sync_calls_;
|
|
|
| DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder);
|
| };
|
|
|