Chromium Code Reviews| Index: mojo/public/cpp/bindings/thread_safe_interface_ptr.h |
| diff --git a/mojo/public/cpp/bindings/thread_safe_interface_ptr.h b/mojo/public/cpp/bindings/thread_safe_interface_ptr.h |
| index 8b32b30b4fbefe1a5a07e1bb089b97e337d9be74..61ba1989d773a52acf1eaabbf6d4f28e27631518 100644 |
| --- a/mojo/public/cpp/bindings/thread_safe_interface_ptr.h |
| +++ b/mojo/public/cpp/bindings/thread_safe_interface_ptr.h |
| @@ -10,12 +10,16 @@ |
| #include "base/macros.h" |
| #include "base/memory/ptr_util.h" |
| #include "base/memory/ref_counted.h" |
| +#include "base/stl_util.h" |
| +#include "base/synchronization/waitable_event.h" |
| #include "base/task_runner.h" |
| #include "base/threading/thread_task_runner_handle.h" |
| #include "mojo/public/cpp/bindings/associated_group.h" |
| #include "mojo/public/cpp/bindings/associated_interface_ptr.h" |
| #include "mojo/public/cpp/bindings/interface_ptr.h" |
| #include "mojo/public/cpp/bindings/message.h" |
| +#include "mojo/public/cpp/bindings/sync_call_restrictions.h" |
| +#include "mojo/public/cpp/bindings/sync_event_watcher.h" |
| namespace mojo { |
| @@ -46,9 +50,15 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder { |
| task_runner_(task_runner), |
| forward_(forward), |
| forward_with_responder_(forward_with_responder), |
| - associated_group_(associated_group) {} |
| + associated_group_(associated_group), |
| + sync_calls_(new InProgressSyncCalls()) {} |
| - ~ThreadSafeForwarder() override {} |
| + ~ThreadSafeForwarder() override { |
| + // If there are ongoing sync calls signal their completion now. |
| + base::AutoLock l(sync_calls_->lock); |
| + for (const auto& pending_response : sync_calls_->pending_responses) |
| + pending_response->event.Signal(); |
| + } |
| ProxyType& proxy() { return proxy_; } |
| @@ -75,34 +85,128 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder { |
| bool AcceptWithResponder( |
| Message* message, |
| - std::unique_ptr<MessageReceiver> response_receiver) override { |
| + std::unique_ptr<MessageReceiver> responder) override { |
| if (!message->associated_endpoint_handles()->empty()) { |
| // Please see comment for the DCHECK in the previous method. |
| DCHECK(associated_group_.GetController()); |
| message->SerializeAssociatedEndpointHandles( |
| associated_group_.GetController()); |
| } |
| - auto responder = |
| - base::MakeUnique<ForwardToCallingThread>(std::move(response_receiver)); |
| + |
| + // Async messages are always posted (even if |task_runner_| runs tasks on |
| + // this thread) to guarantee that two async calls can't be reordered. |
| + if (!message->has_flag(Message::kFlagIsSync)) { |
| + auto reply_forwarder = |
| + base::MakeUnique<ForwardToCallingThread>(std::move(responder)); |
| + task_runner_->PostTask( |
| + FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message), |
| + base::Passed(&reply_forwarder))); |
| + return true; |
| + } |
| + |
| + SyncCallRestrictions::AssertSyncCallAllowed(); |
| + |
| + // If the InterfacePtr is bound to this thread, dispatch it directly. |
| + if (task_runner_->RunsTasksOnCurrentThread()) { |
| + forward_with_responder_.Run(std::move(*message), std::move(responder)); |
| + return true; |
| + } |
| + |
| + // If the InterfacePtr is bound on another thread, post the call. |
| + // TODO(yzshen, watk): We block both this thread and the InterfacePtr |
| + // thread. Ideally only this thread would block. |
| + auto response = make_scoped_refptr(new SyncResponseInfo()); |
| + auto response_signaler = base::MakeUnique<SyncResponseSignaler>(response); |
| task_runner_->PostTask( |
| FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message), |
| - base::Passed(&responder))); |
| + base::Passed(&response_signaler))); |
| + |
| + // Save the pending SyncResponseInfo so that if the sync call deletes |
| + // |this|, we can signal the completion of the call to return from |
| + // SyncWatch(). |
| + auto sync_calls = sync_calls_; |
| + { |
| + base::AutoLock l(sync_calls->lock); |
| + sync_calls->pending_responses.push_back(response.get()); |
| + } |
| + |
| + auto assign_true = [](bool* b) { *b = true; }; |
| + bool event_signaled = false; |
| + SyncEventWatcher watcher(&response->event, |
| + base::Bind(assign_true, &event_signaled)); |
| + watcher.SyncWatch(&event_signaled); |
| + |
| + { |
| + base::AutoLock l(sync_calls->lock); |
| + base::Erase(sync_calls->pending_responses, response.get()); |
| + } |
| + |
| + if (event_signaled && response->received) |
|
yzshen1
2017/03/30 20:53:06
Do we need to check |event_signaled|?
watk
2017/03/31 00:27:27
Can't think of a reason. Removed.
|
| + ignore_result(responder->Accept(&response->message)); |
| + |
| return true; |
| } |
| + // Data that we need to share between the threads involved in a sync call. |
| + struct SyncResponseInfo |
| + : public base::RefCountedThreadSafe<SyncResponseInfo> { |
| + Message message; |
| + bool received = false; |
| + base::WaitableEvent event{base::WaitableEvent::ResetPolicy::MANUAL, |
| + base::WaitableEvent::InitialState::NOT_SIGNALED}; |
| + |
| + private: |
| + friend class base::RefCountedThreadSafe<SyncResponseInfo>; |
| + }; |
| + |
| + // A MessageReceiver that signals |response| when it either accepts the |
| + // response message, or is destructed. |
| + class SyncResponseSignaler : public MessageReceiver { |
| + public: |
| + explicit SyncResponseSignaler(scoped_refptr<SyncResponseInfo> response) |
| + : response_(response) {} |
| + |
| + ~SyncResponseSignaler() override { |
| + // If Accept() was not called we must still notify the waiter that the |
| + // sync call is finished. |
| + if (response_) |
| + response_->event.Signal(); |
| + } |
| + |
| + bool Accept(Message* message) { |
| + response_->message = std::move(*message); |
| + response_->received = true; |
| + response_->event.Signal(); |
| + response_ = nullptr; |
| + return true; |
| + } |
| + |
| + private: |
| + scoped_refptr<SyncResponseInfo> response_; |
| + }; |
| + |
| + // A record of the pending sync responses for canceling pending sync calls |
| + // when the owning ThreadSafeForwarder is destructed. |
| + struct InProgressSyncCalls |
| + : public base::RefCountedThreadSafe<InProgressSyncCalls> { |
| + // |lock| protects access to |pending_responses|. |
| + base::Lock lock; |
| + std::vector<SyncResponseInfo*> pending_responses; |
| + }; |
| + |
| class ForwardToCallingThread : public MessageReceiver { |
| public: |
| explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder) |
| : responder_(std::move(responder)), |
| - caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) { |
| - } |
| + caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {} |
| private: |
| bool Accept(Message* message) { |
| // The current instance will be deleted when this method returns, so we |
| // have to relinquish the responder's ownership so it does not get |
| // deleted. |
| - caller_task_runner_->PostTask(FROM_HERE, |
| + caller_task_runner_->PostTask( |
| + FROM_HERE, |
| base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder, |
| base::Passed(std::move(responder_)), |
| base::Passed(std::move(*message)))); |
| @@ -124,6 +228,7 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder { |
| const ForwardMessageCallback forward_; |
| const ForwardMessageWithResponderCallback forward_with_responder_; |
| AssociatedGroup associated_group_; |
| + scoped_refptr<InProgressSyncCalls> sync_calls_; |
| DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder); |
| }; |