Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(502)

Side by Side Diff: mojo/public/cpp/bindings/lib/connector.cc

Issue 1713203002: Mojo C++ bindings: support sync methods - part 2 (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/connector.h" 5 #include "mojo/public/cpp/bindings/lib/connector.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 #include <utility> 8 #include <utility>
9 9
10 #include "base/bind.h"
10 #include "base/logging.h" 11 #include "base/logging.h"
11 #include "base/macros.h" 12 #include "base/macros.h"
12 #include "base/synchronization/lock.h" 13 #include "base/synchronization/lock.h"
14 #include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h"
13 15
14 namespace mojo { 16 namespace mojo {
15 namespace internal { 17 namespace internal {
16 18
17 namespace { 19 namespace {
18 20
19 // Similar to base::AutoLock, except that it does nothing if |lock| passed into 21 // Similar to base::AutoLock, except that it does nothing if |lock| passed into
20 // the constructor is null. 22 // the constructor is null.
21 class MayAutoLock { 23 class MayAutoLock {
22 public: 24 public:
(...skipping 22 matching lines...) Expand all
45 ConnectorConfig config, 47 ConnectorConfig config,
46 const MojoAsyncWaiter* waiter) 48 const MojoAsyncWaiter* waiter)
47 : waiter_(waiter), 49 : waiter_(waiter),
48 message_pipe_(std::move(message_pipe)), 50 message_pipe_(std::move(message_pipe)),
49 incoming_receiver_(nullptr), 51 incoming_receiver_(nullptr),
50 async_wait_id_(0), 52 async_wait_id_(0),
51 error_(false), 53 error_(false),
52 drop_writes_(false), 54 drop_writes_(false),
53 enforce_errors_from_incoming_receiver_(true), 55 enforce_errors_from_incoming_receiver_(true),
54 paused_(false), 56 paused_(false),
55 destroyed_flag_(nullptr), 57 lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr),
56 lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr) { 58 register_sync_handle_watch_count_(0),
59 registered_with_sync_handle_watcher_(false),
60 sync_handle_watcher_callback_count_(0),
61 weak_factory_(this) {
57 // Even though we don't have an incoming receiver, we still want to monitor 62 // Even though we don't have an incoming receiver, we still want to monitor
58 // the message pipe to know if is closed or encounters an error. 63 // the message pipe to know if is closed or encounters an error.
59 WaitToReadMore(); 64 WaitToReadMore();
60 } 65 }
61 66
62 Connector::~Connector() { 67 Connector::~Connector() {
63 DCHECK(thread_checker_.CalledOnValidThread()); 68 DCHECK(thread_checker_.CalledOnValidThread());
64 69
65 if (destroyed_flag_)
66 *destroyed_flag_ = true;
67
68 CancelWait(); 70 CancelWait();
69 } 71 }
70 72
71 void Connector::CloseMessagePipe() { 73 void Connector::CloseMessagePipe() {
72 DCHECK(thread_checker_.CalledOnValidThread()); 74 DCHECK(thread_checker_.CalledOnValidThread());
73 75
74 CancelWait(); 76 CancelWait();
75 MayAutoLock locker(lock_.get()); 77 MayAutoLock locker(lock_.get());
76 Close(std::move(message_pipe_)); 78 Close(std::move(message_pipe_));
77 } 79 }
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after
184 CHECK(false) << "Race condition or other bug detected"; 186 CHECK(false) << "Race condition or other bug detected";
185 return false; 187 return false;
186 default: 188 default:
187 // This particular write was rejected, presumably because of bad input. 189 // This particular write was rejected, presumably because of bad input.
188 // The pipe is not necessarily in a bad state. 190 // The pipe is not necessarily in a bad state.
189 return false; 191 return false;
190 } 192 }
191 return true; 193 return true;
192 } 194 }
193 195
196 bool Connector::RegisterSyncHandleWatch() {
197 DCHECK(thread_checker_.CalledOnValidThread());
198
199 if (error_)
200 return false;
201
202 register_sync_handle_watch_count_++;
203
204 if (!registered_with_sync_handle_watcher_ && !paused_) {
205 registered_with_sync_handle_watcher_ =
206 SyncHandleWatcher::current()->RegisterHandle(
207 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
208 base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
209 base::Unretained(this)));
210 }
211 return true;
212 }
213
214 void Connector::UnregisterSyncHandleWatch() {
215 DCHECK(thread_checker_.CalledOnValidThread());
216
217 if (register_sync_handle_watch_count_ == 0) {
218 NOTREACHED();
219 return;
220 }
221
222 register_sync_handle_watch_count_--;
223 if (register_sync_handle_watch_count_ > 0)
224 return;
225
226 if (registered_with_sync_handle_watcher_) {
227 SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get());
228 registered_with_sync_handle_watcher_ = false;
229 }
230 }
231
232 bool Connector::RunSyncHandleWatch(const bool* should_stop) {
233 DCHECK(thread_checker_.CalledOnValidThread());
234 DCHECK_GT(register_sync_handle_watch_count_, 0u);
235
236 if (error_)
237 return false;
238
239 ResumeIncomingMethodCallProcessing();
240
241 return SyncHandleWatcher::current()->WatchAllHandles(message_pipe_.get(),
242 should_stop);
243 }
244
194 // static 245 // static
195 void Connector::CallOnHandleReady(void* closure, MojoResult result) { 246 void Connector::CallOnHandleReady(void* closure, MojoResult result) {
196 Connector* self = static_cast<Connector*>(closure); 247 Connector* self = static_cast<Connector*>(closure);
197 self->OnHandleReady(result); 248 CHECK(self->async_wait_id_ != 0);
249 self->async_wait_id_ = 0;
250 self->OnHandleReadyInternal(result);
198 } 251 }
199 252
200 void Connector::OnHandleReady(MojoResult result) { 253 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
254 base::WeakPtr<Connector> weak_self(weak_factory_.GetWeakPtr());
255
256 sync_handle_watcher_callback_count_++;
257 OnHandleReadyInternal(result);
258 // At this point, this object might have been deleted.
259 if (weak_self)
260 sync_handle_watcher_callback_count_--;
261 }
262
263 void Connector::OnHandleReadyInternal(MojoResult result) {
201 DCHECK(thread_checker_.CalledOnValidThread()); 264 DCHECK(thread_checker_.CalledOnValidThread());
202 265
203 CHECK(async_wait_id_ != 0);
204 async_wait_id_ = 0;
205 if (result != MOJO_RESULT_OK) { 266 if (result != MOJO_RESULT_OK) {
206 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); 267 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
207 return; 268 return;
208 } 269 }
209 ReadAllAvailableMessages(); 270 ReadAllAvailableMessages();
210 // At this point, this object might have been deleted. Return. 271 // At this point, this object might have been deleted. Return.
211 } 272 }
212 273
213 void Connector::WaitToReadMore() { 274 void Connector::WaitToReadMore() {
214 CHECK(!async_wait_id_); 275 CHECK(!async_wait_id_);
215 CHECK(!paused_); 276 CHECK(!paused_);
216 async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(), 277 async_wait_id_ = waiter_->AsyncWait(message_pipe_.get().value(),
217 MOJO_HANDLE_SIGNAL_READABLE, 278 MOJO_HANDLE_SIGNAL_READABLE,
218 MOJO_DEADLINE_INDEFINITE, 279 MOJO_DEADLINE_INDEFINITE,
219 &Connector::CallOnHandleReady, 280 &Connector::CallOnHandleReady,
220 this); 281 this);
282
283 if (register_sync_handle_watch_count_ > 0 &&
284 !registered_with_sync_handle_watcher_) {
285 registered_with_sync_handle_watcher_ =
286 SyncHandleWatcher::current()->RegisterHandle(
287 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
288 base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
289 base::Unretained(this)));
290 }
221 } 291 }
222 292
223 bool Connector::ReadSingleMessage(MojoResult* read_result) { 293 bool Connector::ReadSingleMessage(MojoResult* read_result) {
224 CHECK(!paused_); 294 CHECK(!paused_);
225 295
226 bool receiver_result = false; 296 bool receiver_result = false;
227 297
228 // Detect if |this| was destroyed during message dispatch. Allow for the 298 // Detect if |this| was destroyed during message dispatch. Allow for the
229 // possibility of re-entering ReadMore() through message dispatch. 299 // possibility of re-entering ReadMore() through message dispatch.
230 bool was_destroyed_during_dispatch = false; 300 base::WeakPtr<Connector> weak_self = weak_factory_.GetWeakPtr();
231 bool* previous_destroyed_flag = destroyed_flag_;
232 destroyed_flag_ = &was_destroyed_during_dispatch;
233 301
234 Message message; 302 Message message;
235 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); 303 const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
236 *read_result = rv; 304 *read_result = rv;
237 305
238 if (rv == MOJO_RESULT_OK) { 306 if (rv == MOJO_RESULT_OK) {
239 // Dispatching the message may spin in a nested message loop. To ensure we 307 // Dispatching the message may spin in a nested message loop. To ensure we
240 // continue dispatching messages when this happens start listening for 308 // continue dispatching messages when this happens start listening for
241 // messagse now. 309 // messagse now.
242 if (!async_wait_id_) { 310 if (!async_wait_id_) {
243 // TODO: Need to evaluate the perf impact of this. 311 // TODO: Need to evaluate the perf impact of this.
244 WaitToReadMore(); 312 WaitToReadMore();
245 } 313 }
246 receiver_result = 314 receiver_result =
247 incoming_receiver_ && incoming_receiver_->Accept(&message); 315 incoming_receiver_ && incoming_receiver_->Accept(&message);
248 } 316 }
249 317
250 if (was_destroyed_during_dispatch) { 318 if (!weak_self)
251 if (previous_destroyed_flag)
252 *previous_destroyed_flag = true; // Propagate flag.
253 return false; 319 return false;
254 }
255
256 destroyed_flag_ = previous_destroyed_flag;
257 320
258 if (rv == MOJO_RESULT_SHOULD_WAIT) 321 if (rv == MOJO_RESULT_SHOULD_WAIT)
259 return true; 322 return true;
260 323
261 if (rv != MOJO_RESULT_OK) { 324 if (rv != MOJO_RESULT_OK) {
262 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false); 325 HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
263 return false; 326 return false;
264 } 327 }
265 328
266 if (enforce_errors_from_incoming_receiver_ && !receiver_result) { 329 if (enforce_errors_from_incoming_receiver_ && !receiver_result) {
(...skipping 21 matching lines...) Expand all
288 // because of thread hops. In that case, there'll still be an async 351 // because of thread hops. In that case, there'll still be an async
289 // waiter. 352 // waiter.
290 if (!async_wait_id_) 353 if (!async_wait_id_)
291 WaitToReadMore(); 354 WaitToReadMore();
292 break; 355 break;
293 } 356 }
294 } 357 }
295 } 358 }
296 359
297 void Connector::CancelWait() { 360 void Connector::CancelWait() {
298 if (!async_wait_id_) 361 if (async_wait_id_) {
299 return; 362 waiter_->CancelWait(async_wait_id_);
363 async_wait_id_ = 0;
364 }
300 365
301 waiter_->CancelWait(async_wait_id_); 366 if (registered_with_sync_handle_watcher_) {
302 async_wait_id_ = 0; 367 SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get());
368 registered_with_sync_handle_watcher_ = false;
369 }
303 } 370 }
304 371
305 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { 372 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
306 if (error_ || !message_pipe_.is_valid()) 373 if (error_ || !message_pipe_.is_valid())
307 return; 374 return;
308 375
309 if (!force_pipe_reset && force_async_handler) 376 if (!force_pipe_reset && force_async_handler)
310 force_pipe_reset = true; 377 force_pipe_reset = true;
311 378
312 if (paused_) { 379 if (paused_) {
(...skipping 19 matching lines...) Expand all
332 if (!paused_) 399 if (!paused_)
333 WaitToReadMore(); 400 WaitToReadMore();
334 } else { 401 } else {
335 error_ = true; 402 error_ = true;
336 connection_error_handler_.Run(); 403 connection_error_handler_.Run();
337 } 404 }
338 } 405 }
339 406
340 } // namespace internal 407 } // namespace internal
341 } // namespace mojo 408 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/connector.h ('k') | mojo/public/cpp/bindings/lib/interface_ptr_state.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698