Index: mojo/public/cpp/bindings/thread_safe_interface_ptr.h |
diff --git a/mojo/public/cpp/bindings/thread_safe_interface_ptr.h b/mojo/public/cpp/bindings/thread_safe_interface_ptr.h |
index 8b32b30b4fbefe1a5a07e1bb089b97e337d9be74..740687f37984f4864f9ddebf445e897bb346b8f1 100644 |
--- a/mojo/public/cpp/bindings/thread_safe_interface_ptr.h |
+++ b/mojo/public/cpp/bindings/thread_safe_interface_ptr.h |
@@ -10,12 +10,26 @@ |
#include "base/macros.h" |
#include "base/memory/ptr_util.h" |
#include "base/memory/ref_counted.h" |
+#include "base/stl_util.h" |
+#include "base/synchronization/waitable_event.h" |
#include "base/task_runner.h" |
#include "base/threading/thread_task_runner_handle.h" |
#include "mojo/public/cpp/bindings/associated_group.h" |
#include "mojo/public/cpp/bindings/associated_interface_ptr.h" |
#include "mojo/public/cpp/bindings/interface_ptr.h" |
#include "mojo/public/cpp/bindings/message.h" |
+#include "mojo/public/cpp/bindings/sync_call_restrictions.h" |
+#include "mojo/public/cpp/bindings/sync_event_watcher.h" |
+ |
+// ThreadSafeInterfacePtr wraps a non-thread-safe InterfacePtr and proxies |
+// messages to it. Async calls are posted to the thread that the InteracePtr is |
+// bound to, and the responses are posted back. Sync calls are dispatched |
+// directly if the call is made on the thread that the wrapped InterfacePtr is |
+// bound to, or posted otherwise. It's important to be aware that sync calls |
+// block both the calling thread and the InterfacePtr thread. That means that |
+// you cannot make sync calls through a ThreadSafeInterfacePtr if the |
+// underlying InterfacePtr is bound to a thread that cannot block, like the IO |
+// thread. |
namespace mojo { |
@@ -46,9 +60,15 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder { |
task_runner_(task_runner), |
forward_(forward), |
forward_with_responder_(forward_with_responder), |
- associated_group_(associated_group) {} |
+ associated_group_(associated_group), |
+ sync_calls_(new InProgressSyncCalls()) {} |
- ~ThreadSafeForwarder() override {} |
+ ~ThreadSafeForwarder() override { |
+ // If there are ongoing sync calls signal their completion now. |
+ base::AutoLock l(sync_calls_->lock); |
+ for (const auto& pending_response : sync_calls_->pending_responses) |
+ pending_response->event.Signal(); |
+ } |
ProxyType& proxy() { return proxy_; } |
@@ -75,34 +95,128 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder { |
bool AcceptWithResponder( |
Message* message, |
- std::unique_ptr<MessageReceiver> response_receiver) override { |
+ std::unique_ptr<MessageReceiver> responder) override { |
if (!message->associated_endpoint_handles()->empty()) { |
// Please see comment for the DCHECK in the previous method. |
DCHECK(associated_group_.GetController()); |
message->SerializeAssociatedEndpointHandles( |
associated_group_.GetController()); |
} |
- auto responder = |
- base::MakeUnique<ForwardToCallingThread>(std::move(response_receiver)); |
+ |
+ // Async messages are always posted (even if |task_runner_| runs tasks on |
+ // this thread) to guarantee that two async calls can't be reordered. |
+ if (!message->has_flag(Message::kFlagIsSync)) { |
+ auto reply_forwarder = |
+ base::MakeUnique<ForwardToCallingThread>(std::move(responder)); |
+ task_runner_->PostTask( |
+ FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message), |
+ base::Passed(&reply_forwarder))); |
+ return true; |
+ } |
+ |
+ SyncCallRestrictions::AssertSyncCallAllowed(); |
+ |
+ // If the InterfacePtr is bound to this thread, dispatch it directly. |
+ if (task_runner_->RunsTasksOnCurrentThread()) { |
+ forward_with_responder_.Run(std::move(*message), std::move(responder)); |
+ return true; |
+ } |
+ |
+ // If the InterfacePtr is bound on another thread, post the call. |
+ // TODO(yzshen, watk): We block both this thread and the InterfacePtr |
+ // thread. Ideally only this thread would block. |
+ auto response = make_scoped_refptr(new SyncResponseInfo()); |
+ auto response_signaler = base::MakeUnique<SyncResponseSignaler>(response); |
task_runner_->PostTask( |
FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message), |
- base::Passed(&responder))); |
+ base::Passed(&response_signaler))); |
+ |
+ // Save the pending SyncResponseInfo so that if the sync call deletes |
+ // |this|, we can signal the completion of the call to return from |
+ // SyncWatch(). |
+ auto sync_calls = sync_calls_; |
+ { |
+ base::AutoLock l(sync_calls->lock); |
+ sync_calls->pending_responses.push_back(response.get()); |
+ } |
+ |
+ auto assign_true = [](bool* b) { *b = true; }; |
+ bool event_signaled = false; |
+ SyncEventWatcher watcher(&response->event, |
+ base::Bind(assign_true, &event_signaled)); |
+ watcher.SyncWatch(&event_signaled); |
+ |
+ { |
+ base::AutoLock l(sync_calls->lock); |
+ base::Erase(sync_calls->pending_responses, response.get()); |
+ } |
+ |
+ if (response->received) |
+ ignore_result(responder->Accept(&response->message)); |
+ |
return true; |
} |
+ // Data that we need to share between the threads involved in a sync call. |
+ struct SyncResponseInfo |
+ : public base::RefCountedThreadSafe<SyncResponseInfo> { |
+ Message message; |
+ bool received = false; |
+ base::WaitableEvent event{base::WaitableEvent::ResetPolicy::MANUAL, |
+ base::WaitableEvent::InitialState::NOT_SIGNALED}; |
+ |
+ private: |
+ friend class base::RefCountedThreadSafe<SyncResponseInfo>; |
+ }; |
+ |
+ // A MessageReceiver that signals |response| when it either accepts the |
+ // response message, or is destructed. |
+ class SyncResponseSignaler : public MessageReceiver { |
+ public: |
+ explicit SyncResponseSignaler(scoped_refptr<SyncResponseInfo> response) |
+ : response_(response) {} |
+ |
+ ~SyncResponseSignaler() override { |
+ // If Accept() was not called we must still notify the waiter that the |
+ // sync call is finished. |
+ if (response_) |
+ response_->event.Signal(); |
+ } |
+ |
+ bool Accept(Message* message) { |
+ response_->message = std::move(*message); |
+ response_->received = true; |
+ response_->event.Signal(); |
+ response_ = nullptr; |
+ return true; |
+ } |
+ |
+ private: |
+ scoped_refptr<SyncResponseInfo> response_; |
+ }; |
+ |
+ // A record of the pending sync responses for canceling pending sync calls |
+ // when the owning ThreadSafeForwarder is destructed. |
+ struct InProgressSyncCalls |
+ : public base::RefCountedThreadSafe<InProgressSyncCalls> { |
+ // |lock| protects access to |pending_responses|. |
+ base::Lock lock; |
+ std::vector<SyncResponseInfo*> pending_responses; |
+ }; |
+ |
class ForwardToCallingThread : public MessageReceiver { |
public: |
explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder) |
: responder_(std::move(responder)), |
- caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) { |
- } |
+ caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {} |
private: |
bool Accept(Message* message) { |
// The current instance will be deleted when this method returns, so we |
// have to relinquish the responder's ownership so it does not get |
// deleted. |
- caller_task_runner_->PostTask(FROM_HERE, |
+ caller_task_runner_->PostTask( |
+ FROM_HERE, |
base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder, |
base::Passed(std::move(responder_)), |
base::Passed(std::move(*message)))); |
@@ -124,6 +238,7 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder { |
const ForwardMessageCallback forward_; |
const ForwardMessageWithResponderCallback forward_with_responder_; |
AssociatedGroup associated_group_; |
+ scoped_refptr<InProgressSyncCalls> sync_calls_; |
DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder); |
}; |