OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ | 5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ |
6 #define MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ | 6 #define MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ |
7 | 7 |
8 #include <memory> | 8 #include <memory> |
9 | 9 |
10 #include "base/macros.h" | 10 #include "base/macros.h" |
11 #include "base/memory/ptr_util.h" | 11 #include "base/memory/ptr_util.h" |
12 #include "base/memory/ref_counted.h" | 12 #include "base/memory/ref_counted.h" |
| 13 #include "base/stl_util.h" |
| 14 #include "base/synchronization/waitable_event.h" |
13 #include "base/task_runner.h" | 15 #include "base/task_runner.h" |
14 #include "base/threading/thread_task_runner_handle.h" | 16 #include "base/threading/thread_task_runner_handle.h" |
15 #include "mojo/public/cpp/bindings/associated_group.h" | 17 #include "mojo/public/cpp/bindings/associated_group.h" |
16 #include "mojo/public/cpp/bindings/associated_interface_ptr.h" | 18 #include "mojo/public/cpp/bindings/associated_interface_ptr.h" |
17 #include "mojo/public/cpp/bindings/interface_ptr.h" | 19 #include "mojo/public/cpp/bindings/interface_ptr.h" |
18 #include "mojo/public/cpp/bindings/message.h" | 20 #include "mojo/public/cpp/bindings/message.h" |
| 21 #include "mojo/public/cpp/bindings/sync_call_restrictions.h" |
| 22 #include "mojo/public/cpp/bindings/sync_event_watcher.h" |
| 23 |
| 24 // ThreadSafeInterfacePtr wraps a non-thread-safe InterfacePtr and proxies |
| 25 // messages to it. Async calls are posted to the thread that the InteracePtr is |
| 26 // bound to, and the responses are posted back. Sync calls are dispatched |
| 27 // directly if the call is made on the thread that the wrapped InterfacePtr is |
| 28 // bound to, or posted otherwise. It's important to be aware that sync calls |
| 29 // block both the calling thread and the InterfacePtr thread. That means that |
| 30 // you cannot make sync calls through a ThreadSafeInterfacePtr if the |
| 31 // underlying InterfacePtr is bound to a thread that cannot block, like the IO |
| 32 // thread. |
19 | 33 |
20 namespace mojo { | 34 namespace mojo { |
21 | 35 |
22 // Instances of this class may be used from any thread to serialize |Interface| | 36 // Instances of this class may be used from any thread to serialize |Interface| |
23 // messages and forward them elsewhere. In general you should use one of the | 37 // messages and forward them elsewhere. In general you should use one of the |
24 // ThreadSafeInterfacePtrBase helper aliases defined below, but this type may be | 38 // ThreadSafeInterfacePtrBase helper aliases defined below, but this type may be |
25 // useful if you need/want to manually manage the lifetime of the underlying | 39 // useful if you need/want to manually manage the lifetime of the underlying |
26 // proxy object which will be used to ultimately send messages. | 40 // proxy object which will be used to ultimately send messages. |
27 template <typename Interface> | 41 template <typename Interface> |
28 class ThreadSafeForwarder : public MessageReceiverWithResponder { | 42 class ThreadSafeForwarder : public MessageReceiverWithResponder { |
(...skipping 10 matching lines...) Expand all Loading... |
39 // if any, back to the thread which called the corresponding interface method. | 53 // if any, back to the thread which called the corresponding interface method. |
40 ThreadSafeForwarder( | 54 ThreadSafeForwarder( |
41 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, | 55 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, |
42 const ForwardMessageCallback& forward, | 56 const ForwardMessageCallback& forward, |
43 const ForwardMessageWithResponderCallback& forward_with_responder, | 57 const ForwardMessageWithResponderCallback& forward_with_responder, |
44 const AssociatedGroup& associated_group) | 58 const AssociatedGroup& associated_group) |
45 : proxy_(this), | 59 : proxy_(this), |
46 task_runner_(task_runner), | 60 task_runner_(task_runner), |
47 forward_(forward), | 61 forward_(forward), |
48 forward_with_responder_(forward_with_responder), | 62 forward_with_responder_(forward_with_responder), |
49 associated_group_(associated_group) {} | 63 associated_group_(associated_group), |
| 64 sync_calls_(new InProgressSyncCalls()) {} |
50 | 65 |
51 ~ThreadSafeForwarder() override {} | 66 ~ThreadSafeForwarder() override { |
| 67 // If there are ongoing sync calls signal their completion now. |
| 68 base::AutoLock l(sync_calls_->lock); |
| 69 for (const auto& pending_response : sync_calls_->pending_responses) |
| 70 pending_response->event.Signal(); |
| 71 } |
52 | 72 |
53 ProxyType& proxy() { return proxy_; } | 73 ProxyType& proxy() { return proxy_; } |
54 | 74 |
55 private: | 75 private: |
56 // MessageReceiverWithResponder implementation: | 76 // MessageReceiverWithResponder implementation: |
57 bool Accept(Message* message) override { | 77 bool Accept(Message* message) override { |
58 if (!message->associated_endpoint_handles()->empty()) { | 78 if (!message->associated_endpoint_handles()->empty()) { |
59 // If this DCHECK fails, it is likely because: | 79 // If this DCHECK fails, it is likely because: |
60 // - This is a non-associated interface pointer setup using | 80 // - This is a non-associated interface pointer setup using |
61 // PtrWrapper::BindOnTaskRunner( | 81 // PtrWrapper::BindOnTaskRunner( |
62 // InterfacePtrInfo<InterfaceType> ptr_info); | 82 // InterfacePtrInfo<InterfaceType> ptr_info); |
63 // Please see the TODO in that method. | 83 // Please see the TODO in that method. |
64 // - This is an associated interface which hasn't been associated with a | 84 // - This is an associated interface which hasn't been associated with a |
65 // message pipe. In other words, the corresponding | 85 // message pipe. In other words, the corresponding |
66 // AssociatedInterfaceRequest hasn't been sent. | 86 // AssociatedInterfaceRequest hasn't been sent. |
67 DCHECK(associated_group_.GetController()); | 87 DCHECK(associated_group_.GetController()); |
68 message->SerializeAssociatedEndpointHandles( | 88 message->SerializeAssociatedEndpointHandles( |
69 associated_group_.GetController()); | 89 associated_group_.GetController()); |
70 } | 90 } |
71 task_runner_->PostTask(FROM_HERE, | 91 task_runner_->PostTask(FROM_HERE, |
72 base::Bind(forward_, base::Passed(message))); | 92 base::Bind(forward_, base::Passed(message))); |
73 return true; | 93 return true; |
74 } | 94 } |
75 | 95 |
76 bool AcceptWithResponder( | 96 bool AcceptWithResponder( |
77 Message* message, | 97 Message* message, |
78 std::unique_ptr<MessageReceiver> response_receiver) override { | 98 std::unique_ptr<MessageReceiver> responder) override { |
79 if (!message->associated_endpoint_handles()->empty()) { | 99 if (!message->associated_endpoint_handles()->empty()) { |
80 // Please see comment for the DCHECK in the previous method. | 100 // Please see comment for the DCHECK in the previous method. |
81 DCHECK(associated_group_.GetController()); | 101 DCHECK(associated_group_.GetController()); |
82 message->SerializeAssociatedEndpointHandles( | 102 message->SerializeAssociatedEndpointHandles( |
83 associated_group_.GetController()); | 103 associated_group_.GetController()); |
84 } | 104 } |
85 auto responder = | 105 |
86 base::MakeUnique<ForwardToCallingThread>(std::move(response_receiver)); | 106 // Async messages are always posted (even if |task_runner_| runs tasks on |
| 107 // this thread) to guarantee that two async calls can't be reordered. |
| 108 if (!message->has_flag(Message::kFlagIsSync)) { |
| 109 auto reply_forwarder = |
| 110 base::MakeUnique<ForwardToCallingThread>(std::move(responder)); |
| 111 task_runner_->PostTask( |
| 112 FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message), |
| 113 base::Passed(&reply_forwarder))); |
| 114 return true; |
| 115 } |
| 116 |
| 117 SyncCallRestrictions::AssertSyncCallAllowed(); |
| 118 |
| 119 // If the InterfacePtr is bound to this thread, dispatch it directly. |
| 120 if (task_runner_->RunsTasksOnCurrentThread()) { |
| 121 forward_with_responder_.Run(std::move(*message), std::move(responder)); |
| 122 return true; |
| 123 } |
| 124 |
| 125 // If the InterfacePtr is bound on another thread, post the call. |
| 126 // TODO(yzshen, watk): We block both this thread and the InterfacePtr |
| 127 // thread. Ideally only this thread would block. |
| 128 auto response = make_scoped_refptr(new SyncResponseInfo()); |
| 129 auto response_signaler = base::MakeUnique<SyncResponseSignaler>(response); |
87 task_runner_->PostTask( | 130 task_runner_->PostTask( |
88 FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message), | 131 FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message), |
89 base::Passed(&responder))); | 132 base::Passed(&response_signaler))); |
| 133 |
| 134 // Save the pending SyncResponseInfo so that if the sync call deletes |
| 135 // |this|, we can signal the completion of the call to return from |
| 136 // SyncWatch(). |
| 137 auto sync_calls = sync_calls_; |
| 138 { |
| 139 base::AutoLock l(sync_calls->lock); |
| 140 sync_calls->pending_responses.push_back(response.get()); |
| 141 } |
| 142 |
| 143 auto assign_true = [](bool* b) { *b = true; }; |
| 144 bool event_signaled = false; |
| 145 SyncEventWatcher watcher(&response->event, |
| 146 base::Bind(assign_true, &event_signaled)); |
| 147 watcher.SyncWatch(&event_signaled); |
| 148 |
| 149 { |
| 150 base::AutoLock l(sync_calls->lock); |
| 151 base::Erase(sync_calls->pending_responses, response.get()); |
| 152 } |
| 153 |
| 154 if (response->received) |
| 155 ignore_result(responder->Accept(&response->message)); |
| 156 |
90 return true; | 157 return true; |
91 } | 158 } |
92 | 159 |
| 160 // Data that we need to share between the threads involved in a sync call. |
| 161 struct SyncResponseInfo |
| 162 : public base::RefCountedThreadSafe<SyncResponseInfo> { |
| 163 Message message; |
| 164 bool received = false; |
| 165 base::WaitableEvent event{base::WaitableEvent::ResetPolicy::MANUAL, |
| 166 base::WaitableEvent::InitialState::NOT_SIGNALED}; |
| 167 |
| 168 private: |
| 169 friend class base::RefCountedThreadSafe<SyncResponseInfo>; |
| 170 }; |
| 171 |
| 172 // A MessageReceiver that signals |response| when it either accepts the |
| 173 // response message, or is destructed. |
| 174 class SyncResponseSignaler : public MessageReceiver { |
| 175 public: |
| 176 explicit SyncResponseSignaler(scoped_refptr<SyncResponseInfo> response) |
| 177 : response_(response) {} |
| 178 |
| 179 ~SyncResponseSignaler() override { |
| 180 // If Accept() was not called we must still notify the waiter that the |
| 181 // sync call is finished. |
| 182 if (response_) |
| 183 response_->event.Signal(); |
| 184 } |
| 185 |
| 186 bool Accept(Message* message) { |
| 187 response_->message = std::move(*message); |
| 188 response_->received = true; |
| 189 response_->event.Signal(); |
| 190 response_ = nullptr; |
| 191 return true; |
| 192 } |
| 193 |
| 194 private: |
| 195 scoped_refptr<SyncResponseInfo> response_; |
| 196 }; |
| 197 |
| 198 // A record of the pending sync responses for canceling pending sync calls |
| 199 // when the owning ThreadSafeForwarder is destructed. |
| 200 struct InProgressSyncCalls |
| 201 : public base::RefCountedThreadSafe<InProgressSyncCalls> { |
| 202 // |lock| protects access to |pending_responses|. |
| 203 base::Lock lock; |
| 204 std::vector<SyncResponseInfo*> pending_responses; |
| 205 }; |
| 206 |
93 class ForwardToCallingThread : public MessageReceiver { | 207 class ForwardToCallingThread : public MessageReceiver { |
94 public: | 208 public: |
95 explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder) | 209 explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder) |
96 : responder_(std::move(responder)), | 210 : responder_(std::move(responder)), |
97 caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) { | 211 caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {} |
98 } | |
99 | 212 |
100 private: | 213 private: |
101 bool Accept(Message* message) { | 214 bool Accept(Message* message) { |
102 // The current instance will be deleted when this method returns, so we | 215 // The current instance will be deleted when this method returns, so we |
103 // have to relinquish the responder's ownership so it does not get | 216 // have to relinquish the responder's ownership so it does not get |
104 // deleted. | 217 // deleted. |
105 caller_task_runner_->PostTask(FROM_HERE, | 218 caller_task_runner_->PostTask( |
| 219 FROM_HERE, |
106 base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder, | 220 base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder, |
107 base::Passed(std::move(responder_)), | 221 base::Passed(std::move(responder_)), |
108 base::Passed(std::move(*message)))); | 222 base::Passed(std::move(*message)))); |
109 return true; | 223 return true; |
110 } | 224 } |
111 | 225 |
112 static void CallAcceptAndDeleteResponder( | 226 static void CallAcceptAndDeleteResponder( |
113 std::unique_ptr<MessageReceiver> responder, | 227 std::unique_ptr<MessageReceiver> responder, |
114 Message message) { | 228 Message message) { |
115 ignore_result(responder->Accept(&message)); | 229 ignore_result(responder->Accept(&message)); |
116 } | 230 } |
117 | 231 |
118 std::unique_ptr<MessageReceiver> responder_; | 232 std::unique_ptr<MessageReceiver> responder_; |
119 scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_; | 233 scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_; |
120 }; | 234 }; |
121 | 235 |
122 ProxyType proxy_; | 236 ProxyType proxy_; |
123 const scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 237 const scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
124 const ForwardMessageCallback forward_; | 238 const ForwardMessageCallback forward_; |
125 const ForwardMessageWithResponderCallback forward_with_responder_; | 239 const ForwardMessageWithResponderCallback forward_with_responder_; |
126 AssociatedGroup associated_group_; | 240 AssociatedGroup associated_group_; |
| 241 scoped_refptr<InProgressSyncCalls> sync_calls_; |
127 | 242 |
128 DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder); | 243 DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder); |
129 }; | 244 }; |
130 | 245 |
131 template <typename InterfacePtrType> | 246 template <typename InterfacePtrType> |
132 class ThreadSafeInterfacePtrBase | 247 class ThreadSafeInterfacePtrBase |
133 : public base::RefCountedThreadSafe< | 248 : public base::RefCountedThreadSafe< |
134 ThreadSafeInterfacePtrBase<InterfacePtrType>> { | 249 ThreadSafeInterfacePtrBase<InterfacePtrType>> { |
135 public: | 250 public: |
136 using InterfaceType = typename InterfacePtrType::InterfaceType; | 251 using InterfaceType = typename InterfacePtrType::InterfaceType; |
(...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
270 using ThreadSafeAssociatedInterfacePtr = | 385 using ThreadSafeAssociatedInterfacePtr = |
271 ThreadSafeInterfacePtrBase<AssociatedInterfacePtr<Interface>>; | 386 ThreadSafeInterfacePtrBase<AssociatedInterfacePtr<Interface>>; |
272 | 387 |
273 template <typename Interface> | 388 template <typename Interface> |
274 using ThreadSafeInterfacePtr = | 389 using ThreadSafeInterfacePtr = |
275 ThreadSafeInterfacePtrBase<InterfacePtr<Interface>>; | 390 ThreadSafeInterfacePtrBase<InterfacePtr<Interface>>; |
276 | 391 |
277 } // namespace mojo | 392 } // namespace mojo |
278 | 393 |
279 #endif // MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ | 394 #endif // MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ |
OLD | NEW |