Chromium Code Reviews| Index: mojo/public/cpp/bindings/thread_safe_interface_ptr.h |
| diff --git a/mojo/public/cpp/bindings/thread_safe_interface_ptr.h b/mojo/public/cpp/bindings/thread_safe_interface_ptr.h |
| index 8b32b30b4fbefe1a5a07e1bb089b97e337d9be74..d39f969b1d635f403005fe6bd25729021361579f 100644 |
| --- a/mojo/public/cpp/bindings/thread_safe_interface_ptr.h |
| +++ b/mojo/public/cpp/bindings/thread_safe_interface_ptr.h |
| @@ -16,8 +16,14 @@ |
| #include "mojo/public/cpp/bindings/associated_interface_ptr.h" |
| #include "mojo/public/cpp/bindings/interface_ptr.h" |
| #include "mojo/public/cpp/bindings/message.h" |
| +#include "mojo/public/cpp/bindings/sync_event_watcher.h" |
| namespace mojo { |
| +namespace { |
|
yzshen1
2017/03/27 22:18:25
It is pretty uncommon and not recommended to use a
watk
2017/03/27 23:45:28
Oops, forgot this was a header.
|
| +void AssignTrue(bool* b) { |
| + *b = true; |
| +} |
| +} // namespace |
| // Instances of this class may be used from any thread to serialize |Interface| |
| // messages and forward them elsewhere. In general you should use one of the |
| @@ -46,7 +52,10 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder { |
| task_runner_(task_runner), |
| forward_(forward), |
| forward_with_responder_(forward_with_responder), |
| - associated_group_(associated_group) {} |
| + associated_group_(associated_group), |
| + sync_message_event_(base::WaitableEvent::ResetPolicy::MANUAL, |
| + base::WaitableEvent::InitialState::NOT_SIGNALED), |
| + weak_factory_(this) {} |
| ~ThreadSafeForwarder() override {} |
| @@ -75,37 +84,103 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder { |
| bool AcceptWithResponder( |
| Message* message, |
| - std::unique_ptr<MessageReceiver> response_receiver) override { |
| + std::unique_ptr<MessageReceiver> responder) override { |
| if (!message->associated_endpoint_handles()->empty()) { |
| // Please see comment for the DCHECK in the previous method. |
| DCHECK(associated_group_.GetController()); |
| message->SerializeAssociatedEndpointHandles( |
| associated_group_.GetController()); |
| } |
| - auto responder = |
| - base::MakeUnique<ForwardToCallingThread>(std::move(response_receiver)); |
| + |
| + if (!message->has_flag(Message::kFlagIsSync)) { |
| + auto reply_forwarder = |
| + base::MakeUnique<ForwardToCallingThread>(std::move(responder)); |
| + if (task_runner_->RunsTasksOnCurrentThread()) { |
| + forward_with_responder_.Run(std::move(*message), |
|
yzshen1
2017/03/27 22:18:24
I think we would like to preserve order between ca
watk
2017/03/27 23:45:29
True. I had assumed there was a contract that TSIP
yzshen1
2017/03/28 00:06:48
What error notification? :)
watk
2017/03/28 00:14:35
Take a look at sync_method_unittest.cc line 853. T
|
| + std::move(reply_forwarder)); |
| + } else { |
| + task_runner_->PostTask( |
| + FROM_HERE, |
| + base::Bind(forward_with_responder_, base::Passed(message), |
| + base::Passed(&reply_forwarder))); |
| + } |
| + return true; |
| + } |
| + |
| + // Sync call on this thread. |
| + if (task_runner_->RunsTasksOnCurrentThread()) { |
| + forward_with_responder_.Run(std::move(*message), std::move(responder)); |
|
yzshen1
2017/03/27 22:18:24
+CC Ken:
At the beginning, I thought this direct
watk
2017/03/27 23:45:29
I did this because I wasn't sure what the alternat
yzshen1
2017/03/28 00:06:48
What I was trying to say is "I thought this doesn'
watk
2017/03/28 00:14:35
Oh, ok :)
|
| + return true; |
| + } |
| + |
| + // Sync call on a different thread. |
| + // TODO: allow the call to be async on the other thread. |
| + Message response; |
| + auto reply_signaler = |
| + base::MakeUnique<SyncReplySignaler>(&response, &sync_message_event_); |
| task_runner_->PostTask( |
| FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message), |
| - base::Passed(&responder))); |
| + base::Passed(&reply_signaler))); |
| + bool response_received = false; |
| + // XXX: What does it mean to have reentrant sync calls that are all waiting |
| + // on the same sync_message_event_? Correct because only reentrant sync |
|
yzshen1
2017/03/27 22:18:25
I am not quite sure I understand the question. Wou
watk
2017/03/27 23:45:29
I think I wrote this before I had the sync_message
|
| + // calls are allowed? |
| + SyncEventWatcher watcher(&sync_message_event_, |
| + base::Bind(&AssignTrue, &response_received)); |
| + watcher.AllowWokenUpBySyncWatchOnSameThread(); |
|
yzshen1
2017/03/27 22:18:24
I don't think you need this line.
This is for the
watk
2017/03/27 23:45:29
Ah, gotcha.
|
| + // XXX: Check that this handles the case that sync_message_event_ is already |
| + // signaled. |
| + auto weak_self = weak_factory_.GetWeakPtr(); |
|
yzshen1
2017/03/27 22:18:24
WeakPtrFactory will force this object to be only u
watk
2017/03/27 23:45:29
SG, thanks
|
| + watcher.SyncWatch(&response_received); |
| + if (weak_self) { |
| + sync_message_event_.Reset(); |
| + if (response_received) |
| + ignore_result(responder->Accept(&response)); |
|
yzshen1
2017/03/27 22:18:24
The response may not correspond to the |responder|
watk
2017/03/27 23:45:28
Ah, yes, thanks. I hadn't even thought about non-n
|
| + } |
| + |
| return true; |
| } |
| + class SyncReplySignaler : public MessageReceiver { |
| + public: |
| + SyncReplySignaler(Message* sync_response_destination, |
| + base::WaitableEvent* sync_response_event) |
| + : sync_response_destination_(sync_response_destination), |
| + sync_response_event_(sync_response_event) {} |
| + |
| + private: |
| + bool Accept(Message* message) { |
| + *sync_response_destination_ = std::move(*message); |
| + sync_response_event_->Signal(); |
| + return true; |
| + } |
| + |
| + // XXX: What guarantees do we have about the lifetime of these? |
|
yzshen1
2017/03/27 22:18:25
The sync_response_event_ may be destroyed in the m
watk
2017/03/27 23:45:29
Yes, true. I'll make this refcounted then.
|
| + Message* sync_response_destination_; |
| + base::WaitableEvent* sync_response_event_; |
| + }; |
| + |
| class ForwardToCallingThread : public MessageReceiver { |
| public: |
| explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder) |
| : responder_(std::move(responder)), |
| - caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) { |
| - } |
| + caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {} |
| private: |
| bool Accept(Message* message) { |
| // The current instance will be deleted when this method returns, so we |
| // have to relinquish the responder's ownership so it does not get |
| // deleted. |
| - caller_task_runner_->PostTask(FROM_HERE, |
| - base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder, |
| - base::Passed(std::move(responder_)), |
| - base::Passed(std::move(*message)))); |
| + if (caller_task_runner_->RunsTasksOnCurrentThread()) { |
| + CallAcceptAndDeleteResponder(std::move(responder_), |
| + std::move(*message)); |
| + } else { |
| + caller_task_runner_->PostTask( |
| + FROM_HERE, |
| + base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder, |
| + base::Passed(std::move(responder_)), |
| + base::Passed(std::move(*message)))); |
| + } |
| return true; |
| } |
| @@ -125,6 +200,10 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder { |
| const ForwardMessageWithResponderCallback forward_with_responder_; |
| AssociatedGroup associated_group_; |
| + // An event used to signal that sync messages are available. |
| + base::WaitableEvent sync_message_event_; |
| + base::WeakPtrFactory<ThreadSafeForwarder> weak_factory_; |
| + |
| DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder); |
| }; |