OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" | 5 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 #include <utility> | 8 #include <utility> |
9 | 9 |
10 #include "base/bind.h" | 10 #include "base/bind.h" |
11 #include "base/location.h" | 11 #include "base/location.h" |
12 #include "base/macros.h" | 12 #include "base/macros.h" |
13 #include "base/message_loop/message_loop.h" | 13 #include "base/message_loop/message_loop.h" |
14 #include "base/single_thread_task_runner.h" | 14 #include "base/single_thread_task_runner.h" |
15 #include "base/stl_util.h" | 15 #include "base/stl_util.h" |
16 #include "base/thread_task_runner_handle.h" | 16 #include "base/thread_task_runner_handle.h" |
17 #include "mojo/public/cpp/bindings/associated_group.h" | 17 #include "mojo/public/cpp/bindings/associated_group.h" |
| 18 #include "mojo/public/cpp/bindings/lib/interface_endpoint_controller.h" |
18 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | 19 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
19 | 20 |
20 namespace mojo { | 21 namespace mojo { |
21 namespace internal { | 22 namespace internal { |
22 | 23 |
23 // ---------------------------------------------------------------------------- | 24 // ---------------------------------------------------------------------------- |
24 | 25 |
25 namespace { | 26 namespace { |
26 | 27 |
27 void DCheckIfInvalid(const base::WeakPtr<InterfaceEndpointClient>& client, | 28 void DCheckIfInvalid(const base::WeakPtr<InterfaceEndpointClient>& client, |
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
93 bool accept_was_invoked_; | 94 bool accept_was_invoked_; |
94 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 95 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
95 | 96 |
96 DISALLOW_COPY_AND_ASSIGN(ResponderThunk); | 97 DISALLOW_COPY_AND_ASSIGN(ResponderThunk); |
97 }; | 98 }; |
98 | 99 |
99 } // namespace | 100 } // namespace |
100 | 101 |
101 // ---------------------------------------------------------------------------- | 102 // ---------------------------------------------------------------------------- |
102 | 103 |
| 104 InterfaceEndpointClient::SyncResponseInfo::SyncResponseInfo( |
| 105 bool* in_response_received) |
| 106 : response_received(in_response_received) {} |
| 107 |
| 108 InterfaceEndpointClient::SyncResponseInfo::~SyncResponseInfo() {} |
| 109 |
| 110 // ---------------------------------------------------------------------------- |
| 111 |
103 InterfaceEndpointClient::HandleIncomingMessageThunk::HandleIncomingMessageThunk( | 112 InterfaceEndpointClient::HandleIncomingMessageThunk::HandleIncomingMessageThunk( |
104 InterfaceEndpointClient* owner) | 113 InterfaceEndpointClient* owner) |
105 : owner_(owner) {} | 114 : owner_(owner) {} |
106 | 115 |
107 InterfaceEndpointClient::HandleIncomingMessageThunk:: | 116 InterfaceEndpointClient::HandleIncomingMessageThunk:: |
108 ~HandleIncomingMessageThunk() {} | 117 ~HandleIncomingMessageThunk() {} |
109 | 118 |
110 bool InterfaceEndpointClient::HandleIncomingMessageThunk::Accept( | 119 bool InterfaceEndpointClient::HandleIncomingMessageThunk::Accept( |
111 Message* message) { | 120 Message* message) { |
112 return owner_->HandleValidatedMessage(message); | 121 return owner_->HandleValidatedMessage(message); |
113 } | 122 } |
114 | 123 |
115 // ---------------------------------------------------------------------------- | 124 // ---------------------------------------------------------------------------- |
116 | 125 |
117 InterfaceEndpointClient::InterfaceEndpointClient( | 126 InterfaceEndpointClient::InterfaceEndpointClient( |
118 ScopedInterfaceEndpointHandle handle, | 127 ScopedInterfaceEndpointHandle handle, |
119 MessageReceiverWithResponderStatus* receiver, | 128 MessageReceiverWithResponderStatus* receiver, |
120 scoped_ptr<MessageFilter> payload_validator) | 129 scoped_ptr<MessageFilter> payload_validator, |
| 130 bool expect_sync_requests) |
121 : handle_(std::move(handle)), | 131 : handle_(std::move(handle)), |
122 incoming_receiver_(receiver), | 132 incoming_receiver_(receiver), |
123 payload_validator_(std::move(payload_validator)), | 133 payload_validator_(std::move(payload_validator)), |
124 thunk_(this), | 134 thunk_(this), |
125 next_request_id_(1), | 135 next_request_id_(1), |
126 encountered_error_(false), | 136 encountered_error_(false), |
127 weak_ptr_factory_(this) { | 137 weak_ptr_factory_(this) { |
128 DCHECK(handle_.is_valid()); | 138 DCHECK(handle_.is_valid()); |
129 DCHECK(handle_.is_local()); | 139 DCHECK(handle_.is_local()); |
130 | 140 |
131 // TODO(yzshen): the way to use validator (or message filter in general) | 141 // TODO(yzshen): the way to use validator (or message filter in general) |
132 // directly is a little awkward. | 142 // directly is a little awkward. |
133 payload_validator_->set_sink(&thunk_); | 143 payload_validator_->set_sink(&thunk_); |
134 | 144 |
135 handle_.router()->AttachEndpointClient(handle_, this); | 145 controller_ = handle_.router()->AttachEndpointClient(handle_, this); |
| 146 if (expect_sync_requests) |
| 147 controller_->AllowWokenUpBySyncWatchOnSameThread(); |
136 } | 148 } |
137 | 149 |
138 InterfaceEndpointClient::~InterfaceEndpointClient() { | 150 InterfaceEndpointClient::~InterfaceEndpointClient() { |
139 DCHECK(thread_checker_.CalledOnValidThread()); | 151 DCHECK(thread_checker_.CalledOnValidThread()); |
140 | 152 |
141 STLDeleteValues(&responders_); | |
142 | |
143 handle_.router()->DetachEndpointClient(handle_); | 153 handle_.router()->DetachEndpointClient(handle_); |
144 } | 154 } |
145 | 155 |
146 AssociatedGroup* InterfaceEndpointClient::associated_group() { | 156 AssociatedGroup* InterfaceEndpointClient::associated_group() { |
147 if (!associated_group_) | 157 if (!associated_group_) |
148 associated_group_ = handle_.router()->CreateAssociatedGroup(); | 158 associated_group_ = handle_.router()->CreateAssociatedGroup(); |
149 return associated_group_.get(); | 159 return associated_group_.get(); |
150 } | 160 } |
151 | 161 |
152 uint32_t InterfaceEndpointClient::interface_id() const { | 162 uint32_t InterfaceEndpointClient::interface_id() const { |
153 DCHECK(thread_checker_.CalledOnValidThread()); | 163 DCHECK(thread_checker_.CalledOnValidThread()); |
154 return handle_.id(); | 164 return handle_.id(); |
155 } | 165 } |
156 | 166 |
157 ScopedInterfaceEndpointHandle InterfaceEndpointClient::PassHandle() { | 167 ScopedInterfaceEndpointHandle InterfaceEndpointClient::PassHandle() { |
158 DCHECK(thread_checker_.CalledOnValidThread()); | 168 DCHECK(thread_checker_.CalledOnValidThread()); |
159 DCHECK(!has_pending_responders()); | 169 DCHECK(!has_pending_responders()); |
160 | 170 |
161 if (!handle_.is_valid()) | 171 if (!handle_.is_valid()) |
162 return ScopedInterfaceEndpointHandle(); | 172 return ScopedInterfaceEndpointHandle(); |
163 | 173 |
| 174 controller_ = nullptr; |
164 handle_.router()->DetachEndpointClient(handle_); | 175 handle_.router()->DetachEndpointClient(handle_); |
165 | 176 |
166 return std::move(handle_); | 177 return std::move(handle_); |
167 } | 178 } |
168 | 179 |
169 void InterfaceEndpointClient::RaiseError() { | 180 void InterfaceEndpointClient::RaiseError() { |
170 DCHECK(thread_checker_.CalledOnValidThread()); | 181 DCHECK(thread_checker_.CalledOnValidThread()); |
171 | 182 |
172 handle_.router()->RaiseError(); | 183 handle_.router()->RaiseError(); |
173 } | 184 } |
174 | 185 |
175 bool InterfaceEndpointClient::Accept(Message* message) { | 186 bool InterfaceEndpointClient::Accept(Message* message) { |
176 DCHECK(thread_checker_.CalledOnValidThread()); | 187 DCHECK(thread_checker_.CalledOnValidThread()); |
| 188 DCHECK(controller_); |
177 DCHECK(!message->has_flag(kMessageExpectsResponse)); | 189 DCHECK(!message->has_flag(kMessageExpectsResponse)); |
178 | 190 |
179 if (encountered_error_) | 191 if (encountered_error_) |
180 return false; | 192 return false; |
181 | 193 |
182 return handle_.router()->SendMessage(handle_, message); | 194 return controller_->SendMessage(message); |
183 } | 195 } |
184 | 196 |
185 bool InterfaceEndpointClient::AcceptWithResponder(Message* message, | 197 bool InterfaceEndpointClient::AcceptWithResponder(Message* message, |
186 MessageReceiver* responder) { | 198 MessageReceiver* responder) { |
187 DCHECK(thread_checker_.CalledOnValidThread()); | 199 DCHECK(thread_checker_.CalledOnValidThread()); |
| 200 DCHECK(controller_); |
188 DCHECK(message->has_flag(kMessageExpectsResponse)); | 201 DCHECK(message->has_flag(kMessageExpectsResponse)); |
189 | 202 |
190 // TODO(yzshen): Sync method call using assoicated interfaces or master | |
191 // interfaces that serve associated interfaces hasn't been supported yet. | |
192 if (message->has_flag(kMessageIsSync)) { | |
193 NOTIMPLEMENTED(); | |
194 return false; | |
195 } | |
196 | |
197 if (encountered_error_) | 203 if (encountered_error_) |
198 return false; | 204 return false; |
199 | 205 |
200 // Reserve 0 in case we want it to convey special meaning in the future. | 206 // Reserve 0 in case we want it to convey special meaning in the future. |
201 uint64_t request_id = next_request_id_++; | 207 uint64_t request_id = next_request_id_++; |
202 if (request_id == 0) | 208 if (request_id == 0) |
203 request_id = next_request_id_++; | 209 request_id = next_request_id_++; |
204 | 210 |
205 message->set_request_id(request_id); | 211 message->set_request_id(request_id); |
206 | 212 |
207 if (!handle_.router()->SendMessage(handle_, message)) | 213 if (!controller_->SendMessage(message)) |
208 return false; | 214 return false; |
209 | 215 |
210 // We assume ownership of |responder|. | 216 if (!message->has_flag(kMessageIsSync)) { |
211 responders_[request_id] = responder; | 217 // We assume ownership of |responder|. |
| 218 async_responders_[request_id] = make_scoped_ptr(responder); |
| 219 return true; |
| 220 } |
| 221 |
| 222 bool response_received = false; |
| 223 scoped_ptr<MessageReceiver> sync_responder(responder); |
| 224 sync_responses_.insert(std::make_pair( |
| 225 request_id, make_scoped_ptr(new SyncResponseInfo(&response_received)))); |
| 226 |
| 227 base::WeakPtr<InterfaceEndpointClient> weak_self = |
| 228 weak_ptr_factory_.GetWeakPtr(); |
| 229 controller_->SyncWatch(&response_received); |
| 230 // Make sure that this instance hasn't been destroyed. |
| 231 if (weak_self) { |
| 232 DCHECK(ContainsKey(sync_responses_, request_id)); |
| 233 auto iter = sync_responses_.find(request_id); |
| 234 DCHECK_EQ(&response_received, iter->second->response_received); |
| 235 if (response_received) { |
| 236 scoped_ptr<Message> response = std::move(iter->second->response); |
| 237 ignore_result(sync_responder->Accept(response.get())); |
| 238 } |
| 239 sync_responses_.erase(iter); |
| 240 } |
| 241 |
| 242 // Return true means that we take ownership of |responder|. |
212 return true; | 243 return true; |
213 } | 244 } |
214 | 245 |
215 bool InterfaceEndpointClient::HandleIncomingMessage(Message* message) { | 246 bool InterfaceEndpointClient::HandleIncomingMessage(Message* message) { |
216 DCHECK(thread_checker_.CalledOnValidThread()); | 247 DCHECK(thread_checker_.CalledOnValidThread()); |
217 | 248 |
218 return payload_validator_->Accept(message); | 249 return payload_validator_->Accept(message); |
219 } | 250 } |
220 | 251 |
221 void InterfaceEndpointClient::NotifyError() { | 252 void InterfaceEndpointClient::NotifyError() { |
(...skipping 13 matching lines...) Expand all Loading... |
235 return false; | 266 return false; |
236 | 267 |
237 MessageReceiverWithStatus* responder = | 268 MessageReceiverWithStatus* responder = |
238 new ResponderThunk(weak_ptr_factory_.GetWeakPtr()); | 269 new ResponderThunk(weak_ptr_factory_.GetWeakPtr()); |
239 bool ok = incoming_receiver_->AcceptWithResponder(message, responder); | 270 bool ok = incoming_receiver_->AcceptWithResponder(message, responder); |
240 if (!ok) | 271 if (!ok) |
241 delete responder; | 272 delete responder; |
242 return ok; | 273 return ok; |
243 } else if (message->has_flag(kMessageIsResponse)) { | 274 } else if (message->has_flag(kMessageIsResponse)) { |
244 uint64_t request_id = message->request_id(); | 275 uint64_t request_id = message->request_id(); |
245 ResponderMap::iterator it = responders_.find(request_id); | 276 |
246 if (it == responders_.end()) | 277 if (message->has_flag(kMessageIsSync)) { |
| 278 auto it = sync_responses_.find(request_id); |
| 279 if (it == sync_responses_.end()) |
| 280 return false; |
| 281 it->second->response.reset(new Message()); |
| 282 message->MoveTo(it->second->response.get()); |
| 283 *it->second->response_received = true; |
| 284 return true; |
| 285 } |
| 286 |
| 287 auto it = async_responders_.find(request_id); |
| 288 if (it == async_responders_.end()) |
247 return false; | 289 return false; |
248 MessageReceiver* responder = it->second; | 290 scoped_ptr<MessageReceiver> responder = std::move(it->second); |
249 responders_.erase(it); | 291 async_responders_.erase(it); |
250 bool ok = responder->Accept(message); | 292 return responder->Accept(message); |
251 delete responder; | |
252 return ok; | |
253 } else { | 293 } else { |
254 if (!incoming_receiver_) | 294 if (!incoming_receiver_) |
255 return false; | 295 return false; |
256 | 296 |
257 return incoming_receiver_->Accept(message); | 297 return incoming_receiver_->Accept(message); |
258 } | 298 } |
259 } | 299 } |
260 | 300 |
261 } // namespace internal | 301 } // namespace internal |
262 } // namespace mojo | 302 } // namespace mojo |
OLD | NEW |