Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(108)

Side by Side Diff: mojo/public/cpp/bindings/lib/connector.cc

Issue 1768443004: Revert of [mojo-bindings] Use Watch API instead of MessagePumpMojo (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/public/cpp/bindings/lib/connector.h ('k') | mojo/public/cpp/system/BUILD.gn » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/connector.h" 5 #include "mojo/public/cpp/bindings/lib/connector.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 #include <utility> 8 #include <utility>
9 9
10 #include "base/bind.h" 10 #include "base/bind.h"
11 #include "base/logging.h" 11 #include "base/logging.h"
12 #include "base/macros.h" 12 #include "base/macros.h"
13 #include "base/synchronization/lock.h" 13 #include "base/synchronization/lock.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h" 14 #include "mojo/public/cpp/bindings/lib/sync_handle_watcher.h"
16 15
17 namespace mojo { 16 namespace mojo {
18 namespace internal { 17 namespace internal {
19 18
20 namespace { 19 namespace {
21 20
22 // Similar to base::AutoLock, except that it does nothing if |lock| passed into 21 // Similar to base::AutoLock, except that it does nothing if |lock| passed into
23 // the constructor is null. 22 // the constructor is null.
24 class MayAutoLock { 23 class MayAutoLock {
(...skipping 216 matching lines...) Expand 10 before | Expand all | Expand 10 after
241 240
242 // This object may be destroyed during the WatchAllHandles() call. So we have 241 // This object may be destroyed during the WatchAllHandles() call. So we have
243 // to preserve the boolean that WatchAllHandles uses. 242 // to preserve the boolean that WatchAllHandles uses.
244 scoped_refptr<base::RefCountedData<bool>> preserver = 243 scoped_refptr<base::RefCountedData<bool>> preserver =
245 should_stop_sync_handle_watch_; 244 should_stop_sync_handle_watch_;
246 const bool* should_stop_array[] = {should_stop, 245 const bool* should_stop_array[] = {should_stop,
247 &should_stop_sync_handle_watch_->data}; 246 &should_stop_sync_handle_watch_->data};
248 return SyncHandleWatcher::current()->WatchAllHandles(should_stop_array, 2); 247 return SyncHandleWatcher::current()->WatchAllHandles(should_stop_array, 2);
249 } 248 }
250 249
251 void Connector::OnWatcherHandleReady(MojoResult result) { 250 void Connector::OnHandleWatcherHandleReady(MojoResult result) {
252 OnHandleReadyInternal(result); 251 OnHandleReadyInternal(result);
253 } 252 }
254 253
255 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { 254 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
256 base::WeakPtr<Connector> weak_self(weak_factory_.GetWeakPtr()); 255 base::WeakPtr<Connector> weak_self(weak_factory_.GetWeakPtr());
257 256
258 sync_handle_watcher_callback_count_++; 257 sync_handle_watcher_callback_count_++;
259 OnHandleReadyInternal(result); 258 OnHandleReadyInternal(result);
260 // At this point, this object might have been deleted. 259 // At this point, this object might have been deleted.
261 if (weak_self) 260 if (weak_self)
262 sync_handle_watcher_callback_count_--; 261 sync_handle_watcher_callback_count_--;
263 } 262 }
264 263
265 void Connector::OnHandleReadyInternal(MojoResult result) { 264 void Connector::OnHandleReadyInternal(MojoResult result) {
266 DCHECK(thread_checker_.CalledOnValidThread()); 265 DCHECK(thread_checker_.CalledOnValidThread());
267 266
268 if (result != MOJO_RESULT_OK) { 267 if (result != MOJO_RESULT_OK) {
269 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); 268 HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
270 return; 269 return;
271 } 270 }
272 ReadAllAvailableMessages(); 271 ReadAllAvailableMessages();
273 // At this point, this object might have been deleted. Return. 272 // At this point, this object might have been deleted. Return.
274 } 273 }
275 274
276 void Connector::WaitToReadMore() { 275 void Connector::WaitToReadMore() {
276 CHECK(!handle_watcher_.is_watching());
277 CHECK(!paused_); 277 CHECK(!paused_);
278 DCHECK(!handle_watcher_.IsWatching()); 278 handle_watcher_.Start(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
279 279 MOJO_DEADLINE_INDEFINITE,
280 MojoResult rv = handle_watcher_.Start( 280 base::Bind(&Connector::OnHandleWatcherHandleReady,
281 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, 281 base::Unretained(this)));
282 base::Bind(&Connector::OnWatcherHandleReady,
283 base::Unretained(this)));
284
285 if (rv != MOJO_RESULT_OK) {
286 // If the watch failed because the handle is invalid or its conditions can
287 // no longer be met, we signal the error asynchronously to avoid reentry.
288 base::ThreadTaskRunnerHandle::Get()->PostTask(
289 FROM_HERE, base::Bind(&Connector::OnWatcherHandleReady,
290 weak_factory_.GetWeakPtr(), rv));
291 }
292 282
293 if (register_sync_handle_watch_count_ > 0 && 283 if (register_sync_handle_watch_count_ > 0 &&
294 !registered_with_sync_handle_watcher_) { 284 !registered_with_sync_handle_watcher_) {
295 registered_with_sync_handle_watcher_ = 285 registered_with_sync_handle_watcher_ =
296 SyncHandleWatcher::current()->RegisterHandle( 286 SyncHandleWatcher::current()->RegisterHandle(
297 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, 287 message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
298 base::Bind(&Connector::OnSyncHandleWatcherHandleReady, 288 base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
299 base::Unretained(this))); 289 base::Unretained(this)));
300 } 290 }
301 } 291 }
302 292
303 bool Connector::ReadSingleMessage(MojoResult* read_result) { 293 bool Connector::ReadSingleMessage(MojoResult* read_result) {
304 CHECK(!paused_); 294 CHECK(!paused_);
305 295
306 bool receiver_result = false; 296 bool receiver_result = false;
307 297
308 // Detect if |this| was destroyed during message dispatch. Allow for the 298 // Detect if |this| was destroyed during message dispatch. Allow for the
309 // possibility of re-entering ReadMore() through message dispatch. 299 // possibility of re-entering ReadMore() through message dispatch.
310 base::WeakPtr<Connector> weak_self = weak_factory_.GetWeakPtr(); 300 base::WeakPtr<Connector> weak_self = weak_factory_.GetWeakPtr();
311 301
312 Message message; 302 Message message;
313 const MojoResult rv = ReadMessage(message_pipe_.get(), &message); 303 const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
314 *read_result = rv; 304 *read_result = rv;
315 305
316 if (rv == MOJO_RESULT_OK) { 306 if (rv == MOJO_RESULT_OK) {
307 // Dispatching the message may spin in a nested message loop. To ensure we
308 // continue dispatching messages when this happens start listening for
309 // messagse now.
310 if (!handle_watcher_.is_watching()) {
311 // TODO: Need to evaluate the perf impact of this.
312 WaitToReadMore();
313 }
317 receiver_result = 314 receiver_result =
318 incoming_receiver_ && incoming_receiver_->Accept(&message); 315 incoming_receiver_ && incoming_receiver_->Accept(&message);
319 } 316 }
320 317
321 if (!weak_self) 318 if (!weak_self)
322 return false; 319 return false;
323 320
324 if (rv == MOJO_RESULT_SHOULD_WAIT) 321 if (rv == MOJO_RESULT_SHOULD_WAIT)
325 return true; 322 return true;
326 323
(...skipping 13 matching lines...) Expand all
340 while (!error_) { 337 while (!error_) {
341 MojoResult rv; 338 MojoResult rv;
342 339
343 // Return immediately if |this| was destroyed. Do not touch any members! 340 // Return immediately if |this| was destroyed. Do not touch any members!
344 if (!ReadSingleMessage(&rv)) 341 if (!ReadSingleMessage(&rv))
345 return; 342 return;
346 343
347 if (paused_) 344 if (paused_)
348 return; 345 return;
349 346
350 if (rv == MOJO_RESULT_SHOULD_WAIT) 347 if (rv == MOJO_RESULT_SHOULD_WAIT) {
348 // ReadSingleMessage could end up calling HandleError which resets
349 // message_pipe_ to a dummy one that is closed. The old EDK will see the
350 // that the peer is closed immediately, while the new one is asynchronous
351 // because of thread hops. In that case, there'll still be an async
352 // waiter.
353 if (!handle_watcher_.is_watching())
354 WaitToReadMore();
351 break; 355 break;
356 }
352 } 357 }
353 } 358 }
354 359
355 void Connector::CancelWait() { 360 void Connector::CancelWait() {
356 handle_watcher_.Cancel(); 361 handle_watcher_.Stop();
357 362
358 if (registered_with_sync_handle_watcher_) { 363 if (registered_with_sync_handle_watcher_) {
359 SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get()); 364 SyncHandleWatcher::current()->UnregisterHandle(message_pipe_.get());
360 registered_with_sync_handle_watcher_ = false; 365 registered_with_sync_handle_watcher_ = false;
361 } 366 }
362 367
363 if (should_stop_sync_handle_watch_) 368 if (should_stop_sync_handle_watch_)
364 should_stop_sync_handle_watch_->data = true; 369 should_stop_sync_handle_watch_->data = true;
365 } 370 }
366 371
(...skipping 15 matching lines...) Expand all
382 CancelWait(); 387 CancelWait();
383 MayAutoLock locker(lock_.get()); 388 MayAutoLock locker(lock_.get());
384 Close(std::move(message_pipe_)); 389 Close(std::move(message_pipe_));
385 MessagePipe dummy_pipe; 390 MessagePipe dummy_pipe;
386 message_pipe_ = std::move(dummy_pipe.handle0); 391 message_pipe_ = std::move(dummy_pipe.handle0);
387 } else { 392 } else {
388 CancelWait(); 393 CancelWait();
389 } 394 }
390 395
391 if (force_async_handler) { 396 if (force_async_handler) {
397 // |dummy_pipe.handle1| has been destructed. Reading the pipe will
398 // eventually cause a read error on |message_pipe_| and set error state.
392 if (!paused_) 399 if (!paused_)
393 WaitToReadMore(); 400 WaitToReadMore();
394 } else { 401 } else {
395 error_ = true; 402 error_ = true;
396 connection_error_handler_.Run(); 403 connection_error_handler_.Run();
397 } 404 }
398 } 405 }
399 406
400 } // namespace internal 407 } // namespace internal
401 } // namespace mojo 408 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/connector.h ('k') | mojo/public/cpp/system/BUILD.gn » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698