Index: mojo/public/cpp/bindings/thread_safe_interface_ptr.h |
diff --git a/mojo/public/cpp/bindings/thread_safe_interface_ptr.h b/mojo/public/cpp/bindings/thread_safe_interface_ptr.h |
index 8b32b30b4fbefe1a5a07e1bb089b97e337d9be74..d39f969b1d635f403005fe6bd25729021361579f 100644 |
--- a/mojo/public/cpp/bindings/thread_safe_interface_ptr.h |
+++ b/mojo/public/cpp/bindings/thread_safe_interface_ptr.h |
@@ -16,8 +16,14 @@ |
#include "mojo/public/cpp/bindings/associated_interface_ptr.h" |
#include "mojo/public/cpp/bindings/interface_ptr.h" |
#include "mojo/public/cpp/bindings/message.h" |
+#include "mojo/public/cpp/bindings/sync_event_watcher.h" |
namespace mojo { |
+namespace { |
yzshen1
2017/03/27 22:18:25
It is pretty uncommon and not recommended to use a
watk
2017/03/27 23:45:28
Oops, forgot this was a header.
|
+void AssignTrue(bool* b) { |
+ *b = true; |
+} |
+} // namespace |
// Instances of this class may be used from any thread to serialize |Interface| |
// messages and forward them elsewhere. In general you should use one of the |
@@ -46,7 +52,10 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder { |
task_runner_(task_runner), |
forward_(forward), |
forward_with_responder_(forward_with_responder), |
- associated_group_(associated_group) {} |
+ associated_group_(associated_group), |
+ sync_message_event_(base::WaitableEvent::ResetPolicy::MANUAL, |
+ base::WaitableEvent::InitialState::NOT_SIGNALED), |
+ weak_factory_(this) {} |
~ThreadSafeForwarder() override {} |
@@ -75,37 +84,103 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder { |
bool AcceptWithResponder( |
Message* message, |
- std::unique_ptr<MessageReceiver> response_receiver) override { |
+ std::unique_ptr<MessageReceiver> responder) override { |
if (!message->associated_endpoint_handles()->empty()) { |
// Please see comment for the DCHECK in the previous method. |
DCHECK(associated_group_.GetController()); |
message->SerializeAssociatedEndpointHandles( |
associated_group_.GetController()); |
} |
- auto responder = |
- base::MakeUnique<ForwardToCallingThread>(std::move(response_receiver)); |
+ |
+ if (!message->has_flag(Message::kFlagIsSync)) { |
+ auto reply_forwarder = |
+ base::MakeUnique<ForwardToCallingThread>(std::move(responder)); |
+ if (task_runner_->RunsTasksOnCurrentThread()) { |
+ forward_with_responder_.Run(std::move(*message), |
yzshen1
2017/03/27 22:18:24
I think we would like to preserve order between ca
watk
2017/03/27 23:45:29
True. I had assumed there was a contract that TSIP
yzshen1
2017/03/28 00:06:48
What error notification? :)
watk
2017/03/28 00:14:35
Take a look at sync_method_unittest.cc line 853. T
|
+ std::move(reply_forwarder)); |
+ } else { |
+ task_runner_->PostTask( |
+ FROM_HERE, |
+ base::Bind(forward_with_responder_, base::Passed(message), |
+ base::Passed(&reply_forwarder))); |
+ } |
+ return true; |
+ } |
+ |
+ // Sync call on this thread. |
+ if (task_runner_->RunsTasksOnCurrentThread()) { |
+ forward_with_responder_.Run(std::move(*message), std::move(responder)); |
yzshen1
2017/03/27 22:18:24
+CC Ken:
At the beginning, I thought this direct
watk
2017/03/27 23:45:29
I did this because I wasn't sure what the alternat
yzshen1
2017/03/28 00:06:48
What I was trying to say is "I thought this doesn'
watk
2017/03/28 00:14:35
Oh, ok :)
|
+ return true; |
+ } |
+ |
+ // Sync call on a different thread. |
+ // TODO: allow the call to be async on the other thread. |
+ Message response; |
+ auto reply_signaler = |
+ base::MakeUnique<SyncReplySignaler>(&response, &sync_message_event_); |
task_runner_->PostTask( |
FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message), |
- base::Passed(&responder))); |
+ base::Passed(&reply_signaler))); |
+ bool response_received = false; |
+ // XXX: What does it mean to have reentrant sync calls that are all waiting |
+ // on the same sync_message_event_? Correct because only reentrant sync |
yzshen1
2017/03/27 22:18:25
I am not quite sure I understand the question. Wou
watk
2017/03/27 23:45:29
I think I wrote this before I had the sync_message
|
+ // calls are allowed? |
+ SyncEventWatcher watcher(&sync_message_event_, |
+ base::Bind(&AssignTrue, &response_received)); |
+ watcher.AllowWokenUpBySyncWatchOnSameThread(); |
yzshen1
2017/03/27 22:18:24
I don't think you need this line.
This is for the
watk
2017/03/27 23:45:29
Ah, gotcha.
|
+ // XXX: Check that this handles the case that sync_message_event_ is already |
+ // signaled. |
+ auto weak_self = weak_factory_.GetWeakPtr(); |
yzshen1
2017/03/27 22:18:24
WeakPtrFactory will force this object to be only u
watk
2017/03/27 23:45:29
SG, thanks
|
+ watcher.SyncWatch(&response_received); |
+ if (weak_self) { |
+ sync_message_event_.Reset(); |
+ if (response_received) |
+ ignore_result(responder->Accept(&response)); |
yzshen1
2017/03/27 22:18:24
The response may not correspond to the |responder|
watk
2017/03/27 23:45:28
Ah, yes, thanks. I hadn't even thought about non-n
|
+ } |
+ |
return true; |
} |
+ class SyncReplySignaler : public MessageReceiver { |
+ public: |
+ SyncReplySignaler(Message* sync_response_destination, |
+ base::WaitableEvent* sync_response_event) |
+ : sync_response_destination_(sync_response_destination), |
+ sync_response_event_(sync_response_event) {} |
+ |
+ private: |
+ bool Accept(Message* message) { |
+ *sync_response_destination_ = std::move(*message); |
+ sync_response_event_->Signal(); |
+ return true; |
+ } |
+ |
+ // XXX: What guarantees do we have about the lifetime of these? |
yzshen1
2017/03/27 22:18:25
The sync_response_event_ may be destroyed in the m
watk
2017/03/27 23:45:29
Yes, true. I'll make this refcounted then.
|
+ Message* sync_response_destination_; |
+ base::WaitableEvent* sync_response_event_; |
+ }; |
+ |
class ForwardToCallingThread : public MessageReceiver { |
public: |
explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder) |
: responder_(std::move(responder)), |
- caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) { |
- } |
+ caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {} |
private: |
bool Accept(Message* message) { |
// The current instance will be deleted when this method returns, so we |
// have to relinquish the responder's ownership so it does not get |
// deleted. |
- caller_task_runner_->PostTask(FROM_HERE, |
- base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder, |
- base::Passed(std::move(responder_)), |
- base::Passed(std::move(*message)))); |
+ if (caller_task_runner_->RunsTasksOnCurrentThread()) { |
+ CallAcceptAndDeleteResponder(std::move(responder_), |
+ std::move(*message)); |
+ } else { |
+ caller_task_runner_->PostTask( |
+ FROM_HERE, |
+ base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder, |
+ base::Passed(std::move(responder_)), |
+ base::Passed(std::move(*message)))); |
+ } |
return true; |
} |
@@ -125,6 +200,10 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder { |
const ForwardMessageWithResponderCallback forward_with_responder_; |
AssociatedGroup associated_group_; |
+ // An event used to signal that sync messages are available. |
+ base::WaitableEvent sync_message_event_; |
+ base::WeakPtrFactory<ThreadSafeForwarder> weak_factory_; |
+ |
DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder); |
}; |