OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ | 5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ |
6 #define MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ | 6 #define MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ |
7 | 7 |
8 #include <memory> | 8 #include <memory> |
9 | 9 |
10 #include "base/macros.h" | 10 #include "base/macros.h" |
11 #include "base/memory/ptr_util.h" | 11 #include "base/memory/ptr_util.h" |
12 #include "base/memory/ref_counted.h" | 12 #include "base/memory/ref_counted.h" |
13 #include "base/task_runner.h" | 13 #include "base/task_runner.h" |
14 #include "base/threading/thread_task_runner_handle.h" | 14 #include "base/threading/thread_task_runner_handle.h" |
15 #include "mojo/public/cpp/bindings/associated_group.h" | 15 #include "mojo/public/cpp/bindings/associated_group.h" |
16 #include "mojo/public/cpp/bindings/associated_interface_ptr.h" | 16 #include "mojo/public/cpp/bindings/associated_interface_ptr.h" |
17 #include "mojo/public/cpp/bindings/interface_ptr.h" | 17 #include "mojo/public/cpp/bindings/interface_ptr.h" |
18 #include "mojo/public/cpp/bindings/message.h" | 18 #include "mojo/public/cpp/bindings/message.h" |
19 #include "mojo/public/cpp/bindings/sync_event_watcher.h" | |
19 | 20 |
20 namespace mojo { | 21 namespace mojo { |
22 namespace { | |
yzshen1
2017/03/27 22:18:25
It is pretty uncommon and not recommended to use a
watk
2017/03/27 23:45:28
Oops, forgot this was a header.
| |
23 void AssignTrue(bool* b) { | |
24 *b = true; | |
25 } | |
26 } // namespace | |
21 | 27 |
22 // Instances of this class may be used from any thread to serialize |Interface| | 28 // Instances of this class may be used from any thread to serialize |Interface| |
23 // messages and forward them elsewhere. In general you should use one of the | 29 // messages and forward them elsewhere. In general you should use one of the |
24 // ThreadSafeInterfacePtrBase helper aliases defined below, but this type may be | 30 // ThreadSafeInterfacePtrBase helper aliases defined below, but this type may be |
25 // useful if you need/want to manually manage the lifetime of the underlying | 31 // useful if you need/want to manually manage the lifetime of the underlying |
26 // proxy object which will be used to ultimately send messages. | 32 // proxy object which will be used to ultimately send messages. |
27 template <typename Interface> | 33 template <typename Interface> |
28 class ThreadSafeForwarder : public MessageReceiverWithResponder { | 34 class ThreadSafeForwarder : public MessageReceiverWithResponder { |
29 public: | 35 public: |
30 using ProxyType = typename Interface::Proxy_; | 36 using ProxyType = typename Interface::Proxy_; |
31 using ForwardMessageCallback = base::Callback<void(Message)>; | 37 using ForwardMessageCallback = base::Callback<void(Message)>; |
32 using ForwardMessageWithResponderCallback = | 38 using ForwardMessageWithResponderCallback = |
33 base::Callback<void(Message, std::unique_ptr<MessageReceiver>)>; | 39 base::Callback<void(Message, std::unique_ptr<MessageReceiver>)>; |
34 | 40 |
35 // Constructs a ThreadSafeForwarder through which Messages are forwarded to | 41 // Constructs a ThreadSafeForwarder through which Messages are forwarded to |
36 // |forward| or |forward_with_responder| by posting to |task_runner|. | 42 // |forward| or |forward_with_responder| by posting to |task_runner|. |
37 // | 43 // |
38 // Any message sent through this forwarding interface will dispatch its reply, | 44 // Any message sent through this forwarding interface will dispatch its reply, |
39 // if any, back to the thread which called the corresponding interface method. | 45 // if any, back to the thread which called the corresponding interface method. |
40 ThreadSafeForwarder( | 46 ThreadSafeForwarder( |
41 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, | 47 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, |
42 const ForwardMessageCallback& forward, | 48 const ForwardMessageCallback& forward, |
43 const ForwardMessageWithResponderCallback& forward_with_responder, | 49 const ForwardMessageWithResponderCallback& forward_with_responder, |
44 const AssociatedGroup& associated_group) | 50 const AssociatedGroup& associated_group) |
45 : proxy_(this), | 51 : proxy_(this), |
46 task_runner_(task_runner), | 52 task_runner_(task_runner), |
47 forward_(forward), | 53 forward_(forward), |
48 forward_with_responder_(forward_with_responder), | 54 forward_with_responder_(forward_with_responder), |
49 associated_group_(associated_group) {} | 55 associated_group_(associated_group), |
56 sync_message_event_(base::WaitableEvent::ResetPolicy::MANUAL, | |
57 base::WaitableEvent::InitialState::NOT_SIGNALED), | |
58 weak_factory_(this) {} | |
50 | 59 |
51 ~ThreadSafeForwarder() override {} | 60 ~ThreadSafeForwarder() override {} |
52 | 61 |
53 ProxyType& proxy() { return proxy_; } | 62 ProxyType& proxy() { return proxy_; } |
54 | 63 |
55 private: | 64 private: |
56 // MessageReceiverWithResponder implementation: | 65 // MessageReceiverWithResponder implementation: |
57 bool Accept(Message* message) override { | 66 bool Accept(Message* message) override { |
58 if (!message->associated_endpoint_handles()->empty()) { | 67 if (!message->associated_endpoint_handles()->empty()) { |
59 // If this DCHECK fails, it is likely because: | 68 // If this DCHECK fails, it is likely because: |
60 // - This is a non-associated interface pointer setup using | 69 // - This is a non-associated interface pointer setup using |
61 // PtrWrapper::BindOnTaskRunner( | 70 // PtrWrapper::BindOnTaskRunner( |
62 // InterfacePtrInfo<InterfaceType> ptr_info); | 71 // InterfacePtrInfo<InterfaceType> ptr_info); |
63 // Please see the TODO in that method. | 72 // Please see the TODO in that method. |
64 // - This is an associated interface which hasn't been associated with a | 73 // - This is an associated interface which hasn't been associated with a |
65 // message pipe. In other words, the corresponding | 74 // message pipe. In other words, the corresponding |
66 // AssociatedInterfaceRequest hasn't been sent. | 75 // AssociatedInterfaceRequest hasn't been sent. |
67 DCHECK(associated_group_.GetController()); | 76 DCHECK(associated_group_.GetController()); |
68 message->SerializeAssociatedEndpointHandles( | 77 message->SerializeAssociatedEndpointHandles( |
69 associated_group_.GetController()); | 78 associated_group_.GetController()); |
70 } | 79 } |
71 task_runner_->PostTask(FROM_HERE, | 80 task_runner_->PostTask(FROM_HERE, |
72 base::Bind(forward_, base::Passed(message))); | 81 base::Bind(forward_, base::Passed(message))); |
73 return true; | 82 return true; |
74 } | 83 } |
75 | 84 |
76 bool AcceptWithResponder( | 85 bool AcceptWithResponder( |
77 Message* message, | 86 Message* message, |
78 std::unique_ptr<MessageReceiver> response_receiver) override { | 87 std::unique_ptr<MessageReceiver> responder) override { |
79 if (!message->associated_endpoint_handles()->empty()) { | 88 if (!message->associated_endpoint_handles()->empty()) { |
80 // Please see comment for the DCHECK in the previous method. | 89 // Please see comment for the DCHECK in the previous method. |
81 DCHECK(associated_group_.GetController()); | 90 DCHECK(associated_group_.GetController()); |
82 message->SerializeAssociatedEndpointHandles( | 91 message->SerializeAssociatedEndpointHandles( |
83 associated_group_.GetController()); | 92 associated_group_.GetController()); |
84 } | 93 } |
85 auto responder = | 94 |
86 base::MakeUnique<ForwardToCallingThread>(std::move(response_receiver)); | 95 if (!message->has_flag(Message::kFlagIsSync)) { |
96 auto reply_forwarder = | |
97 base::MakeUnique<ForwardToCallingThread>(std::move(responder)); | |
98 if (task_runner_->RunsTasksOnCurrentThread()) { | |
99 forward_with_responder_.Run(std::move(*message), | |
yzshen1
2017/03/27 22:18:24
I think we would like to preserve order between ca
watk
2017/03/27 23:45:29
True. I had assumed there was a contract that TSIP
yzshen1
2017/03/28 00:06:48
What error notification? :)
watk
2017/03/28 00:14:35
Take a look at sync_method_unittest.cc line 853. T
| |
100 std::move(reply_forwarder)); | |
101 } else { | |
102 task_runner_->PostTask( | |
103 FROM_HERE, | |
104 base::Bind(forward_with_responder_, base::Passed(message), | |
105 base::Passed(&reply_forwarder))); | |
106 } | |
107 return true; | |
108 } | |
109 | |
110 // Sync call on this thread. | |
111 if (task_runner_->RunsTasksOnCurrentThread()) { | |
112 forward_with_responder_.Run(std::move(*message), std::move(responder)); | |
yzshen1
2017/03/27 22:18:24
+CC Ken:
At the beginning, I thought this direct
watk
2017/03/27 23:45:29
I did this because I wasn't sure what the alternat
yzshen1
2017/03/28 00:06:48
What I was trying to say is "I thought this doesn'
watk
2017/03/28 00:14:35
Oh, ok :)
| |
113 return true; | |
114 } | |
115 | |
116 // Sync call on a different thread. | |
117 // TODO: allow the call to be async on the other thread. | |
118 Message response; | |
119 auto reply_signaler = | |
120 base::MakeUnique<SyncReplySignaler>(&response, &sync_message_event_); | |
87 task_runner_->PostTask( | 121 task_runner_->PostTask( |
88 FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message), | 122 FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message), |
89 base::Passed(&responder))); | 123 base::Passed(&reply_signaler))); |
124 bool response_received = false; | |
125 // XXX: What does it mean to have reentrant sync calls that are all waiting | |
126 // on the same sync_message_event_? Correct because only reentrant sync | |
yzshen1
2017/03/27 22:18:25
I am not quite sure I understand the question. Wou
watk
2017/03/27 23:45:29
I think I wrote this before I had the sync_message
| |
127 // calls are allowed? | |
128 SyncEventWatcher watcher(&sync_message_event_, | |
129 base::Bind(&AssignTrue, &response_received)); | |
130 watcher.AllowWokenUpBySyncWatchOnSameThread(); | |
yzshen1
2017/03/27 22:18:24
I don't think you need this line.
This is for the
watk
2017/03/27 23:45:29
Ah, gotcha.
| |
131 // XXX: Check that this handles the case that sync_message_event_ is already | |
132 // signaled. | |
133 auto weak_self = weak_factory_.GetWeakPtr(); | |
yzshen1
2017/03/27 22:18:24
WeakPtrFactory will force this object to be only u
watk
2017/03/27 23:45:29
SG, thanks
| |
134 watcher.SyncWatch(&response_received); | |
135 if (weak_self) { | |
136 sync_message_event_.Reset(); | |
137 if (response_received) | |
138 ignore_result(responder->Accept(&response)); | |
yzshen1
2017/03/27 22:18:24
The response may not correspond to the |responder|
watk
2017/03/27 23:45:28
Ah, yes, thanks. I hadn't even thought about non-n
| |
139 } | |
140 | |
90 return true; | 141 return true; |
91 } | 142 } |
92 | 143 |
144 class SyncReplySignaler : public MessageReceiver { | |
145 public: | |
146 SyncReplySignaler(Message* sync_response_destination, | |
147 base::WaitableEvent* sync_response_event) | |
148 : sync_response_destination_(sync_response_destination), | |
149 sync_response_event_(sync_response_event) {} | |
150 | |
151 private: | |
152 bool Accept(Message* message) { | |
153 *sync_response_destination_ = std::move(*message); | |
154 sync_response_event_->Signal(); | |
155 return true; | |
156 } | |
157 | |
158 // XXX: What guarantees do we have about the lifetime of these? | |
yzshen1
2017/03/27 22:18:25
The sync_response_event_ may be destroyed in the m
watk
2017/03/27 23:45:29
Yes, true. I'll make this refcounted then.
| |
159 Message* sync_response_destination_; | |
160 base::WaitableEvent* sync_response_event_; | |
161 }; | |
162 | |
93 class ForwardToCallingThread : public MessageReceiver { | 163 class ForwardToCallingThread : public MessageReceiver { |
94 public: | 164 public: |
95 explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder) | 165 explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder) |
96 : responder_(std::move(responder)), | 166 : responder_(std::move(responder)), |
97 caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) { | 167 caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {} |
98 } | |
99 | 168 |
100 private: | 169 private: |
101 bool Accept(Message* message) { | 170 bool Accept(Message* message) { |
102 // The current instance will be deleted when this method returns, so we | 171 // The current instance will be deleted when this method returns, so we |
103 // have to relinquish the responder's ownership so it does not get | 172 // have to relinquish the responder's ownership so it does not get |
104 // deleted. | 173 // deleted. |
105 caller_task_runner_->PostTask(FROM_HERE, | 174 if (caller_task_runner_->RunsTasksOnCurrentThread()) { |
106 base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder, | 175 CallAcceptAndDeleteResponder(std::move(responder_), |
107 base::Passed(std::move(responder_)), | 176 std::move(*message)); |
108 base::Passed(std::move(*message)))); | 177 } else { |
178 caller_task_runner_->PostTask( | |
179 FROM_HERE, | |
180 base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder, | |
181 base::Passed(std::move(responder_)), | |
182 base::Passed(std::move(*message)))); | |
183 } | |
109 return true; | 184 return true; |
110 } | 185 } |
111 | 186 |
112 static void CallAcceptAndDeleteResponder( | 187 static void CallAcceptAndDeleteResponder( |
113 std::unique_ptr<MessageReceiver> responder, | 188 std::unique_ptr<MessageReceiver> responder, |
114 Message message) { | 189 Message message) { |
115 ignore_result(responder->Accept(&message)); | 190 ignore_result(responder->Accept(&message)); |
116 } | 191 } |
117 | 192 |
118 std::unique_ptr<MessageReceiver> responder_; | 193 std::unique_ptr<MessageReceiver> responder_; |
119 scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_; | 194 scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_; |
120 }; | 195 }; |
121 | 196 |
122 ProxyType proxy_; | 197 ProxyType proxy_; |
123 const scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 198 const scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
124 const ForwardMessageCallback forward_; | 199 const ForwardMessageCallback forward_; |
125 const ForwardMessageWithResponderCallback forward_with_responder_; | 200 const ForwardMessageWithResponderCallback forward_with_responder_; |
126 AssociatedGroup associated_group_; | 201 AssociatedGroup associated_group_; |
127 | 202 |
203 // An event used to signal that sync messages are available. | |
204 base::WaitableEvent sync_message_event_; | |
205 base::WeakPtrFactory<ThreadSafeForwarder> weak_factory_; | |
206 | |
128 DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder); | 207 DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder); |
129 }; | 208 }; |
130 | 209 |
131 template <typename InterfacePtrType> | 210 template <typename InterfacePtrType> |
132 class ThreadSafeInterfacePtrBase | 211 class ThreadSafeInterfacePtrBase |
133 : public base::RefCountedThreadSafe< | 212 : public base::RefCountedThreadSafe< |
134 ThreadSafeInterfacePtrBase<InterfacePtrType>> { | 213 ThreadSafeInterfacePtrBase<InterfacePtrType>> { |
135 public: | 214 public: |
136 using InterfaceType = typename InterfacePtrType::InterfaceType; | 215 using InterfaceType = typename InterfacePtrType::InterfaceType; |
137 using PtrInfoType = typename InterfacePtrType::PtrInfoType; | 216 using PtrInfoType = typename InterfacePtrType::PtrInfoType; |
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
270 using ThreadSafeAssociatedInterfacePtr = | 349 using ThreadSafeAssociatedInterfacePtr = |
271 ThreadSafeInterfacePtrBase<AssociatedInterfacePtr<Interface>>; | 350 ThreadSafeInterfacePtrBase<AssociatedInterfacePtr<Interface>>; |
272 | 351 |
273 template <typename Interface> | 352 template <typename Interface> |
274 using ThreadSafeInterfacePtr = | 353 using ThreadSafeInterfacePtr = |
275 ThreadSafeInterfacePtrBase<InterfacePtr<Interface>>; | 354 ThreadSafeInterfacePtrBase<InterfacePtr<Interface>>; |
276 | 355 |
277 } // namespace mojo | 356 } // namespace mojo |
278 | 357 |
279 #endif // MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ | 358 #endif // MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ |
OLD | NEW |