Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(249)

Side by Side Diff: mojo/public/cpp/bindings/thread_safe_interface_ptr.h

Issue 2770153003: mojo: Support sync calls through ThreadSafeInterfacePtr (Closed)
Patch Set: Fix more tests Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ 5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_
6 #define MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ 6 #define MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_
7 7
8 #include <memory> 8 #include <memory>
9 9
10 #include "base/macros.h" 10 #include "base/macros.h"
11 #include "base/memory/ptr_util.h" 11 #include "base/memory/ptr_util.h"
12 #include "base/memory/ref_counted.h" 12 #include "base/memory/ref_counted.h"
13 #include "base/task_runner.h" 13 #include "base/task_runner.h"
14 #include "base/threading/thread_task_runner_handle.h" 14 #include "base/threading/thread_task_runner_handle.h"
15 #include "mojo/public/cpp/bindings/associated_group.h" 15 #include "mojo/public/cpp/bindings/associated_group.h"
16 #include "mojo/public/cpp/bindings/associated_interface_ptr.h" 16 #include "mojo/public/cpp/bindings/associated_interface_ptr.h"
17 #include "mojo/public/cpp/bindings/interface_ptr.h" 17 #include "mojo/public/cpp/bindings/interface_ptr.h"
18 #include "mojo/public/cpp/bindings/message.h" 18 #include "mojo/public/cpp/bindings/message.h"
19 #include "mojo/public/cpp/bindings/sync_event_watcher.h"
19 20
20 namespace mojo { 21 namespace mojo {
22 namespace {
yzshen1 2017/03/27 22:18:25 It is pretty uncommon and not recommended to use a
watk 2017/03/27 23:45:28 Oops, forgot this was a header.
23 void AssignTrue(bool* b) {
24 *b = true;
25 }
26 } // namespace
21 27
22 // Instances of this class may be used from any thread to serialize |Interface| 28 // Instances of this class may be used from any thread to serialize |Interface|
23 // messages and forward them elsewhere. In general you should use one of the 29 // messages and forward them elsewhere. In general you should use one of the
24 // ThreadSafeInterfacePtrBase helper aliases defined below, but this type may be 30 // ThreadSafeInterfacePtrBase helper aliases defined below, but this type may be
25 // useful if you need/want to manually manage the lifetime of the underlying 31 // useful if you need/want to manually manage the lifetime of the underlying
26 // proxy object which will be used to ultimately send messages. 32 // proxy object which will be used to ultimately send messages.
27 template <typename Interface> 33 template <typename Interface>
28 class ThreadSafeForwarder : public MessageReceiverWithResponder { 34 class ThreadSafeForwarder : public MessageReceiverWithResponder {
29 public: 35 public:
30 using ProxyType = typename Interface::Proxy_; 36 using ProxyType = typename Interface::Proxy_;
31 using ForwardMessageCallback = base::Callback<void(Message)>; 37 using ForwardMessageCallback = base::Callback<void(Message)>;
32 using ForwardMessageWithResponderCallback = 38 using ForwardMessageWithResponderCallback =
33 base::Callback<void(Message, std::unique_ptr<MessageReceiver>)>; 39 base::Callback<void(Message, std::unique_ptr<MessageReceiver>)>;
34 40
35 // Constructs a ThreadSafeForwarder through which Messages are forwarded to 41 // Constructs a ThreadSafeForwarder through which Messages are forwarded to
36 // |forward| or |forward_with_responder| by posting to |task_runner|. 42 // |forward| or |forward_with_responder| by posting to |task_runner|.
37 // 43 //
38 // Any message sent through this forwarding interface will dispatch its reply, 44 // Any message sent through this forwarding interface will dispatch its reply,
39 // if any, back to the thread which called the corresponding interface method. 45 // if any, back to the thread which called the corresponding interface method.
40 ThreadSafeForwarder( 46 ThreadSafeForwarder(
41 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner, 47 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
42 const ForwardMessageCallback& forward, 48 const ForwardMessageCallback& forward,
43 const ForwardMessageWithResponderCallback& forward_with_responder, 49 const ForwardMessageWithResponderCallback& forward_with_responder,
44 const AssociatedGroup& associated_group) 50 const AssociatedGroup& associated_group)
45 : proxy_(this), 51 : proxy_(this),
46 task_runner_(task_runner), 52 task_runner_(task_runner),
47 forward_(forward), 53 forward_(forward),
48 forward_with_responder_(forward_with_responder), 54 forward_with_responder_(forward_with_responder),
49 associated_group_(associated_group) {} 55 associated_group_(associated_group),
56 sync_message_event_(base::WaitableEvent::ResetPolicy::MANUAL,
57 base::WaitableEvent::InitialState::NOT_SIGNALED),
58 weak_factory_(this) {}
50 59
51 ~ThreadSafeForwarder() override {} 60 ~ThreadSafeForwarder() override {}
52 61
53 ProxyType& proxy() { return proxy_; } 62 ProxyType& proxy() { return proxy_; }
54 63
55 private: 64 private:
56 // MessageReceiverWithResponder implementation: 65 // MessageReceiverWithResponder implementation:
57 bool Accept(Message* message) override { 66 bool Accept(Message* message) override {
58 if (!message->associated_endpoint_handles()->empty()) { 67 if (!message->associated_endpoint_handles()->empty()) {
59 // If this DCHECK fails, it is likely because: 68 // If this DCHECK fails, it is likely because:
60 // - This is a non-associated interface pointer setup using 69 // - This is a non-associated interface pointer setup using
61 // PtrWrapper::BindOnTaskRunner( 70 // PtrWrapper::BindOnTaskRunner(
62 // InterfacePtrInfo<InterfaceType> ptr_info); 71 // InterfacePtrInfo<InterfaceType> ptr_info);
63 // Please see the TODO in that method. 72 // Please see the TODO in that method.
64 // - This is an associated interface which hasn't been associated with a 73 // - This is an associated interface which hasn't been associated with a
65 // message pipe. In other words, the corresponding 74 // message pipe. In other words, the corresponding
66 // AssociatedInterfaceRequest hasn't been sent. 75 // AssociatedInterfaceRequest hasn't been sent.
67 DCHECK(associated_group_.GetController()); 76 DCHECK(associated_group_.GetController());
68 message->SerializeAssociatedEndpointHandles( 77 message->SerializeAssociatedEndpointHandles(
69 associated_group_.GetController()); 78 associated_group_.GetController());
70 } 79 }
71 task_runner_->PostTask(FROM_HERE, 80 task_runner_->PostTask(FROM_HERE,
72 base::Bind(forward_, base::Passed(message))); 81 base::Bind(forward_, base::Passed(message)));
73 return true; 82 return true;
74 } 83 }
75 84
76 bool AcceptWithResponder( 85 bool AcceptWithResponder(
77 Message* message, 86 Message* message,
78 std::unique_ptr<MessageReceiver> response_receiver) override { 87 std::unique_ptr<MessageReceiver> responder) override {
79 if (!message->associated_endpoint_handles()->empty()) { 88 if (!message->associated_endpoint_handles()->empty()) {
80 // Please see comment for the DCHECK in the previous method. 89 // Please see comment for the DCHECK in the previous method.
81 DCHECK(associated_group_.GetController()); 90 DCHECK(associated_group_.GetController());
82 message->SerializeAssociatedEndpointHandles( 91 message->SerializeAssociatedEndpointHandles(
83 associated_group_.GetController()); 92 associated_group_.GetController());
84 } 93 }
85 auto responder = 94
86 base::MakeUnique<ForwardToCallingThread>(std::move(response_receiver)); 95 if (!message->has_flag(Message::kFlagIsSync)) {
96 auto reply_forwarder =
97 base::MakeUnique<ForwardToCallingThread>(std::move(responder));
98 if (task_runner_->RunsTasksOnCurrentThread()) {
99 forward_with_responder_.Run(std::move(*message),
yzshen1 2017/03/27 22:18:24 I think we would like to preserve order between ca
watk 2017/03/27 23:45:29 True. I had assumed there was a contract that TSIP
yzshen1 2017/03/28 00:06:48 What error notification? :)
watk 2017/03/28 00:14:35 Take a look at sync_method_unittest.cc line 853. T
100 std::move(reply_forwarder));
101 } else {
102 task_runner_->PostTask(
103 FROM_HERE,
104 base::Bind(forward_with_responder_, base::Passed(message),
105 base::Passed(&reply_forwarder)));
106 }
107 return true;
108 }
109
110 // Sync call on this thread.
111 if (task_runner_->RunsTasksOnCurrentThread()) {
112 forward_with_responder_.Run(std::move(*message), std::move(responder));
yzshen1 2017/03/27 22:18:24 +CC Ken: At the beginning, I thought this direct
watk 2017/03/27 23:45:29 I did this because I wasn't sure what the alternat
yzshen1 2017/03/28 00:06:48 What I was trying to say is "I thought this doesn'
watk 2017/03/28 00:14:35 Oh, ok :)
113 return true;
114 }
115
116 // Sync call on a different thread.
117 // TODO: allow the call to be async on the other thread.
118 Message response;
119 auto reply_signaler =
120 base::MakeUnique<SyncReplySignaler>(&response, &sync_message_event_);
87 task_runner_->PostTask( 121 task_runner_->PostTask(
88 FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message), 122 FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message),
89 base::Passed(&responder))); 123 base::Passed(&reply_signaler)));
124 bool response_received = false;
125 // XXX: What does it mean to have reentrant sync calls that are all waiting
126 // on the same sync_message_event_? Correct because only reentrant sync
yzshen1 2017/03/27 22:18:25 I am not quite sure I understand the question. Wou
watk 2017/03/27 23:45:29 I think I wrote this before I had the sync_message
127 // calls are allowed?
128 SyncEventWatcher watcher(&sync_message_event_,
129 base::Bind(&AssignTrue, &response_received));
130 watcher.AllowWokenUpBySyncWatchOnSameThread();
yzshen1 2017/03/27 22:18:24 I don't think you need this line. This is for the
watk 2017/03/27 23:45:29 Ah, gotcha.
131 // XXX: Check that this handles the case that sync_message_event_ is already
132 // signaled.
133 auto weak_self = weak_factory_.GetWeakPtr();
yzshen1 2017/03/27 22:18:24 WeakPtrFactory will force this object to be only u
watk 2017/03/27 23:45:29 SG, thanks
134 watcher.SyncWatch(&response_received);
135 if (weak_self) {
136 sync_message_event_.Reset();
137 if (response_received)
138 ignore_result(responder->Accept(&response));
yzshen1 2017/03/27 22:18:24 The response may not correspond to the |responder|
watk 2017/03/27 23:45:28 Ah, yes, thanks. I hadn't even thought about non-n
139 }
140
90 return true; 141 return true;
91 } 142 }
92 143
144 class SyncReplySignaler : public MessageReceiver {
145 public:
146 SyncReplySignaler(Message* sync_response_destination,
147 base::WaitableEvent* sync_response_event)
148 : sync_response_destination_(sync_response_destination),
149 sync_response_event_(sync_response_event) {}
150
151 private:
152 bool Accept(Message* message) {
153 *sync_response_destination_ = std::move(*message);
154 sync_response_event_->Signal();
155 return true;
156 }
157
158 // XXX: What guarantees do we have about the lifetime of these?
yzshen1 2017/03/27 22:18:25 The sync_response_event_ may be destroyed in the m
watk 2017/03/27 23:45:29 Yes, true. I'll make this refcounted then.
159 Message* sync_response_destination_;
160 base::WaitableEvent* sync_response_event_;
161 };
162
93 class ForwardToCallingThread : public MessageReceiver { 163 class ForwardToCallingThread : public MessageReceiver {
94 public: 164 public:
95 explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder) 165 explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder)
96 : responder_(std::move(responder)), 166 : responder_(std::move(responder)),
97 caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) { 167 caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {}
98 }
99 168
100 private: 169 private:
101 bool Accept(Message* message) { 170 bool Accept(Message* message) {
102 // The current instance will be deleted when this method returns, so we 171 // The current instance will be deleted when this method returns, so we
103 // have to relinquish the responder's ownership so it does not get 172 // have to relinquish the responder's ownership so it does not get
104 // deleted. 173 // deleted.
105 caller_task_runner_->PostTask(FROM_HERE, 174 if (caller_task_runner_->RunsTasksOnCurrentThread()) {
106 base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder, 175 CallAcceptAndDeleteResponder(std::move(responder_),
107 base::Passed(std::move(responder_)), 176 std::move(*message));
108 base::Passed(std::move(*message)))); 177 } else {
178 caller_task_runner_->PostTask(
179 FROM_HERE,
180 base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder,
181 base::Passed(std::move(responder_)),
182 base::Passed(std::move(*message))));
183 }
109 return true; 184 return true;
110 } 185 }
111 186
112 static void CallAcceptAndDeleteResponder( 187 static void CallAcceptAndDeleteResponder(
113 std::unique_ptr<MessageReceiver> responder, 188 std::unique_ptr<MessageReceiver> responder,
114 Message message) { 189 Message message) {
115 ignore_result(responder->Accept(&message)); 190 ignore_result(responder->Accept(&message));
116 } 191 }
117 192
118 std::unique_ptr<MessageReceiver> responder_; 193 std::unique_ptr<MessageReceiver> responder_;
119 scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_; 194 scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_;
120 }; 195 };
121 196
122 ProxyType proxy_; 197 ProxyType proxy_;
123 const scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 198 const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
124 const ForwardMessageCallback forward_; 199 const ForwardMessageCallback forward_;
125 const ForwardMessageWithResponderCallback forward_with_responder_; 200 const ForwardMessageWithResponderCallback forward_with_responder_;
126 AssociatedGroup associated_group_; 201 AssociatedGroup associated_group_;
127 202
203 // An event used to signal that sync messages are available.
204 base::WaitableEvent sync_message_event_;
205 base::WeakPtrFactory<ThreadSafeForwarder> weak_factory_;
206
128 DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder); 207 DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder);
129 }; 208 };
130 209
131 template <typename InterfacePtrType> 210 template <typename InterfacePtrType>
132 class ThreadSafeInterfacePtrBase 211 class ThreadSafeInterfacePtrBase
133 : public base::RefCountedThreadSafe< 212 : public base::RefCountedThreadSafe<
134 ThreadSafeInterfacePtrBase<InterfacePtrType>> { 213 ThreadSafeInterfacePtrBase<InterfacePtrType>> {
135 public: 214 public:
136 using InterfaceType = typename InterfacePtrType::InterfaceType; 215 using InterfaceType = typename InterfacePtrType::InterfaceType;
137 using PtrInfoType = typename InterfacePtrType::PtrInfoType; 216 using PtrInfoType = typename InterfacePtrType::PtrInfoType;
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after
270 using ThreadSafeAssociatedInterfacePtr = 349 using ThreadSafeAssociatedInterfacePtr =
271 ThreadSafeInterfacePtrBase<AssociatedInterfacePtr<Interface>>; 350 ThreadSafeInterfacePtrBase<AssociatedInterfacePtr<Interface>>;
272 351
273 template <typename Interface> 352 template <typename Interface>
274 using ThreadSafeInterfacePtr = 353 using ThreadSafeInterfacePtr =
275 ThreadSafeInterfacePtrBase<InterfacePtr<Interface>>; 354 ThreadSafeInterfacePtrBase<InterfacePtr<Interface>>;
276 355
277 } // namespace mojo 356 } // namespace mojo
278 357
279 #endif // MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_ 358 #endif // MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698