Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(410)

Side by Side Diff: mojo/public/cpp/bindings/thread_safe_interface_ptr.h

Issue 2770153003: mojo: Support sync calls through ThreadSafeInterfacePtr (Closed)
Patch Set: afds Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/public/cpp/bindings/tests/sync_method_unittest.cc ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ 5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_
6 #define MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ 6 #define MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_
7 7
8 #include <memory> 8 #include <memory>
9 9
10 #include "base/macros.h" 10 #include "base/macros.h"
11 #include "base/memory/ptr_util.h" 11 #include "base/memory/ptr_util.h"
12 #include "base/memory/ref_counted.h" 12 #include "base/memory/ref_counted.h"
13 #include "base/stl_util.h"
14 #include "base/synchronization/waitable_event.h"
13 #include "base/task_runner.h" 15 #include "base/task_runner.h"
14 #include "base/threading/thread_task_runner_handle.h" 16 #include "base/threading/thread_task_runner_handle.h"
15 #include "mojo/public/cpp/bindings/associated_group.h" 17 #include "mojo/public/cpp/bindings/associated_group.h"
16 #include "mojo/public/cpp/bindings/associated_interface_ptr.h" 18 #include "mojo/public/cpp/bindings/associated_interface_ptr.h"
17 #include "mojo/public/cpp/bindings/interface_ptr.h" 19 #include "mojo/public/cpp/bindings/interface_ptr.h"
18 #include "mojo/public/cpp/bindings/message.h" 20 #include "mojo/public/cpp/bindings/message.h"
21 #include "mojo/public/cpp/bindings/sync_call_restrictions.h"
22 #include "mojo/public/cpp/bindings/sync_event_watcher.h"
23
24 // ThreadSafeInterfacePtr wraps a non-thread-safe InterfacePtr and proxies
25 // messages to it. Async calls are posted to the thread that the InteracePtr is
26 // bound to, and the responses are posted back. Sync calls are dispatched
27 // directly if the call is made on the thread that the wrapped InterfacePtr is
28 // bound to, or posted otherwise. It's important to be aware that sync calls
29 // block both the calling thread and the InterfacePtr thread. That means that
30 // you cannot make sync calls through a ThreadSafeInterfacePtr if the
31 // underlying InterfacePtr is bound to a thread that cannot block, like the IO
32 // thread.
19 33
20 namespace mojo { 34 namespace mojo {
21 35
22 // Instances of this class may be used from any thread to serialize |Interface| 36 // Instances of this class may be used from any thread to serialize |Interface|
23 // messages and forward them elsewhere. In general you should use one of the 37 // messages and forward them elsewhere. In general you should use one of the
24 // ThreadSafeInterfacePtrBase helper aliases defined below, but this type may be 38 // ThreadSafeInterfacePtrBase helper aliases defined below, but this type may be
25 // useful if you need/want to manually manage the lifetime of the underlying 39 // useful if you need/want to manually manage the lifetime of the underlying
26 // proxy object which will be used to ultimately send messages. 40 // proxy object which will be used to ultimately send messages.
27 template <typename Interface> 41 template <typename Interface>
28 class ThreadSafeForwarder : public MessageReceiverWithResponder { 42 class ThreadSafeForwarder : public MessageReceiverWithResponder {
(...skipping 10 matching lines...) Expand all
39 // if any, back to the thread which called the corresponding interface method. 53 // if any, back to the thread which called the corresponding interface method.
40 ThreadSafeForwarder( 54 ThreadSafeForwarder(
41 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, 55 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
42 const ForwardMessageCallback& forward, 56 const ForwardMessageCallback& forward,
43 const ForwardMessageWithResponderCallback& forward_with_responder, 57 const ForwardMessageWithResponderCallback& forward_with_responder,
44 const AssociatedGroup& associated_group) 58 const AssociatedGroup& associated_group)
45 : proxy_(this), 59 : proxy_(this),
46 task_runner_(task_runner), 60 task_runner_(task_runner),
47 forward_(forward), 61 forward_(forward),
48 forward_with_responder_(forward_with_responder), 62 forward_with_responder_(forward_with_responder),
49 associated_group_(associated_group) {} 63 associated_group_(associated_group),
64 sync_calls_(new InProgressSyncCalls()) {}
50 65
51 ~ThreadSafeForwarder() override {} 66 ~ThreadSafeForwarder() override {
67 // If there are ongoing sync calls signal their completion now.
68 base::AutoLock l(sync_calls_->lock);
69 for (const auto& pending_response : sync_calls_->pending_responses)
70 pending_response->event.Signal();
71 }
52 72
53 ProxyType& proxy() { return proxy_; } 73 ProxyType& proxy() { return proxy_; }
54 74
55 private: 75 private:
56 // MessageReceiverWithResponder implementation: 76 // MessageReceiverWithResponder implementation:
57 bool Accept(Message* message) override { 77 bool Accept(Message* message) override {
58 if (!message->associated_endpoint_handles()->empty()) { 78 if (!message->associated_endpoint_handles()->empty()) {
59 // If this DCHECK fails, it is likely because: 79 // If this DCHECK fails, it is likely because:
60 // - This is a non-associated interface pointer setup using 80 // - This is a non-associated interface pointer setup using
61 // PtrWrapper::BindOnTaskRunner( 81 // PtrWrapper::BindOnTaskRunner(
62 // InterfacePtrInfo<InterfaceType> ptr_info); 82 // InterfacePtrInfo<InterfaceType> ptr_info);
63 // Please see the TODO in that method. 83 // Please see the TODO in that method.
64 // - This is an associated interface which hasn't been associated with a 84 // - This is an associated interface which hasn't been associated with a
65 // message pipe. In other words, the corresponding 85 // message pipe. In other words, the corresponding
66 // AssociatedInterfaceRequest hasn't been sent. 86 // AssociatedInterfaceRequest hasn't been sent.
67 DCHECK(associated_group_.GetController()); 87 DCHECK(associated_group_.GetController());
68 message->SerializeAssociatedEndpointHandles( 88 message->SerializeAssociatedEndpointHandles(
69 associated_group_.GetController()); 89 associated_group_.GetController());
70 } 90 }
71 task_runner_->PostTask(FROM_HERE, 91 task_runner_->PostTask(FROM_HERE,
72 base::Bind(forward_, base::Passed(message))); 92 base::Bind(forward_, base::Passed(message)));
73 return true; 93 return true;
74 } 94 }
75 95
76 bool AcceptWithResponder( 96 bool AcceptWithResponder(
77 Message* message, 97 Message* message,
78 std::unique_ptr<MessageReceiver> response_receiver) override { 98 std::unique_ptr<MessageReceiver> responder) override {
79 if (!message->associated_endpoint_handles()->empty()) { 99 if (!message->associated_endpoint_handles()->empty()) {
80 // Please see comment for the DCHECK in the previous method. 100 // Please see comment for the DCHECK in the previous method.
81 DCHECK(associated_group_.GetController()); 101 DCHECK(associated_group_.GetController());
82 message->SerializeAssociatedEndpointHandles( 102 message->SerializeAssociatedEndpointHandles(
83 associated_group_.GetController()); 103 associated_group_.GetController());
84 } 104 }
85 auto responder = 105
86 base::MakeUnique<ForwardToCallingThread>(std::move(response_receiver)); 106 // Async messages are always posted (even if |task_runner_| runs tasks on
107 // this thread) to guarantee that two async calls can't be reordered.
108 if (!message->has_flag(Message::kFlagIsSync)) {
109 auto reply_forwarder =
110 base::MakeUnique<ForwardToCallingThread>(std::move(responder));
111 task_runner_->PostTask(
112 FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message),
113 base::Passed(&reply_forwarder)));
114 return true;
115 }
116
117 SyncCallRestrictions::AssertSyncCallAllowed();
118
119 // If the InterfacePtr is bound to this thread, dispatch it directly.
120 if (task_runner_->RunsTasksOnCurrentThread()) {
121 forward_with_responder_.Run(std::move(*message), std::move(responder));
122 return true;
123 }
124
125 // If the InterfacePtr is bound on another thread, post the call.
126 // TODO(yzshen, watk): We block both this thread and the InterfacePtr
127 // thread. Ideally only this thread would block.
128 auto response = make_scoped_refptr(new SyncResponseInfo());
129 auto response_signaler = base::MakeUnique<SyncResponseSignaler>(response);
87 task_runner_->PostTask( 130 task_runner_->PostTask(
88 FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message), 131 FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message),
89 base::Passed(&responder))); 132 base::Passed(&response_signaler)));
133
134 // Save the pending SyncResponseInfo so that if the sync call deletes
135 // |this|, we can signal the completion of the call to return from
136 // SyncWatch().
137 auto sync_calls = sync_calls_;
138 {
139 base::AutoLock l(sync_calls->lock);
140 sync_calls->pending_responses.push_back(response.get());
141 }
142
143 auto assign_true = [](bool* b) { *b = true; };
144 bool event_signaled = false;
145 SyncEventWatcher watcher(&response->event,
146 base::Bind(assign_true, &event_signaled));
147 watcher.SyncWatch(&event_signaled);
148
149 {
150 base::AutoLock l(sync_calls->lock);
151 base::Erase(sync_calls->pending_responses, response.get());
152 }
153
154 if (response->received)
155 ignore_result(responder->Accept(&response->message));
156
90 return true; 157 return true;
91 } 158 }
92 159
160 // Data that we need to share between the threads involved in a sync call.
161 struct SyncResponseInfo
162 : public base::RefCountedThreadSafe<SyncResponseInfo> {
163 Message message;
164 bool received = false;
165 base::WaitableEvent event{base::WaitableEvent::ResetPolicy::MANUAL,
166 base::WaitableEvent::InitialState::NOT_SIGNALED};
167
168 private:
169 friend class base::RefCountedThreadSafe<SyncResponseInfo>;
170 };
171
172 // A MessageReceiver that signals |response| when it either accepts the
173 // response message, or is destructed.
174 class SyncResponseSignaler : public MessageReceiver {
175 public:
176 explicit SyncResponseSignaler(scoped_refptr<SyncResponseInfo> response)
177 : response_(response) {}
178
179 ~SyncResponseSignaler() override {
180 // If Accept() was not called we must still notify the waiter that the
181 // sync call is finished.
182 if (response_)
183 response_->event.Signal();
184 }
185
186 bool Accept(Message* message) {
187 response_->message = std::move(*message);
188 response_->received = true;
189 response_->event.Signal();
190 response_ = nullptr;
191 return true;
192 }
193
194 private:
195 scoped_refptr<SyncResponseInfo> response_;
196 };
197
198 // A record of the pending sync responses for canceling pending sync calls
199 // when the owning ThreadSafeForwarder is destructed.
200 struct InProgressSyncCalls
201 : public base::RefCountedThreadSafe<InProgressSyncCalls> {
202 // |lock| protects access to |pending_responses|.
203 base::Lock lock;
204 std::vector<SyncResponseInfo*> pending_responses;
205 };
206
93 class ForwardToCallingThread : public MessageReceiver { 207 class ForwardToCallingThread : public MessageReceiver {
94 public: 208 public:
95 explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder) 209 explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder)
96 : responder_(std::move(responder)), 210 : responder_(std::move(responder)),
97 caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) { 211 caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {}
98 }
99 212
100 private: 213 private:
101 bool Accept(Message* message) { 214 bool Accept(Message* message) {
102 // The current instance will be deleted when this method returns, so we 215 // The current instance will be deleted when this method returns, so we
103 // have to relinquish the responder's ownership so it does not get 216 // have to relinquish the responder's ownership so it does not get
104 // deleted. 217 // deleted.
105 caller_task_runner_->PostTask(FROM_HERE, 218 caller_task_runner_->PostTask(
219 FROM_HERE,
106 base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder, 220 base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder,
107 base::Passed(std::move(responder_)), 221 base::Passed(std::move(responder_)),
108 base::Passed(std::move(*message)))); 222 base::Passed(std::move(*message))));
109 return true; 223 return true;
110 } 224 }
111 225
112 static void CallAcceptAndDeleteResponder( 226 static void CallAcceptAndDeleteResponder(
113 std::unique_ptr<MessageReceiver> responder, 227 std::unique_ptr<MessageReceiver> responder,
114 Message message) { 228 Message message) {
115 ignore_result(responder->Accept(&message)); 229 ignore_result(responder->Accept(&message));
116 } 230 }
117 231
118 std::unique_ptr<MessageReceiver> responder_; 232 std::unique_ptr<MessageReceiver> responder_;
119 scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_; 233 scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_;
120 }; 234 };
121 235
122 ProxyType proxy_; 236 ProxyType proxy_;
123 const scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 237 const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
124 const ForwardMessageCallback forward_; 238 const ForwardMessageCallback forward_;
125 const ForwardMessageWithResponderCallback forward_with_responder_; 239 const ForwardMessageWithResponderCallback forward_with_responder_;
126 AssociatedGroup associated_group_; 240 AssociatedGroup associated_group_;
241 scoped_refptr<InProgressSyncCalls> sync_calls_;
127 242
128 DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder); 243 DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder);
129 }; 244 };
130 245
131 template <typename InterfacePtrType> 246 template <typename InterfacePtrType>
132 class ThreadSafeInterfacePtrBase 247 class ThreadSafeInterfacePtrBase
133 : public base::RefCountedThreadSafe< 248 : public base::RefCountedThreadSafe<
134 ThreadSafeInterfacePtrBase<InterfacePtrType>> { 249 ThreadSafeInterfacePtrBase<InterfacePtrType>> {
135 public: 250 public:
136 using InterfaceType = typename InterfacePtrType::InterfaceType; 251 using InterfaceType = typename InterfacePtrType::InterfaceType;
(...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after
270 using ThreadSafeAssociatedInterfacePtr = 385 using ThreadSafeAssociatedInterfacePtr =
271 ThreadSafeInterfacePtrBase<AssociatedInterfacePtr<Interface>>; 386 ThreadSafeInterfacePtrBase<AssociatedInterfacePtr<Interface>>;
272 387
273 template <typename Interface> 388 template <typename Interface>
274 using ThreadSafeInterfacePtr = 389 using ThreadSafeInterfacePtr =
275 ThreadSafeInterfacePtrBase<InterfacePtr<Interface>>; 390 ThreadSafeInterfacePtrBase<InterfacePtr<Interface>>;
276 391
277 } // namespace mojo 392 } // namespace mojo
278 393
279 #endif // MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ 394 #endif // MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/tests/sync_method_unittest.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698