Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ | 5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ |
| 6 #define MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ | 6 #define MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ |
| 7 | 7 |
| 8 #include <memory> | 8 #include <memory> |
| 9 | 9 |
| 10 #include "base/macros.h" | 10 #include "base/macros.h" |
| 11 #include "base/memory/ptr_util.h" | 11 #include "base/memory/ptr_util.h" |
| 12 #include "base/memory/ref_counted.h" | 12 #include "base/memory/ref_counted.h" |
| 13 #include "base/stl_util.h" | |
| 14 #include "base/synchronization/waitable_event.h" | |
| 13 #include "base/task_runner.h" | 15 #include "base/task_runner.h" |
| 14 #include "base/threading/thread_task_runner_handle.h" | 16 #include "base/threading/thread_task_runner_handle.h" |
| 15 #include "mojo/public/cpp/bindings/associated_group.h" | 17 #include "mojo/public/cpp/bindings/associated_group.h" |
| 16 #include "mojo/public/cpp/bindings/associated_interface_ptr.h" | 18 #include "mojo/public/cpp/bindings/associated_interface_ptr.h" |
| 17 #include "mojo/public/cpp/bindings/interface_ptr.h" | 19 #include "mojo/public/cpp/bindings/interface_ptr.h" |
| 18 #include "mojo/public/cpp/bindings/message.h" | 20 #include "mojo/public/cpp/bindings/message.h" |
| 21 #include "mojo/public/cpp/bindings/sync_call_restrictions.h" | |
| 22 #include "mojo/public/cpp/bindings/sync_event_watcher.h" | |
| 19 | 23 |
| 20 namespace mojo { | 24 namespace mojo { |
| 21 | 25 |
| 22 // Instances of this class may be used from any thread to serialize |Interface| | 26 // Instances of this class may be used from any thread to serialize |Interface| |
| 23 // messages and forward them elsewhere. In general you should use one of the | 27 // messages and forward them elsewhere. In general you should use one of the |
| 24 // ThreadSafeInterfacePtrBase helper aliases defined below, but this type may be | 28 // ThreadSafeInterfacePtrBase helper aliases defined below, but this type may be |
| 25 // useful if you need/want to manually manage the lifetime of the underlying | 29 // useful if you need/want to manually manage the lifetime of the underlying |
| 26 // proxy object which will be used to ultimately send messages. | 30 // proxy object which will be used to ultimately send messages. |
| 27 template <typename Interface> | 31 template <typename Interface> |
| 28 class ThreadSafeForwarder : public MessageReceiverWithResponder { | 32 class ThreadSafeForwarder : public MessageReceiverWithResponder { |
| (...skipping 12 matching lines...) Expand all Loading... | |
| 41 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, | 45 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, |
| 42 const ForwardMessageCallback& forward, | 46 const ForwardMessageCallback& forward, |
| 43 const ForwardMessageWithResponderCallback& forward_with_responder, | 47 const ForwardMessageWithResponderCallback& forward_with_responder, |
| 44 const AssociatedGroup& associated_group) | 48 const AssociatedGroup& associated_group) |
| 45 : proxy_(this), | 49 : proxy_(this), |
| 46 task_runner_(task_runner), | 50 task_runner_(task_runner), |
| 47 forward_(forward), | 51 forward_(forward), |
| 48 forward_with_responder_(forward_with_responder), | 52 forward_with_responder_(forward_with_responder), |
| 49 associated_group_(associated_group) {} | 53 associated_group_(associated_group) {} |
| 50 | 54 |
| 51 ~ThreadSafeForwarder() override {} | 55 ~ThreadSafeForwarder() override { |
| 56 // If there's an ongoing sync call (which deleted |this|), signal its | |
| 57 // completion now. | |
| 58 { | |
| 59 base::AutoLock l(lock_); | |
| 60 if (!pending_sync_responses_.empty()) | |
| 61 pending_sync_responses_.front()->event.Signal(); | |
|
yzshen1
2017/03/29 23:16:47
Shouldn't we signal all the events?
watk
2017/03/30 02:26:05
Done.
| |
| 62 } | |
| 63 } | |
| 52 | 64 |
| 53 ProxyType& proxy() { return proxy_; } | 65 ProxyType& proxy() { return proxy_; } |
| 54 | 66 |
| 55 private: | 67 private: |
| 56 // MessageReceiverWithResponder implementation: | 68 // MessageReceiverWithResponder implementation: |
| 57 bool Accept(Message* message) override { | 69 bool Accept(Message* message) override { |
| 58 if (!message->associated_endpoint_handles()->empty()) { | 70 if (!message->associated_endpoint_handles()->empty()) { |
| 59 // If this DCHECK fails, it is likely because: | 71 // If this DCHECK fails, it is likely because: |
| 60 // - This is a non-associated interface pointer setup using | 72 // - This is a non-associated interface pointer setup using |
| 61 // PtrWrapper::BindOnTaskRunner( | 73 // PtrWrapper::BindOnTaskRunner( |
| 62 // InterfacePtrInfo<InterfaceType> ptr_info); | 74 // InterfacePtrInfo<InterfaceType> ptr_info); |
| 63 // Please see the TODO in that method. | 75 // Please see the TODO in that method. |
| 64 // - This is an associated interface which hasn't been associated with a | 76 // - This is an associated interface which hasn't been associated with a |
| 65 // message pipe. In other words, the corresponding | 77 // message pipe. In other words, the corresponding |
| 66 // AssociatedInterfaceRequest hasn't been sent. | 78 // AssociatedInterfaceRequest hasn't been sent. |
| 67 DCHECK(associated_group_.GetController()); | 79 DCHECK(associated_group_.GetController()); |
| 68 message->SerializeAssociatedEndpointHandles( | 80 message->SerializeAssociatedEndpointHandles( |
| 69 associated_group_.GetController()); | 81 associated_group_.GetController()); |
| 70 } | 82 } |
| 71 task_runner_->PostTask(FROM_HERE, | 83 task_runner_->PostTask(FROM_HERE, |
| 72 base::Bind(forward_, base::Passed(message))); | 84 base::Bind(forward_, base::Passed(message))); |
| 73 return true; | 85 return true; |
| 74 } | 86 } |
| 75 | 87 |
| 76 bool AcceptWithResponder( | 88 bool AcceptWithResponder( |
| 77 Message* message, | 89 Message* message, |
| 78 std::unique_ptr<MessageReceiver> response_receiver) override { | 90 std::unique_ptr<MessageReceiver> responder) override { |
| 79 if (!message->associated_endpoint_handles()->empty()) { | 91 if (!message->associated_endpoint_handles()->empty()) { |
| 80 // Please see comment for the DCHECK in the previous method. | 92 // Please see comment for the DCHECK in the previous method. |
| 81 DCHECK(associated_group_.GetController()); | 93 DCHECK(associated_group_.GetController()); |
| 82 message->SerializeAssociatedEndpointHandles( | 94 message->SerializeAssociatedEndpointHandles( |
| 83 associated_group_.GetController()); | 95 associated_group_.GetController()); |
| 84 } | 96 } |
| 85 auto responder = | 97 |
| 86 base::MakeUnique<ForwardToCallingThread>(std::move(response_receiver)); | 98 // Async messages are always posted (even if |task_runner_| runs tasks on |
| 99 // this thread) to guarantee that two async calls can't be reordered. | |
| 100 if (!message->has_flag(Message::kFlagIsSync)) { | |
| 101 auto reply_forwarder = | |
| 102 base::MakeUnique<ForwardToCallingThread>(std::move(responder)); | |
| 103 task_runner_->PostTask( | |
| 104 FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message), | |
| 105 base::Passed(&reply_forwarder))); | |
| 106 return true; | |
| 107 } | |
| 108 | |
| 109 SyncCallRestrictions::AssertSyncCallAllowed(); | |
| 110 | |
| 111 // If the InterfacePtr is bound to this thread, dispatch it directly. | |
| 112 if (task_runner_->RunsTasksOnCurrentThread()) { | |
| 113 forward_with_responder_.Run(std::move(*message), std::move(responder)); | |
| 114 return true; | |
| 115 } | |
| 116 | |
| 117 // If the InterfacePtr is bound on another thread, post the call. | |
| 118 // TODO(yzshen, watk): We block both this thread and the InterfacePtr | |
|
yzshen1
2017/03/29 23:16:47
IO thread must not be blocked. This prevents users
watk
2017/03/30 02:26:05
Yeah this is a shame :( I just ran out of time to
yzshen1
2017/03/30 20:53:05
Please document this restriction WRT IO thread in
watk
2017/03/31 00:27:27
I added a file-level comment. Is that OK?
| |
| 119 // thread. Ideally only this thread would block. | |
| 120 auto response = | |
| 121 make_scoped_refptr(new base::RefCountedData<SyncResponseInfo>()); | |
| 122 auto response_signaler = base::MakeUnique<SyncResponseSignaler>(response); | |
| 87 task_runner_->PostTask( | 123 task_runner_->PostTask( |
| 88 FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message), | 124 FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message), |
| 89 base::Passed(&responder))); | 125 base::Passed(&response_signaler))); |
| 126 | |
| 127 // Save the pending SyncResponseInfo so that if the sync call manages to | |
| 128 // delete |this|, we can signal the completion of the call to return from | |
| 129 // SyncWatch(). | |
| 130 { | |
| 131 base::AutoLock l(lock_); | |
| 132 pending_sync_responses_.push_back(&response->data); | |
| 133 } | |
| 134 | |
| 135 auto assign_true = [](bool* b) { *b = true; }; | |
| 136 bool event_signaled = false; | |
| 137 SyncEventWatcher watcher(&response->data.event, | |
| 138 base::Bind(assign_true, &event_signaled)); | |
| 139 watcher.SyncWatch(&event_signaled); | |
| 140 | |
| 141 if (event_signaled && response->data.received) | |
| 142 ignore_result(responder->Accept(&response->data.message)); | |
| 143 | |
| 144 { | |
| 145 base::AutoLock l(lock_); | |
|
yzshen1
2017/03/29 23:16:47
The object may be destroyed, so it is not safe to
watk
2017/03/30 02:26:04
Oops! Made it refcounted.
| |
| 146 base::Erase(pending_sync_responses_, &response->data); | |
| 147 } | |
| 148 | |
| 90 return true; | 149 return true; |
| 91 } | 150 } |
| 92 | 151 |
| 152 // Data that we need to share between the threads involved in a sync call. | |
| 153 struct SyncResponseInfo { | |
|
yzshen1
2017/03/29 23:16:47
nit: because you have full control over this type,
watk
2017/03/30 02:26:04
Done.
| |
| 154 Message message; | |
| 155 bool received = false; | |
| 156 base::WaitableEvent event{base::WaitableEvent::ResetPolicy::MANUAL, | |
| 157 base::WaitableEvent::InitialState::NOT_SIGNALED}; | |
| 158 }; | |
| 159 | |
| 160 // A MessageReceiver that signals its SyncResponse as complete when it either | |
| 161 // accepts the response message, or is destructed. | |
| 162 class SyncResponseSignaler : public MessageReceiver { | |
| 163 public: | |
| 164 explicit SyncResponseSignaler( | |
| 165 scoped_refptr<base::RefCountedData<SyncResponseInfo>> response) | |
| 166 : response_(response) {} | |
| 167 | |
| 168 ~SyncResponseSignaler() override { | |
| 169 // If Accept() was not called we must still notify the waiter that the | |
| 170 // sync call is finished. | |
| 171 if (response_) | |
| 172 response_->data.event.Signal(); | |
| 173 } | |
| 174 | |
| 175 bool Accept(Message* message) { | |
| 176 response_->data.message = std::move(*message); | |
| 177 response_->data.received = true; | |
| 178 response_->data.event.Signal(); | |
| 179 response_ = nullptr; | |
| 180 return true; | |
| 181 } | |
| 182 | |
| 183 private: | |
| 184 scoped_refptr<base::RefCountedData<SyncResponseInfo>> response_; | |
| 185 }; | |
| 186 | |
| 93 class ForwardToCallingThread : public MessageReceiver { | 187 class ForwardToCallingThread : public MessageReceiver { |
| 94 public: | 188 public: |
| 95 explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder) | 189 explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder) |
| 96 : responder_(std::move(responder)), | 190 : responder_(std::move(responder)), |
| 97 caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) { | 191 caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {} |
| 98 } | |
| 99 | 192 |
| 100 private: | 193 private: |
| 101 bool Accept(Message* message) { | 194 bool Accept(Message* message) { |
| 102 // The current instance will be deleted when this method returns, so we | 195 // The current instance will be deleted when this method returns, so we |
| 103 // have to relinquish the responder's ownership so it does not get | 196 // have to relinquish the responder's ownership so it does not get |
| 104 // deleted. | 197 // deleted. |
| 105 caller_task_runner_->PostTask(FROM_HERE, | 198 caller_task_runner_->PostTask( |
| 199 FROM_HERE, | |
| 106 base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder, | 200 base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder, |
| 107 base::Passed(std::move(responder_)), | 201 base::Passed(std::move(responder_)), |
| 108 base::Passed(std::move(*message)))); | 202 base::Passed(std::move(*message)))); |
| 109 return true; | 203 return true; |
| 110 } | 204 } |
| 111 | 205 |
| 112 static void CallAcceptAndDeleteResponder( | 206 static void CallAcceptAndDeleteResponder( |
| 113 std::unique_ptr<MessageReceiver> responder, | 207 std::unique_ptr<MessageReceiver> responder, |
| 114 Message message) { | 208 Message message) { |
| 115 ignore_result(responder->Accept(&message)); | 209 ignore_result(responder->Accept(&message)); |
| 116 } | 210 } |
| 117 | 211 |
| 118 std::unique_ptr<MessageReceiver> responder_; | 212 std::unique_ptr<MessageReceiver> responder_; |
| 119 scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_; | 213 scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_; |
| 120 }; | 214 }; |
| 121 | 215 |
| 122 ProxyType proxy_; | 216 ProxyType proxy_; |
| 123 const scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 217 const scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
| 124 const ForwardMessageCallback forward_; | 218 const ForwardMessageCallback forward_; |
| 125 const ForwardMessageWithResponderCallback forward_with_responder_; | 219 const ForwardMessageWithResponderCallback forward_with_responder_; |
| 126 AssociatedGroup associated_group_; | 220 AssociatedGroup associated_group_; |
| 127 | 221 |
| 222 // |lock_| protects access to |pending_sync_responses_|. | |
| 223 base::Lock lock_; | |
| 224 std::vector<SyncResponseInfo*> pending_sync_responses_; | |
| 225 | |
| 128 DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder); | 226 DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder); |
| 129 }; | 227 }; |
| 130 | 228 |
| 131 template <typename InterfacePtrType> | 229 template <typename InterfacePtrType> |
| 132 class ThreadSafeInterfacePtrBase | 230 class ThreadSafeInterfacePtrBase |
| 133 : public base::RefCountedThreadSafe< | 231 : public base::RefCountedThreadSafe< |
| 134 ThreadSafeInterfacePtrBase<InterfacePtrType>> { | 232 ThreadSafeInterfacePtrBase<InterfacePtrType>> { |
| 135 public: | 233 public: |
| 136 using InterfaceType = typename InterfacePtrType::InterfaceType; | 234 using InterfaceType = typename InterfacePtrType::InterfaceType; |
| 137 using PtrInfoType = typename InterfacePtrType::PtrInfoType; | 235 using PtrInfoType = typename InterfacePtrType::PtrInfoType; |
| (...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 270 using ThreadSafeAssociatedInterfacePtr = | 368 using ThreadSafeAssociatedInterfacePtr = |
| 271 ThreadSafeInterfacePtrBase<AssociatedInterfacePtr<Interface>>; | 369 ThreadSafeInterfacePtrBase<AssociatedInterfacePtr<Interface>>; |
| 272 | 370 |
| 273 template <typename Interface> | 371 template <typename Interface> |
| 274 using ThreadSafeInterfacePtr = | 372 using ThreadSafeInterfacePtr = |
| 275 ThreadSafeInterfacePtrBase<InterfacePtr<Interface>>; | 373 ThreadSafeInterfacePtrBase<InterfacePtr<Interface>>; |
| 276 | 374 |
| 277 } // namespace mojo | 375 } // namespace mojo |
| 278 | 376 |
| 279 #endif // MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ | 377 #endif // MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ |
| OLD | NEW |