Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(27)

Side by Side Diff: mojo/public/cpp/bindings/lib/interface_endpoint_client.cc

Issue 1823683006: Mojo C++ bindings: sync call support for associated interfaces and master interfaces (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" 5 #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 #include <utility> 8 #include <utility>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
11 #include "base/location.h" 11 #include "base/location.h"
12 #include "base/macros.h" 12 #include "base/macros.h"
13 #include "base/message_loop/message_loop.h" 13 #include "base/message_loop/message_loop.h"
14 #include "base/single_thread_task_runner.h" 14 #include "base/single_thread_task_runner.h"
15 #include "base/stl_util.h" 15 #include "base/stl_util.h"
16 #include "base/thread_task_runner_handle.h" 16 #include "base/thread_task_runner_handle.h"
17 #include "mojo/public/cpp/bindings/associated_group.h" 17 #include "mojo/public/cpp/bindings/associated_group.h"
18 #include "mojo/public/cpp/bindings/lib/interface_endpoint_controller.h"
18 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" 19 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
19 20
20 namespace mojo { 21 namespace mojo {
21 namespace internal { 22 namespace internal {
22 23
23 // ---------------------------------------------------------------------------- 24 // ----------------------------------------------------------------------------
24 25
25 namespace { 26 namespace {
26 27
27 void DCheckIfInvalid(const base::WeakPtr<InterfaceEndpointClient>& client, 28 void DCheckIfInvalid(const base::WeakPtr<InterfaceEndpointClient>& client,
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
93 bool accept_was_invoked_; 94 bool accept_was_invoked_;
94 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 95 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
95 96
96 DISALLOW_COPY_AND_ASSIGN(ResponderThunk); 97 DISALLOW_COPY_AND_ASSIGN(ResponderThunk);
97 }; 98 };
98 99
99 } // namespace 100 } // namespace
100 101
101 // ---------------------------------------------------------------------------- 102 // ----------------------------------------------------------------------------
102 103
104 InterfaceEndpointClient::SyncResponseInfo::SyncResponseInfo(
105 bool* in_response_received)
106 : response_received(in_response_received) {}
107
108 InterfaceEndpointClient::SyncResponseInfo::~SyncResponseInfo() {}
109
110 // ----------------------------------------------------------------------------
111
103 InterfaceEndpointClient::HandleIncomingMessageThunk::HandleIncomingMessageThunk( 112 InterfaceEndpointClient::HandleIncomingMessageThunk::HandleIncomingMessageThunk(
104 InterfaceEndpointClient* owner) 113 InterfaceEndpointClient* owner)
105 : owner_(owner) {} 114 : owner_(owner) {}
106 115
107 InterfaceEndpointClient::HandleIncomingMessageThunk:: 116 InterfaceEndpointClient::HandleIncomingMessageThunk::
108 ~HandleIncomingMessageThunk() {} 117 ~HandleIncomingMessageThunk() {}
109 118
110 bool InterfaceEndpointClient::HandleIncomingMessageThunk::Accept( 119 bool InterfaceEndpointClient::HandleIncomingMessageThunk::Accept(
111 Message* message) { 120 Message* message) {
112 return owner_->HandleValidatedMessage(message); 121 return owner_->HandleValidatedMessage(message);
113 } 122 }
114 123
115 // ---------------------------------------------------------------------------- 124 // ----------------------------------------------------------------------------
116 125
117 InterfaceEndpointClient::InterfaceEndpointClient( 126 InterfaceEndpointClient::InterfaceEndpointClient(
118 ScopedInterfaceEndpointHandle handle, 127 ScopedInterfaceEndpointHandle handle,
119 MessageReceiverWithResponderStatus* receiver, 128 MessageReceiverWithResponderStatus* receiver,
120 scoped_ptr<MessageFilter> payload_validator) 129 scoped_ptr<MessageFilter> payload_validator,
130 bool expect_sync_requests)
121 : handle_(std::move(handle)), 131 : handle_(std::move(handle)),
122 incoming_receiver_(receiver), 132 incoming_receiver_(receiver),
123 payload_validator_(std::move(payload_validator)), 133 payload_validator_(std::move(payload_validator)),
124 thunk_(this), 134 thunk_(this),
125 next_request_id_(1), 135 next_request_id_(1),
126 encountered_error_(false), 136 encountered_error_(false),
127 weak_ptr_factory_(this) { 137 weak_ptr_factory_(this) {
128 DCHECK(handle_.is_valid()); 138 DCHECK(handle_.is_valid());
129 DCHECK(handle_.is_local()); 139 DCHECK(handle_.is_local());
130 140
131 // TODO(yzshen): the way to use validator (or message filter in general) 141 // TODO(yzshen): the way to use validator (or message filter in general)
132 // directly is a little awkward. 142 // directly is a little awkward.
133 payload_validator_->set_sink(&thunk_); 143 payload_validator_->set_sink(&thunk_);
134 144
135 handle_.router()->AttachEndpointClient(handle_, this); 145 controller_ = handle_.router()->AttachEndpointClient(handle_, this);
146 if (expect_sync_requests)
147 controller_->AllowWokenUpBySyncWatchOnSameThread();
136 } 148 }
137 149
138 InterfaceEndpointClient::~InterfaceEndpointClient() { 150 InterfaceEndpointClient::~InterfaceEndpointClient() {
139 DCHECK(thread_checker_.CalledOnValidThread()); 151 DCHECK(thread_checker_.CalledOnValidThread());
140 152
141 STLDeleteValues(&responders_);
142
143 handle_.router()->DetachEndpointClient(handle_); 153 handle_.router()->DetachEndpointClient(handle_);
144 } 154 }
145 155
146 AssociatedGroup* InterfaceEndpointClient::associated_group() { 156 AssociatedGroup* InterfaceEndpointClient::associated_group() {
147 if (!associated_group_) 157 if (!associated_group_)
148 associated_group_ = handle_.router()->CreateAssociatedGroup(); 158 associated_group_ = handle_.router()->CreateAssociatedGroup();
149 return associated_group_.get(); 159 return associated_group_.get();
150 } 160 }
151 161
152 uint32_t InterfaceEndpointClient::interface_id() const { 162 uint32_t InterfaceEndpointClient::interface_id() const {
153 DCHECK(thread_checker_.CalledOnValidThread()); 163 DCHECK(thread_checker_.CalledOnValidThread());
154 return handle_.id(); 164 return handle_.id();
155 } 165 }
156 166
157 ScopedInterfaceEndpointHandle InterfaceEndpointClient::PassHandle() { 167 ScopedInterfaceEndpointHandle InterfaceEndpointClient::PassHandle() {
158 DCHECK(thread_checker_.CalledOnValidThread()); 168 DCHECK(thread_checker_.CalledOnValidThread());
159 DCHECK(!has_pending_responders()); 169 DCHECK(!has_pending_responders());
160 170
161 if (!handle_.is_valid()) 171 if (!handle_.is_valid())
162 return ScopedInterfaceEndpointHandle(); 172 return ScopedInterfaceEndpointHandle();
163 173
174 controller_ = nullptr;
164 handle_.router()->DetachEndpointClient(handle_); 175 handle_.router()->DetachEndpointClient(handle_);
165 176
166 return std::move(handle_); 177 return std::move(handle_);
167 } 178 }
168 179
169 void InterfaceEndpointClient::RaiseError() { 180 void InterfaceEndpointClient::RaiseError() {
170 DCHECK(thread_checker_.CalledOnValidThread()); 181 DCHECK(thread_checker_.CalledOnValidThread());
171 182
172 handle_.router()->RaiseError(); 183 handle_.router()->RaiseError();
173 } 184 }
174 185
175 bool InterfaceEndpointClient::Accept(Message* message) { 186 bool InterfaceEndpointClient::Accept(Message* message) {
176 DCHECK(thread_checker_.CalledOnValidThread()); 187 DCHECK(thread_checker_.CalledOnValidThread());
188 DCHECK(controller_);
177 DCHECK(!message->has_flag(kMessageExpectsResponse)); 189 DCHECK(!message->has_flag(kMessageExpectsResponse));
178 190
179 if (encountered_error_) 191 if (encountered_error_)
180 return false; 192 return false;
181 193
182 return handle_.router()->SendMessage(handle_, message); 194 return controller_->SendMessage(message);
183 } 195 }
184 196
185 bool InterfaceEndpointClient::AcceptWithResponder(Message* message, 197 bool InterfaceEndpointClient::AcceptWithResponder(Message* message,
186 MessageReceiver* responder) { 198 MessageReceiver* responder) {
187 DCHECK(thread_checker_.CalledOnValidThread()); 199 DCHECK(thread_checker_.CalledOnValidThread());
200 DCHECK(controller_);
188 DCHECK(message->has_flag(kMessageExpectsResponse)); 201 DCHECK(message->has_flag(kMessageExpectsResponse));
189 202
190 // TODO(yzshen): Sync method call using assoicated interfaces or master
191 // interfaces that serve associated interfaces hasn't been supported yet.
192 if (message->has_flag(kMessageIsSync)) {
193 NOTIMPLEMENTED();
194 return false;
195 }
196
197 if (encountered_error_) 203 if (encountered_error_)
198 return false; 204 return false;
199 205
200 // Reserve 0 in case we want it to convey special meaning in the future. 206 // Reserve 0 in case we want it to convey special meaning in the future.
201 uint64_t request_id = next_request_id_++; 207 uint64_t request_id = next_request_id_++;
202 if (request_id == 0) 208 if (request_id == 0)
203 request_id = next_request_id_++; 209 request_id = next_request_id_++;
204 210
205 message->set_request_id(request_id); 211 message->set_request_id(request_id);
206 212
207 if (!handle_.router()->SendMessage(handle_, message)) 213 if (!controller_->SendMessage(message))
208 return false; 214 return false;
209 215
210 // We assume ownership of |responder|. 216 if (!message->has_flag(kMessageIsSync)) {
211 responders_[request_id] = responder; 217 // We assume ownership of |responder|.
218 async_responders_[request_id] = make_scoped_ptr(responder);
219 return true;
220 }
221
222 bool response_received = false;
223 scoped_ptr<MessageReceiver> sync_responder(responder);
224 sync_responses_.insert(std::make_pair(
225 request_id, make_scoped_ptr(new SyncResponseInfo(&response_received))));
226
227 base::WeakPtr<InterfaceEndpointClient> weak_self =
228 weak_ptr_factory_.GetWeakPtr();
229 controller_->SyncWatch(&response_received);
230 // Make sure that this instance hasn't been destroyed.
231 if (weak_self) {
232 DCHECK(ContainsKey(sync_responses_, request_id));
233 auto iter = sync_responses_.find(request_id);
234 DCHECK_EQ(&response_received, iter->second->response_received);
235 if (response_received) {
236 scoped_ptr<Message> response = std::move(iter->second->response);
237 ignore_result(sync_responder->Accept(response.get()));
238 }
239 sync_responses_.erase(iter);
240 }
241
242 // Return true means that we take ownership of |responder|.
212 return true; 243 return true;
213 } 244 }
214 245
215 bool InterfaceEndpointClient::HandleIncomingMessage(Message* message) { 246 bool InterfaceEndpointClient::HandleIncomingMessage(Message* message) {
216 DCHECK(thread_checker_.CalledOnValidThread()); 247 DCHECK(thread_checker_.CalledOnValidThread());
217 248
218 return payload_validator_->Accept(message); 249 return payload_validator_->Accept(message);
219 } 250 }
220 251
221 void InterfaceEndpointClient::NotifyError() { 252 void InterfaceEndpointClient::NotifyError() {
(...skipping 13 matching lines...) Expand all
235 return false; 266 return false;
236 267
237 MessageReceiverWithStatus* responder = 268 MessageReceiverWithStatus* responder =
238 new ResponderThunk(weak_ptr_factory_.GetWeakPtr()); 269 new ResponderThunk(weak_ptr_factory_.GetWeakPtr());
239 bool ok = incoming_receiver_->AcceptWithResponder(message, responder); 270 bool ok = incoming_receiver_->AcceptWithResponder(message, responder);
240 if (!ok) 271 if (!ok)
241 delete responder; 272 delete responder;
242 return ok; 273 return ok;
243 } else if (message->has_flag(kMessageIsResponse)) { 274 } else if (message->has_flag(kMessageIsResponse)) {
244 uint64_t request_id = message->request_id(); 275 uint64_t request_id = message->request_id();
245 ResponderMap::iterator it = responders_.find(request_id); 276
246 if (it == responders_.end()) 277 if (message->has_flag(kMessageIsSync)) {
278 auto it = sync_responses_.find(request_id);
279 if (it == sync_responses_.end())
280 return false;
281 it->second->response.reset(new Message());
282 message->MoveTo(it->second->response.get());
283 *it->second->response_received = true;
284 return true;
285 }
286
287 auto it = async_responders_.find(request_id);
288 if (it == async_responders_.end())
247 return false; 289 return false;
248 MessageReceiver* responder = it->second; 290 scoped_ptr<MessageReceiver> responder = std::move(it->second);
249 responders_.erase(it); 291 async_responders_.erase(it);
250 bool ok = responder->Accept(message); 292 return responder->Accept(message);
251 delete responder;
252 return ok;
253 } else { 293 } else {
254 if (!incoming_receiver_) 294 if (!incoming_receiver_)
255 return false; 295 return false;
256 296
257 return incoming_receiver_->Accept(message); 297 return incoming_receiver_->Accept(message);
258 } 298 }
259 } 299 }
260 300
261 } // namespace internal 301 } // namespace internal
262 } // namespace mojo 302 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/interface_endpoint_client.h ('k') | mojo/public/cpp/bindings/lib/interface_endpoint_controller.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698