Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(10)

Unified Diff: mojo/public/cpp/bindings/thread_safe_interface_ptr.h

Issue 2770153003: mojo: Support sync calls through ThreadSafeInterfacePtr (Closed)
Patch Set: Fix more tests Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/public/cpp/bindings/thread_safe_interface_ptr.h
diff --git a/mojo/public/cpp/bindings/thread_safe_interface_ptr.h b/mojo/public/cpp/bindings/thread_safe_interface_ptr.h
index 8b32b30b4fbefe1a5a07e1bb089b97e337d9be74..d39f969b1d635f403005fe6bd25729021361579f 100644
--- a/mojo/public/cpp/bindings/thread_safe_interface_ptr.h
+++ b/mojo/public/cpp/bindings/thread_safe_interface_ptr.h
@@ -16,8 +16,14 @@
#include "mojo/public/cpp/bindings/associated_interface_ptr.h"
#include "mojo/public/cpp/bindings/interface_ptr.h"
#include "mojo/public/cpp/bindings/message.h"
+#include "mojo/public/cpp/bindings/sync_event_watcher.h"
namespace mojo {
+namespace {
yzshen1 2017/03/27 22:18:25 It is pretty uncommon and not recommended to use a
watk 2017/03/27 23:45:28 Oops, forgot this was a header.
+void AssignTrue(bool* b) {
+ *b = true;
+}
+} // namespace
// Instances of this class may be used from any thread to serialize |Interface|
// messages and forward them elsewhere. In general you should use one of the
@@ -46,7 +52,10 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder {
task_runner_(task_runner),
forward_(forward),
forward_with_responder_(forward_with_responder),
- associated_group_(associated_group) {}
+ associated_group_(associated_group),
+ sync_message_event_(base::WaitableEvent::ResetPolicy::MANUAL,
+ base::WaitableEvent::InitialState::NOT_SIGNALED),
+ weak_factory_(this) {}
~ThreadSafeForwarder() override {}
@@ -75,37 +84,103 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder {
bool AcceptWithResponder(
Message* message,
- std::unique_ptr<MessageReceiver> response_receiver) override {
+ std::unique_ptr<MessageReceiver> responder) override {
if (!message->associated_endpoint_handles()->empty()) {
// Please see comment for the DCHECK in the previous method.
DCHECK(associated_group_.GetController());
message->SerializeAssociatedEndpointHandles(
associated_group_.GetController());
}
- auto responder =
- base::MakeUnique<ForwardToCallingThread>(std::move(response_receiver));
+
+ if (!message->has_flag(Message::kFlagIsSync)) {
+ auto reply_forwarder =
+ base::MakeUnique<ForwardToCallingThread>(std::move(responder));
+ if (task_runner_->RunsTasksOnCurrentThread()) {
+ forward_with_responder_.Run(std::move(*message),
yzshen1 2017/03/27 22:18:24 I think we would like to preserve order between ca
watk 2017/03/27 23:45:29 True. I had assumed there was a contract that TSIP
yzshen1 2017/03/28 00:06:48 What error notification? :)
watk 2017/03/28 00:14:35 Take a look at sync_method_unittest.cc line 853. T
+ std::move(reply_forwarder));
+ } else {
+ task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(forward_with_responder_, base::Passed(message),
+ base::Passed(&reply_forwarder)));
+ }
+ return true;
+ }
+
+ // Sync call on this thread.
+ if (task_runner_->RunsTasksOnCurrentThread()) {
+ forward_with_responder_.Run(std::move(*message), std::move(responder));
yzshen1 2017/03/27 22:18:24 +CC Ken: At the beginning, I thought this direct
watk 2017/03/27 23:45:29 I did this because I wasn't sure what the alternat
yzshen1 2017/03/28 00:06:48 What I was trying to say is "I thought this doesn'
watk 2017/03/28 00:14:35 Oh, ok :)
+ return true;
+ }
+
+ // Sync call on a different thread.
+ // TODO: allow the call to be async on the other thread.
+ Message response;
+ auto reply_signaler =
+ base::MakeUnique<SyncReplySignaler>(&response, &sync_message_event_);
task_runner_->PostTask(
FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message),
- base::Passed(&responder)));
+ base::Passed(&reply_signaler)));
+ bool response_received = false;
+ // XXX: What does it mean to have reentrant sync calls that are all waiting
+ // on the same sync_message_event_? Correct because only reentrant sync
yzshen1 2017/03/27 22:18:25 I am not quite sure I understand the question. Wou
watk 2017/03/27 23:45:29 I think I wrote this before I had the sync_message
+ // calls are allowed?
+ SyncEventWatcher watcher(&sync_message_event_,
+ base::Bind(&AssignTrue, &response_received));
+ watcher.AllowWokenUpBySyncWatchOnSameThread();
yzshen1 2017/03/27 22:18:24 I don't think you need this line. This is for the
watk 2017/03/27 23:45:29 Ah, gotcha.
+ // XXX: Check that this handles the case that sync_message_event_ is already
+ // signaled.
+ auto weak_self = weak_factory_.GetWeakPtr();
yzshen1 2017/03/27 22:18:24 WeakPtrFactory will force this object to be only u
watk 2017/03/27 23:45:29 SG, thanks
+ watcher.SyncWatch(&response_received);
+ if (weak_self) {
+ sync_message_event_.Reset();
+ if (response_received)
+ ignore_result(responder->Accept(&response));
yzshen1 2017/03/27 22:18:24 The response may not correspond to the |responder|
watk 2017/03/27 23:45:28 Ah, yes, thanks. I hadn't even thought about non-n
+ }
+
return true;
}
+ class SyncReplySignaler : public MessageReceiver {
+ public:
+ SyncReplySignaler(Message* sync_response_destination,
+ base::WaitableEvent* sync_response_event)
+ : sync_response_destination_(sync_response_destination),
+ sync_response_event_(sync_response_event) {}
+
+ private:
+ bool Accept(Message* message) {
+ *sync_response_destination_ = std::move(*message);
+ sync_response_event_->Signal();
+ return true;
+ }
+
+ // XXX: What guarantees do we have about the lifetime of these?
yzshen1 2017/03/27 22:18:25 The sync_response_event_ may be destroyed in the m
watk 2017/03/27 23:45:29 Yes, true. I'll make this refcounted then.
+ Message* sync_response_destination_;
+ base::WaitableEvent* sync_response_event_;
+ };
+
class ForwardToCallingThread : public MessageReceiver {
public:
explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder)
: responder_(std::move(responder)),
- caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {
- }
+ caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {}
private:
bool Accept(Message* message) {
// The current instance will be deleted when this method returns, so we
// have to relinquish the responder's ownership so it does not get
// deleted.
- caller_task_runner_->PostTask(FROM_HERE,
- base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder,
- base::Passed(std::move(responder_)),
- base::Passed(std::move(*message))));
+ if (caller_task_runner_->RunsTasksOnCurrentThread()) {
+ CallAcceptAndDeleteResponder(std::move(responder_),
+ std::move(*message));
+ } else {
+ caller_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder,
+ base::Passed(std::move(responder_)),
+ base::Passed(std::move(*message))));
+ }
return true;
}
@@ -125,6 +200,10 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder {
const ForwardMessageWithResponderCallback forward_with_responder_;
AssociatedGroup associated_group_;
+ // An event used to signal that sync messages are available.
+ base::WaitableEvent sync_message_event_;
+ base::WeakPtrFactory<ThreadSafeForwarder> weak_factory_;
+
DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder);
};

Powered by Google App Engine
This is Rietveld 408576698