Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(92)

Side by Side Diff: mojo/public/cpp/bindings/lib/interface_endpoint_client.cc

Issue 1823683006: Mojo C++ bindings: sync call support for associated interfaces and master interfaces (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" 5 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 #include <utility> 8 #include <utility>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
11 #include "base/location.h" 11 #include "base/location.h"
12 #include "base/macros.h" 12 #include "base/macros.h"
13 #include "base/message_loop/message_loop.h" 13 #include "base/message_loop/message_loop.h"
14 #include "base/single_thread_task_runner.h" 14 #include "base/single_thread_task_runner.h"
15 #include "base/stl_util.h" 15 #include "base/stl_util.h"
16 #include "base/thread_task_runner_handle.h" 16 #include "base/thread_task_runner_handle.h"
17 #include "mojo/public/cpp/bindings/associated_group.h" 17 #include "mojo/public/cpp/bindings/associated_group.h"
18 #include "mojo/public/cpp/bindings/lib/interface_endpoint_controller.h"
18 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" 19 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
19 20
20 namespace mojo { 21 namespace mojo {
21 namespace internal { 22 namespace internal {
22 23
23 // ---------------------------------------------------------------------------- 24 // ----------------------------------------------------------------------------
24 25
25 namespace { 26 namespace {
26 27
27 void DCheckIfInvalid(const base::WeakPtr<InterfaceEndpointClient>& client, 28 void DCheckIfInvalid(const base::WeakPtr<InterfaceEndpointClient>& client,
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
90 91
91 private: 92 private:
92 base::WeakPtr<InterfaceEndpointClient> endpoint_client_; 93 base::WeakPtr<InterfaceEndpointClient> endpoint_client_;
93 bool accept_was_invoked_; 94 bool accept_was_invoked_;
94 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 95 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
95 96
96 DISALLOW_COPY_AND_ASSIGN(ResponderThunk); 97 DISALLOW_COPY_AND_ASSIGN(ResponderThunk);
97 }; 98 };
98 99
99 } // namespace 100 } // namespace
101 // ----------------------------------------------------------------------------
Ken Rockot(use gerrit already) 2016/03/29 06:36:49 nit: vertical space above?
yzshen1 2016/03/29 16:19:01 Done.
102
103 InterfaceEndpointClient::SyncResponseInfo::SyncResponseInfo(
104 bool* in_response_received)
105 : response_received(in_response_received) {}
106
107 InterfaceEndpointClient::SyncResponseInfo::~SyncResponseInfo() {}
100 108
101 // ---------------------------------------------------------------------------- 109 // ----------------------------------------------------------------------------
102 110
103 InterfaceEndpointClient::HandleIncomingMessageThunk::HandleIncomingMessageThunk( 111 InterfaceEndpointClient::HandleIncomingMessageThunk::HandleIncomingMessageThunk(
104 InterfaceEndpointClient* owner) 112 InterfaceEndpointClient* owner)
105 : owner_(owner) {} 113 : owner_(owner) {}
106 114
107 InterfaceEndpointClient::HandleIncomingMessageThunk:: 115 InterfaceEndpointClient::HandleIncomingMessageThunk::
108 ~HandleIncomingMessageThunk() {} 116 ~HandleIncomingMessageThunk() {}
109 117
110 bool InterfaceEndpointClient::HandleIncomingMessageThunk::Accept( 118 bool InterfaceEndpointClient::HandleIncomingMessageThunk::Accept(
111 Message* message) { 119 Message* message) {
112 return owner_->HandleValidatedMessage(message); 120 return owner_->HandleValidatedMessage(message);
113 } 121 }
114 122
115 // ---------------------------------------------------------------------------- 123 // ----------------------------------------------------------------------------
116 124
117 InterfaceEndpointClient::InterfaceEndpointClient( 125 InterfaceEndpointClient::InterfaceEndpointClient(
118 ScopedInterfaceEndpointHandle handle, 126 ScopedInterfaceEndpointHandle handle,
119 MessageReceiverWithResponderStatus* receiver, 127 MessageReceiverWithResponderStatus* receiver,
120 scoped_ptr<MessageFilter> payload_validator) 128 scoped_ptr<MessageFilter> payload_validator,
129 bool expect_sync_requests)
121 : handle_(std::move(handle)), 130 : handle_(std::move(handle)),
122 incoming_receiver_(receiver), 131 incoming_receiver_(receiver),
123 payload_validator_(std::move(payload_validator)), 132 payload_validator_(std::move(payload_validator)),
124 thunk_(this), 133 thunk_(this),
125 next_request_id_(1), 134 next_request_id_(1),
126 encountered_error_(false), 135 encountered_error_(false),
127 weak_ptr_factory_(this) { 136 weak_ptr_factory_(this) {
128 DCHECK(handle_.is_valid()); 137 DCHECK(handle_.is_valid());
129 DCHECK(handle_.is_local()); 138 DCHECK(handle_.is_local());
130 139
131 // TODO(yzshen): the way to use validator (or message filter in general) 140 // TODO(yzshen): the way to use validator (or message filter in general)
132 // directly is a little awkward. 141 // directly is a little awkward.
133 payload_validator_->set_sink(&thunk_); 142 payload_validator_->set_sink(&thunk_);
134 143
135 handle_.router()->AttachEndpointClient(handle_, this); 144 controller_ = handle_.router()->AttachEndpointClient(handle_, this);
145 if (expect_sync_requests)
146 controller_->AllowWokenUpBySyncWatchOnSameThread();
136 } 147 }
137 148
138 InterfaceEndpointClient::~InterfaceEndpointClient() { 149 InterfaceEndpointClient::~InterfaceEndpointClient() {
139 DCHECK(thread_checker_.CalledOnValidThread()); 150 DCHECK(thread_checker_.CalledOnValidThread());
140 151
141 STLDeleteValues(&responders_);
142
143 handle_.router()->DetachEndpointClient(handle_); 152 handle_.router()->DetachEndpointClient(handle_);
144 } 153 }
145 154
146 AssociatedGroup* InterfaceEndpointClient::associated_group() { 155 AssociatedGroup* InterfaceEndpointClient::associated_group() {
147 if (!associated_group_) 156 if (!associated_group_)
148 associated_group_ = handle_.router()->CreateAssociatedGroup(); 157 associated_group_ = handle_.router()->CreateAssociatedGroup();
149 return associated_group_.get(); 158 return associated_group_.get();
150 } 159 }
151 160
152 uint32_t InterfaceEndpointClient::interface_id() const { 161 uint32_t InterfaceEndpointClient::interface_id() const {
153 DCHECK(thread_checker_.CalledOnValidThread()); 162 DCHECK(thread_checker_.CalledOnValidThread());
154 return handle_.id(); 163 return handle_.id();
155 } 164 }
156 165
157 ScopedInterfaceEndpointHandle InterfaceEndpointClient::PassHandle() { 166 ScopedInterfaceEndpointHandle InterfaceEndpointClient::PassHandle() {
158 DCHECK(thread_checker_.CalledOnValidThread()); 167 DCHECK(thread_checker_.CalledOnValidThread());
159 DCHECK(!has_pending_responders()); 168 DCHECK(!has_pending_responders());
160 169
161 if (!handle_.is_valid()) 170 if (!handle_.is_valid())
162 return ScopedInterfaceEndpointHandle(); 171 return ScopedInterfaceEndpointHandle();
163 172
173 controller_ = nullptr;
164 handle_.router()->DetachEndpointClient(handle_); 174 handle_.router()->DetachEndpointClient(handle_);
165 175
166 return std::move(handle_); 176 return std::move(handle_);
167 } 177 }
168 178
169 void InterfaceEndpointClient::RaiseError() { 179 void InterfaceEndpointClient::RaiseError() {
170 DCHECK(thread_checker_.CalledOnValidThread()); 180 DCHECK(thread_checker_.CalledOnValidThread());
171 181
172 handle_.router()->RaiseError(); 182 handle_.router()->RaiseError();
173 } 183 }
174 184
175 bool InterfaceEndpointClient::Accept(Message* message) { 185 bool InterfaceEndpointClient::Accept(Message* message) {
176 DCHECK(thread_checker_.CalledOnValidThread()); 186 DCHECK(thread_checker_.CalledOnValidThread());
187 DCHECK(controller_);
177 DCHECK(!message->has_flag(kMessageExpectsResponse)); 188 DCHECK(!message->has_flag(kMessageExpectsResponse));
178 189
179 if (encountered_error_) 190 if (encountered_error_)
180 return false; 191 return false;
181 192
182 return handle_.router()->SendMessage(handle_, message); 193 return controller_->SendMessage(message);
183 } 194 }
184 195
185 bool InterfaceEndpointClient::AcceptWithResponder(Message* message, 196 bool InterfaceEndpointClient::AcceptWithResponder(Message* message,
186 MessageReceiver* responder) { 197 MessageReceiver* responder) {
187 DCHECK(thread_checker_.CalledOnValidThread()); 198 DCHECK(thread_checker_.CalledOnValidThread());
199 DCHECK(controller_);
188 DCHECK(message->has_flag(kMessageExpectsResponse)); 200 DCHECK(message->has_flag(kMessageExpectsResponse));
189 201
190 // TODO(yzshen): Sync method call using assoicated interfaces or master
191 // interfaces that serve associated interfaces hasn't been supported yet.
192 if (message->has_flag(kMessageIsSync)) {
193 NOTIMPLEMENTED();
194 return false;
195 }
196
197 if (encountered_error_) 202 if (encountered_error_)
198 return false; 203 return false;
199 204
200 // Reserve 0 in case we want it to convey special meaning in the future. 205 // Reserve 0 in case we want it to convey special meaning in the future.
201 uint64_t request_id = next_request_id_++; 206 uint64_t request_id = next_request_id_++;
202 if (request_id == 0) 207 if (request_id == 0)
203 request_id = next_request_id_++; 208 request_id = next_request_id_++;
204 209
205 message->set_request_id(request_id); 210 message->set_request_id(request_id);
206 211
207 if (!handle_.router()->SendMessage(handle_, message)) 212 if (!controller_->SendMessage(message))
208 return false; 213 return false;
209 214
210 // We assume ownership of |responder|. 215 if (!message->has_flag(kMessageIsSync)) {
211 responders_[request_id] = responder; 216 // We assume ownership of |responder|.
217 async_responders_[request_id] = make_scoped_ptr(responder);
218 return true;
219 }
220
221 bool response_received = false;
222 scoped_ptr<MessageReceiver> sync_responder(responder);
223 sync_responses_.insert(std::make_pair(
224 request_id, make_scoped_ptr(new SyncResponseInfo(&response_received))));
225
226 base::WeakPtr<InterfaceEndpointClient> weak_self =
227 weak_ptr_factory_.GetWeakPtr();
228 controller_->SyncWatch(&response_received);
229 // Make sure that this instance hasn't been destroyed.
230 if (weak_self) {
231 DCHECK(ContainsKey(sync_responses_, request_id));
232 auto iter = sync_responses_.find(request_id);
233 DCHECK_EQ(&response_received, iter->second->response_received);
234 if (response_received) {
235 scoped_ptr<Message> response = std::move(iter->second->response);
236 ignore_result(sync_responder->Accept(response.get()));
237 }
238 sync_responses_.erase(iter);
239 }
240
241 // Return true means that we take ownership of |responder|.
212 return true; 242 return true;
213 } 243 }
214 244
215 bool InterfaceEndpointClient::HandleIncomingMessage(Message* message) { 245 bool InterfaceEndpointClient::HandleIncomingMessage(Message* message) {
216 DCHECK(thread_checker_.CalledOnValidThread()); 246 DCHECK(thread_checker_.CalledOnValidThread());
217 247
218 return payload_validator_->Accept(message); 248 return payload_validator_->Accept(message);
219 } 249 }
220 250
221 void InterfaceEndpointClient::NotifyError() { 251 void InterfaceEndpointClient::NotifyError() {
(...skipping 13 matching lines...) Expand all
235 return false; 265 return false;
236 266
237 MessageReceiverWithStatus* responder = 267 MessageReceiverWithStatus* responder =
238 new ResponderThunk(weak_ptr_factory_.GetWeakPtr()); 268 new ResponderThunk(weak_ptr_factory_.GetWeakPtr());
239 bool ok = incoming_receiver_->AcceptWithResponder(message, responder); 269 bool ok = incoming_receiver_->AcceptWithResponder(message, responder);
240 if (!ok) 270 if (!ok)
241 delete responder; 271 delete responder;
242 return ok; 272 return ok;
243 } else if (message->has_flag(kMessageIsResponse)) { 273 } else if (message->has_flag(kMessageIsResponse)) {
244 uint64_t request_id = message->request_id(); 274 uint64_t request_id = message->request_id();
245 ResponderMap::iterator it = responders_.find(request_id); 275
246 if (it == responders_.end()) 276 if (message->has_flag(kMessageIsSync)) {
277 auto it = sync_responses_.find(request_id);
278 if (it == sync_responses_.end())
279 return false;
280 it->second->response.reset(new Message());
281 message->MoveTo(it->second->response.get());
282 *it->second->response_received = true;
283 return true;
284 }
285
286 auto it = async_responders_.find(request_id);
287 if (it == async_responders_.end())
247 return false; 288 return false;
248 MessageReceiver* responder = it->second; 289 scoped_ptr<MessageReceiver> responder = std::move(it->second);
249 responders_.erase(it); 290 async_responders_.erase(it);
250 bool ok = responder->Accept(message); 291 return responder->Accept(message);
251 delete responder;
252 return ok;
253 } else { 292 } else {
254 if (!incoming_receiver_) 293 if (!incoming_receiver_)
255 return false; 294 return false;
256 295
257 return incoming_receiver_->Accept(message); 296 return incoming_receiver_->Accept(message);
258 } 297 }
259 } 298 }
260 299
261 } // namespace internal 300 } // namespace internal
262 } // namespace mojo 301 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698