| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ | 5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ |
| 6 #define MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ | 6 #define MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ |
| 7 | 7 |
| 8 #include <memory> | 8 #include <memory> |
| 9 | 9 |
| 10 #include "base/macros.h" | 10 #include "base/macros.h" |
| 11 #include "base/memory/ptr_util.h" | 11 #include "base/memory/ptr_util.h" |
| 12 #include "base/memory/ref_counted.h" | 12 #include "base/memory/ref_counted.h" |
| 13 #include "base/task_runner.h" | 13 #include "base/task_runner.h" |
| 14 #include "base/threading/thread_task_runner_handle.h" | 14 #include "base/threading/thread_task_runner_handle.h" |
| 15 #include "mojo/public/cpp/bindings/associated_group.h" | 15 #include "mojo/public/cpp/bindings/associated_group.h" |
| 16 #include "mojo/public/cpp/bindings/associated_interface_ptr.h" | 16 #include "mojo/public/cpp/bindings/associated_interface_ptr.h" |
| 17 #include "mojo/public/cpp/bindings/interface_ptr.h" | 17 #include "mojo/public/cpp/bindings/interface_ptr.h" |
| 18 #include "mojo/public/cpp/bindings/message.h" | 18 #include "mojo/public/cpp/bindings/message.h" |
| 19 #include "mojo/public/cpp/bindings/sync_event_watcher.h" |
| 19 | 20 |
| 20 namespace mojo { | 21 namespace mojo { |
| 22 namespace { |
| 23 void AssignTrue(bool* b) { |
| 24 *b = true; |
| 25 } |
| 26 } // namespace |
| 21 | 27 |
| 22 // Instances of this class may be used from any thread to serialize |Interface| | 28 // Instances of this class may be used from any thread to serialize |Interface| |
| 23 // messages and forward them elsewhere. In general you should use one of the | 29 // messages and forward them elsewhere. In general you should use one of the |
| 24 // ThreadSafeInterfacePtrBase helper aliases defined below, but this type may be | 30 // ThreadSafeInterfacePtrBase helper aliases defined below, but this type may be |
| 25 // useful if you need/want to manually manage the lifetime of the underlying | 31 // useful if you need/want to manually manage the lifetime of the underlying |
| 26 // proxy object which will be used to ultimately send messages. | 32 // proxy object which will be used to ultimately send messages. |
| 27 template <typename Interface> | 33 template <typename Interface> |
| 28 class ThreadSafeForwarder : public MessageReceiverWithResponder { | 34 class ThreadSafeForwarder : public MessageReceiverWithResponder { |
| 29 public: | 35 public: |
| 30 using ProxyType = typename Interface::Proxy_; | 36 using ProxyType = typename Interface::Proxy_; |
| 31 using ForwardMessageCallback = base::Callback<void(Message)>; | 37 using ForwardMessageCallback = base::Callback<void(Message)>; |
| 32 using ForwardMessageWithResponderCallback = | 38 using ForwardMessageWithResponderCallback = |
| 33 base::Callback<void(Message, std::unique_ptr<MessageReceiver>)>; | 39 base::Callback<void(Message, std::unique_ptr<MessageReceiver>)>; |
| 34 | 40 |
| 35 // Constructs a ThreadSafeForwarder through which Messages are forwarded to | 41 // Constructs a ThreadSafeForwarder through which Messages are forwarded to |
| 36 // |forward| or |forward_with_responder| by posting to |task_runner|. | 42 // |forward| or |forward_with_responder| by posting to |task_runner|. |
| 37 // | 43 // |
| 38 // Any message sent through this forwarding interface will dispatch its reply, | 44 // Any message sent through this forwarding interface will dispatch its reply, |
| 39 // if any, back to the thread which called the corresponding interface method. | 45 // if any, back to the thread which called the corresponding interface method. |
| 40 ThreadSafeForwarder( | 46 ThreadSafeForwarder( |
| 41 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, | 47 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, |
| 42 const ForwardMessageCallback& forward, | 48 const ForwardMessageCallback& forward, |
| 43 const ForwardMessageWithResponderCallback& forward_with_responder, | 49 const ForwardMessageWithResponderCallback& forward_with_responder, |
| 44 const AssociatedGroup& associated_group) | 50 const AssociatedGroup& associated_group) |
| 45 : proxy_(this), | 51 : proxy_(this), |
| 46 task_runner_(task_runner), | 52 task_runner_(task_runner), |
| 47 forward_(forward), | 53 forward_(forward), |
| 48 forward_with_responder_(forward_with_responder), | 54 forward_with_responder_(forward_with_responder), |
| 49 associated_group_(associated_group) {} | 55 associated_group_(associated_group), |
| 56 sync_message_event_(base::WaitableEvent::ResetPolicy::MANUAL, |
| 57 base::WaitableEvent::InitialState::NOT_SIGNALED), |
| 58 weak_factory_(this) {} |
| 50 | 59 |
| 51 ~ThreadSafeForwarder() override {} | 60 ~ThreadSafeForwarder() override {} |
| 52 | 61 |
| 53 ProxyType& proxy() { return proxy_; } | 62 ProxyType& proxy() { return proxy_; } |
| 54 | 63 |
| 55 private: | 64 private: |
| 56 // MessageReceiverWithResponder implementation: | 65 // MessageReceiverWithResponder implementation: |
| 57 bool Accept(Message* message) override { | 66 bool Accept(Message* message) override { |
| 58 if (!message->associated_endpoint_handles()->empty()) { | 67 if (!message->associated_endpoint_handles()->empty()) { |
| 59 // If this DCHECK fails, it is likely because: | 68 // If this DCHECK fails, it is likely because: |
| 60 // - This is a non-associated interface pointer setup using | 69 // - This is a non-associated interface pointer setup using |
| 61 // PtrWrapper::BindOnTaskRunner( | 70 // PtrWrapper::BindOnTaskRunner( |
| 62 // InterfacePtrInfo<InterfaceType> ptr_info); | 71 // InterfacePtrInfo<InterfaceType> ptr_info); |
| 63 // Please see the TODO in that method. | 72 // Please see the TODO in that method. |
| 64 // - This is an associated interface which hasn't been associated with a | 73 // - This is an associated interface which hasn't been associated with a |
| 65 // message pipe. In other words, the corresponding | 74 // message pipe. In other words, the corresponding |
| 66 // AssociatedInterfaceRequest hasn't been sent. | 75 // AssociatedInterfaceRequest hasn't been sent. |
| 67 DCHECK(associated_group_.GetController()); | 76 DCHECK(associated_group_.GetController()); |
| 68 message->SerializeAssociatedEndpointHandles( | 77 message->SerializeAssociatedEndpointHandles( |
| 69 associated_group_.GetController()); | 78 associated_group_.GetController()); |
| 70 } | 79 } |
| 71 task_runner_->PostTask(FROM_HERE, | 80 task_runner_->PostTask(FROM_HERE, |
| 72 base::Bind(forward_, base::Passed(message))); | 81 base::Bind(forward_, base::Passed(message))); |
| 73 return true; | 82 return true; |
| 74 } | 83 } |
| 75 | 84 |
| 76 bool AcceptWithResponder( | 85 bool AcceptWithResponder( |
| 77 Message* message, | 86 Message* message, |
| 78 std::unique_ptr<MessageReceiver> response_receiver) override { | 87 std::unique_ptr<MessageReceiver> responder) override { |
| 79 if (!message->associated_endpoint_handles()->empty()) { | 88 if (!message->associated_endpoint_handles()->empty()) { |
| 80 // Please see comment for the DCHECK in the previous method. | 89 // Please see comment for the DCHECK in the previous method. |
| 81 DCHECK(associated_group_.GetController()); | 90 DCHECK(associated_group_.GetController()); |
| 82 message->SerializeAssociatedEndpointHandles( | 91 message->SerializeAssociatedEndpointHandles( |
| 83 associated_group_.GetController()); | 92 associated_group_.GetController()); |
| 84 } | 93 } |
| 85 auto responder = | 94 |
| 86 base::MakeUnique<ForwardToCallingThread>(std::move(response_receiver)); | 95 if (!message->has_flag(Message::kFlagIsSync)) { |
| 96 auto reply_forwarder = |
| 97 base::MakeUnique<ForwardToCallingThread>(std::move(responder)); |
| 98 task_runner_->PostTask( |
| 99 FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message), |
| 100 base::Passed(&reply_forwarder))); |
| 101 return true; |
| 102 } |
| 103 |
| 104 // Sync call on this thread. |
| 105 if (task_runner_->RunsTasksOnCurrentThread()) { |
| 106 forward_with_responder_.Run(std::move(*message), std::move(responder)); |
| 107 return true; |
| 108 } |
| 109 |
| 110 // Sync call on a different thread. |
| 111 // TODO: allow the call to be async on the other thread. |
| 112 Message response; |
| 113 auto reply_signaler = |
| 114 base::MakeUnique<SyncReplySignaler>(&response, &sync_message_event_); |
| 87 task_runner_->PostTask( | 115 task_runner_->PostTask( |
| 88 FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message), | 116 FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message), |
| 89 base::Passed(&responder))); | 117 base::Passed(&reply_signaler))); |
| 118 bool response_received = false; |
| 119 // XXX: What does it mean to have reentrant sync calls that are all waiting |
| 120 // on the same sync_message_event_? Correct because only reentrant sync |
| 121 // calls are allowed? |
| 122 SyncEventWatcher watcher(&sync_message_event_, |
| 123 base::Bind(&AssignTrue, &response_received)); |
| 124 watcher.AllowWokenUpBySyncWatchOnSameThread(); |
| 125 // XXX: Check that this handles the case that sync_message_event_ is already |
| 126 // signaled. |
| 127 auto weak_self = weak_factory_.GetWeakPtr(); |
| 128 watcher.SyncWatch(&response_received); |
| 129 if (weak_self) { |
| 130 sync_message_event_.Reset(); |
| 131 if (response_received) |
| 132 ignore_result(responder->Accept(&response)); |
| 133 } |
| 134 |
| 90 return true; | 135 return true; |
| 91 } | 136 } |
| 92 | 137 |
| 138 class SyncReplySignaler : public MessageReceiver { |
| 139 public: |
| 140 SyncReplySignaler(Message* sync_response_destination, |
| 141 base::WaitableEvent* sync_response_event) |
| 142 : sync_response_destination_(sync_response_destination), |
| 143 sync_response_event_(sync_response_event) {} |
| 144 |
| 145 private: |
| 146 bool Accept(Message* message) { |
| 147 *sync_response_destination_ = std::move(*message); |
| 148 sync_response_event_->Signal(); |
| 149 return true; |
| 150 } |
| 151 |
| 152 // XXX: What guarantees do we have about the lifetime of these? |
| 153 Message* sync_response_destination_; |
| 154 base::WaitableEvent* sync_response_event_; |
| 155 }; |
| 156 |
| 93 class ForwardToCallingThread : public MessageReceiver { | 157 class ForwardToCallingThread : public MessageReceiver { |
| 94 public: | 158 public: |
| 95 explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder) | 159 explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder) |
| 96 : responder_(std::move(responder)), | 160 : responder_(std::move(responder)), |
| 97 caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) { | 161 caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {} |
| 98 } | |
| 99 | 162 |
| 100 private: | 163 private: |
| 101 bool Accept(Message* message) { | 164 bool Accept(Message* message) { |
| 102 // The current instance will be deleted when this method returns, so we | 165 // The current instance will be deleted when this method returns, so we |
| 103 // have to relinquish the responder's ownership so it does not get | 166 // have to relinquish the responder's ownership so it does not get |
| 104 // deleted. | 167 // deleted. |
| 105 caller_task_runner_->PostTask(FROM_HERE, | 168 caller_task_runner_->PostTask(FROM_HERE, |
| 106 base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder, | 169 base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder, |
| 107 base::Passed(std::move(responder_)), | 170 base::Passed(std::move(responder_)), |
| 108 base::Passed(std::move(*message)))); | 171 base::Passed(std::move(*message)))); |
| 109 return true; | 172 return true; |
| 110 } | 173 } |
| 111 | 174 |
| 112 static void CallAcceptAndDeleteResponder( | 175 static void CallAcceptAndDeleteResponder( |
| 113 std::unique_ptr<MessageReceiver> responder, | 176 std::unique_ptr<MessageReceiver> responder, |
| 114 Message message) { | 177 Message message) { |
| 115 ignore_result(responder->Accept(&message)); | 178 ignore_result(responder->Accept(&message)); |
| 116 } | 179 } |
| 117 | 180 |
| 118 std::unique_ptr<MessageReceiver> responder_; | 181 std::unique_ptr<MessageReceiver> responder_; |
| 119 scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_; | 182 scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_; |
| 120 }; | 183 }; |
| 121 | 184 |
| 122 ProxyType proxy_; | 185 ProxyType proxy_; |
| 123 const scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 186 const scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
| 124 const ForwardMessageCallback forward_; | 187 const ForwardMessageCallback forward_; |
| 125 const ForwardMessageWithResponderCallback forward_with_responder_; | 188 const ForwardMessageWithResponderCallback forward_with_responder_; |
| 126 AssociatedGroup associated_group_; | 189 AssociatedGroup associated_group_; |
| 127 | 190 |
| 191 // An event used to signal that sync messages are available. |
| 192 base::WaitableEvent sync_message_event_; |
| 193 base::WeakPtrFactory<ThreadSafeForwarder> weak_factory_; |
| 194 |
| 128 DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder); | 195 DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder); |
| 129 }; | 196 }; |
| 130 | 197 |
| 131 template <typename InterfacePtrType> | 198 template <typename InterfacePtrType> |
| 132 class ThreadSafeInterfacePtrBase | 199 class ThreadSafeInterfacePtrBase |
| 133 : public base::RefCountedThreadSafe< | 200 : public base::RefCountedThreadSafe< |
| 134 ThreadSafeInterfacePtrBase<InterfacePtrType>> { | 201 ThreadSafeInterfacePtrBase<InterfacePtrType>> { |
| 135 public: | 202 public: |
| 136 using InterfaceType = typename InterfacePtrType::InterfaceType; | 203 using InterfaceType = typename InterfacePtrType::InterfaceType; |
| 137 using PtrInfoType = typename InterfacePtrType::PtrInfoType; | 204 using PtrInfoType = typename InterfacePtrType::PtrInfoType; |
| (...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 270 using ThreadSafeAssociatedInterfacePtr = | 337 using ThreadSafeAssociatedInterfacePtr = |
| 271 ThreadSafeInterfacePtrBase<AssociatedInterfacePtr<Interface>>; | 338 ThreadSafeInterfacePtrBase<AssociatedInterfacePtr<Interface>>; |
| 272 | 339 |
| 273 template <typename Interface> | 340 template <typename Interface> |
| 274 using ThreadSafeInterfacePtr = | 341 using ThreadSafeInterfacePtr = |
| 275 ThreadSafeInterfacePtrBase<InterfacePtr<Interface>>; | 342 ThreadSafeInterfacePtrBase<InterfacePtr<Interface>>; |
| 276 | 343 |
| 277 } // namespace mojo | 344 } // namespace mojo |
| 278 | 345 |
| 279 #endif // MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ | 346 #endif // MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ |
| OLD | NEW |