Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(511)

Unified Diff: mojo/public/cpp/bindings/thread_safe_interface_ptr.h

Issue 2770153003: mojo: Support sync calls through ThreadSafeInterfacePtr (Closed)
Patch Set: afds Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/public/cpp/bindings/tests/sync_method_unittest.cc ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/public/cpp/bindings/thread_safe_interface_ptr.h
diff --git a/mojo/public/cpp/bindings/thread_safe_interface_ptr.h b/mojo/public/cpp/bindings/thread_safe_interface_ptr.h
index 8b32b30b4fbefe1a5a07e1bb089b97e337d9be74..740687f37984f4864f9ddebf445e897bb346b8f1 100644
--- a/mojo/public/cpp/bindings/thread_safe_interface_ptr.h
+++ b/mojo/public/cpp/bindings/thread_safe_interface_ptr.h
@@ -10,12 +10,26 @@
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/memory/ref_counted.h"
+#include "base/stl_util.h"
+#include "base/synchronization/waitable_event.h"
#include "base/task_runner.h"
#include "base/threading/thread_task_runner_handle.h"
#include "mojo/public/cpp/bindings/associated_group.h"
#include "mojo/public/cpp/bindings/associated_interface_ptr.h"
#include "mojo/public/cpp/bindings/interface_ptr.h"
#include "mojo/public/cpp/bindings/message.h"
+#include "mojo/public/cpp/bindings/sync_call_restrictions.h"
+#include "mojo/public/cpp/bindings/sync_event_watcher.h"
+
+// ThreadSafeInterfacePtr wraps a non-thread-safe InterfacePtr and proxies
+// messages to it. Async calls are posted to the thread that the InteracePtr is
+// bound to, and the responses are posted back. Sync calls are dispatched
+// directly if the call is made on the thread that the wrapped InterfacePtr is
+// bound to, or posted otherwise. It's important to be aware that sync calls
+// block both the calling thread and the InterfacePtr thread. That means that
+// you cannot make sync calls through a ThreadSafeInterfacePtr if the
+// underlying InterfacePtr is bound to a thread that cannot block, like the IO
+// thread.
namespace mojo {
@@ -46,9 +60,15 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder {
task_runner_(task_runner),
forward_(forward),
forward_with_responder_(forward_with_responder),
- associated_group_(associated_group) {}
+ associated_group_(associated_group),
+ sync_calls_(new InProgressSyncCalls()) {}
- ~ThreadSafeForwarder() override {}
+ ~ThreadSafeForwarder() override {
+ // If there are ongoing sync calls signal their completion now.
+ base::AutoLock l(sync_calls_->lock);
+ for (const auto& pending_response : sync_calls_->pending_responses)
+ pending_response->event.Signal();
+ }
ProxyType& proxy() { return proxy_; }
@@ -75,34 +95,128 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder {
bool AcceptWithResponder(
Message* message,
- std::unique_ptr<MessageReceiver> response_receiver) override {
+ std::unique_ptr<MessageReceiver> responder) override {
if (!message->associated_endpoint_handles()->empty()) {
// Please see comment for the DCHECK in the previous method.
DCHECK(associated_group_.GetController());
message->SerializeAssociatedEndpointHandles(
associated_group_.GetController());
}
- auto responder =
- base::MakeUnique<ForwardToCallingThread>(std::move(response_receiver));
+
+ // Async messages are always posted (even if |task_runner_| runs tasks on
+ // this thread) to guarantee that two async calls can't be reordered.
+ if (!message->has_flag(Message::kFlagIsSync)) {
+ auto reply_forwarder =
+ base::MakeUnique<ForwardToCallingThread>(std::move(responder));
+ task_runner_->PostTask(
+ FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message),
+ base::Passed(&reply_forwarder)));
+ return true;
+ }
+
+ SyncCallRestrictions::AssertSyncCallAllowed();
+
+ // If the InterfacePtr is bound to this thread, dispatch it directly.
+ if (task_runner_->RunsTasksOnCurrentThread()) {
+ forward_with_responder_.Run(std::move(*message), std::move(responder));
+ return true;
+ }
+
+ // If the InterfacePtr is bound on another thread, post the call.
+ // TODO(yzshen, watk): We block both this thread and the InterfacePtr
+ // thread. Ideally only this thread would block.
+ auto response = make_scoped_refptr(new SyncResponseInfo());
+ auto response_signaler = base::MakeUnique<SyncResponseSignaler>(response);
task_runner_->PostTask(
FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message),
- base::Passed(&responder)));
+ base::Passed(&response_signaler)));
+
+ // Save the pending SyncResponseInfo so that if the sync call deletes
+ // |this|, we can signal the completion of the call to return from
+ // SyncWatch().
+ auto sync_calls = sync_calls_;
+ {
+ base::AutoLock l(sync_calls->lock);
+ sync_calls->pending_responses.push_back(response.get());
+ }
+
+ auto assign_true = [](bool* b) { *b = true; };
+ bool event_signaled = false;
+ SyncEventWatcher watcher(&response->event,
+ base::Bind(assign_true, &event_signaled));
+ watcher.SyncWatch(&event_signaled);
+
+ {
+ base::AutoLock l(sync_calls->lock);
+ base::Erase(sync_calls->pending_responses, response.get());
+ }
+
+ if (response->received)
+ ignore_result(responder->Accept(&response->message));
+
return true;
}
+ // Data that we need to share between the threads involved in a sync call.
+ struct SyncResponseInfo
+ : public base::RefCountedThreadSafe<SyncResponseInfo> {
+ Message message;
+ bool received = false;
+ base::WaitableEvent event{base::WaitableEvent::ResetPolicy::MANUAL,
+ base::WaitableEvent::InitialState::NOT_SIGNALED};
+
+ private:
+ friend class base::RefCountedThreadSafe<SyncResponseInfo>;
+ };
+
+ // A MessageReceiver that signals |response| when it either accepts the
+ // response message, or is destructed.
+ class SyncResponseSignaler : public MessageReceiver {
+ public:
+ explicit SyncResponseSignaler(scoped_refptr<SyncResponseInfo> response)
+ : response_(response) {}
+
+ ~SyncResponseSignaler() override {
+ // If Accept() was not called we must still notify the waiter that the
+ // sync call is finished.
+ if (response_)
+ response_->event.Signal();
+ }
+
+ bool Accept(Message* message) {
+ response_->message = std::move(*message);
+ response_->received = true;
+ response_->event.Signal();
+ response_ = nullptr;
+ return true;
+ }
+
+ private:
+ scoped_refptr<SyncResponseInfo> response_;
+ };
+
+ // A record of the pending sync responses for canceling pending sync calls
+ // when the owning ThreadSafeForwarder is destructed.
+ struct InProgressSyncCalls
+ : public base::RefCountedThreadSafe<InProgressSyncCalls> {
+ // |lock| protects access to |pending_responses|.
+ base::Lock lock;
+ std::vector<SyncResponseInfo*> pending_responses;
+ };
+
class ForwardToCallingThread : public MessageReceiver {
public:
explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder)
: responder_(std::move(responder)),
- caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {
- }
+ caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {}
private:
bool Accept(Message* message) {
// The current instance will be deleted when this method returns, so we
// have to relinquish the responder's ownership so it does not get
// deleted.
- caller_task_runner_->PostTask(FROM_HERE,
+ caller_task_runner_->PostTask(
+ FROM_HERE,
base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder,
base::Passed(std::move(responder_)),
base::Passed(std::move(*message))));
@@ -124,6 +238,7 @@ class ThreadSafeForwarder : public MessageReceiverWithResponder {
const ForwardMessageCallback forward_;
const ForwardMessageWithResponderCallback forward_with_responder_;
AssociatedGroup associated_group_;
+ scoped_refptr<InProgressSyncCalls> sync_calls_;
DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder);
};
« no previous file with comments | « mojo/public/cpp/bindings/tests/sync_method_unittest.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698