Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(252)

Side by Side Diff: sdk/lib/async/future_impl.dart

Issue 1510053004: Simplify future-propagation for whenComplete. (Closed) Base URL: https://github.com/dart-lang/sdk.git@master
Patch Set: Address comments Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** The onValue and onError handlers return either a value or a future */ 7 /** The onValue and onError handlers return either a value or a future */
8 typedef dynamic _FutureOnValue<T>(T value); 8 typedef dynamic _FutureOnValue<T>(T value);
9 /** Test used by [Future.catchError] to handle skip some errors. */ 9 /** Test used by [Future.catchError] to handle skip some errors. */
10 typedef bool _FutureErrorTest(var error); 10 typedef bool _FutureErrorTest(var error);
(...skipping 169 matching lines...) Expand 10 before | Expand all | Expand 10 after
180 180
181 /// Valid types for value: `T` or `Future<T>`. 181 /// Valid types for value: `T` or `Future<T>`.
182 _Future.immediate(value) { 182 _Future.immediate(value) {
183 _asyncComplete(value); 183 _asyncComplete(value);
184 } 184 }
185 185
186 _Future.immediateError(var error, [StackTrace stackTrace]) { 186 _Future.immediateError(var error, [StackTrace stackTrace]) {
187 _asyncCompleteError(error, stackTrace); 187 _asyncCompleteError(error, stackTrace);
188 } 188 }
189 189
190 bool get _mayComplete => _state == _INCOMPLETE; 190 bool get _mayComplete => _state == _INCOMPLETE;
191 bool get _isChained => _state == _CHAINED; 191 bool get _isPendingComplete => _state == _PENDING_COMPLETE;
192 bool get _isComplete => _state >= _VALUE; 192 bool get _isChained => _state == _CHAINED;
193 bool get _hasValue => _state == _VALUE; 193 bool get _isComplete => _state >= _VALUE;
194 bool get _hasError => _state == _ERROR; 194 bool get _hasValue => _state == _VALUE;
195 bool get _hasError => _state == _ERROR;
195 196
196 set _isChained(bool value) { 197 void _setChained() {
197 if (value) { 198 assert(!_isComplete);
198 assert(!_isComplete); 199 _state = _CHAINED;
199 _state = _CHAINED;
200 } else {
201 assert(_isChained);
202 _state = _INCOMPLETE;
203 }
204 } 200 }
205 201
206 Future then(f(T value), { Function onError }) { 202 Future then(f(T value), { Function onError }) {
207 Zone currentZone = Zone.current; 203 Zone currentZone = Zone.current;
208 if (!identical(currentZone, _ROOT_ZONE)) { 204 if (!identical(currentZone, _ROOT_ZONE)) {
209 f = currentZone.registerUnaryCallback(f); 205 f = currentZone.registerUnaryCallback(f);
210 if (onError != null) { 206 if (onError != null) {
211 onError = _registerErrorHandler(onError, currentZone); 207 onError = _registerErrorHandler(onError, currentZone);
212 } 208 }
213 } 209 }
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
297 while (current != null) { 293 while (current != null) {
298 _FutureListener next = current._nextListener; 294 _FutureListener next = current._nextListener;
299 current._nextListener = prev; 295 current._nextListener = prev;
300 prev = current; 296 prev = current;
301 current = next; 297 current = next;
302 } 298 }
303 return prev; 299 return prev;
304 } 300 }
305 301
306 // Take the value (when completed) of source and complete target with that 302 // Take the value (when completed) of source and complete target with that
307 // value (or error). This function can chain all Futures, but is slower 303 // value (or error). This function could chain all Futures, but is slower
308 // for _Future than _chainCoreFuture - Use _chainCoreFuture in that case. 304 // for _Future than _chainCoreFuture, so you must use _chainCoreFuture
305 // in that case.
309 static void _chainForeignFuture(Future source, _Future target) { 306 static void _chainForeignFuture(Future source, _Future target) {
310 assert(!target._isComplete); 307 assert(!target._isComplete);
311 assert(source is! _Future); 308 assert(source is! _Future);
312 309
313 // Mark the target as chained (and as such half-completed). 310 // Mark the target as chained (and as such half-completed).
314 target._isChained = true; 311 target._setChained();
315 try { 312 try {
316 source.then((value) { 313 source.then((value) {
317 assert(target._isChained); 314 assert(target._isChained);
318 target._completeWithValue(value); 315 target._completeWithValue(value);
319 }, 316 },
320 // TODO(floitsch): eventually we would like to make this non-optional 317 // TODO(floitsch): eventually we would like to make this non-optional
321 // and dependent on the listeners of the target future. If none of 318 // and dependent on the listeners of the target future. If none of
322 // the target future's listeners want to have the stack trace we don't 319 // the target future's listeners want to have the stack trace we don't
323 // need a trace. 320 // need a trace.
324 onError: (error, [stackTrace]) { 321 onError: (error, [stackTrace]) {
(...skipping 11 matching lines...) Expand all
336 } 333 }
337 } 334 }
338 335
339 // Take the value (when completed) of source and complete target with that 336 // Take the value (when completed) of source and complete target with that
340 // value (or error). This function expects that source is a _Future. 337 // value (or error). This function expects that source is a _Future.
341 static void _chainCoreFuture(_Future source, _Future target) { 338 static void _chainCoreFuture(_Future source, _Future target) {
342 assert(!target._isComplete); 339 assert(!target._isComplete);
343 assert(source is _Future); 340 assert(source is _Future);
344 341
345 // Mark the target as chained (and as such half-completed). 342 // Mark the target as chained (and as such half-completed).
346 target._isChained = true; 343 target._setChained();
347 _FutureListener listener = new _FutureListener.chain(target); 344 _FutureListener listener = new _FutureListener.chain(target);
348 if (source._isComplete) { 345 if (source._isComplete) {
349 _propagateToListeners(source, listener); 346 _propagateToListeners(source, listener);
350 } else { 347 } else {
351 source._addListener(listener); 348 source._addListener(listener);
352 } 349 }
353 } 350 }
354 351
355 void _complete(value) { 352 void _complete(value) {
356 assert(!_isComplete); 353 assert(!_isComplete);
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
397 // unhandled error, even though we know we are already going to listen to 394 // unhandled error, even though we know we are already going to listen to
398 // it. 395 // it.
399 396
400 if (value == null) { 397 if (value == null) {
401 // No checks for `null`. 398 // No checks for `null`.
402 } else if (value is Future) { 399 } else if (value is Future) {
403 // Assign to typed variables so we get earlier checks in checked mode. 400 // Assign to typed variables so we get earlier checks in checked mode.
404 Future<T> typedFuture = value; 401 Future<T> typedFuture = value;
405 if (typedFuture is _Future) { 402 if (typedFuture is _Future) {
406 _Future<T> coreFuture = typedFuture; 403 _Future<T> coreFuture = typedFuture;
407 if (coreFuture._isComplete && coreFuture._hasError) { 404 if (coreFuture._hasError) {
408 // Case 1 from above. Delay completion to enable the user to register 405 // Case 1 from above. Delay completion to enable the user to register
409 // callbacks. 406 // callbacks.
410 _markPendingCompletion(); 407 _markPendingCompletion();
411 _zone.scheduleMicrotask(() { 408 _zone.scheduleMicrotask(() {
412 _chainCoreFuture(coreFuture, this); 409 _chainCoreFuture(coreFuture, this);
413 }); 410 });
414 } else { 411 } else {
415 _chainCoreFuture(coreFuture, this); 412 _chainCoreFuture(coreFuture, this);
416 } 413 }
417 } else { 414 } else {
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
460 // call handle them separately in recursive calls, continuing 457 // call handle them separately in recursive calls, continuing
461 // here only when there is only one listener left. 458 // here only when there is only one listener left.
462 while (listeners._nextListener != null) { 459 while (listeners._nextListener != null) {
463 _FutureListener listener = listeners; 460 _FutureListener listener = listeners;
464 listeners = listener._nextListener; 461 listeners = listener._nextListener;
465 listener._nextListener = null; 462 listener._nextListener = null;
466 _propagateToListeners(source, listener); 463 _propagateToListeners(source, listener);
467 } 464 }
468 _FutureListener listener = listeners; 465 _FutureListener listener = listeners;
469 // Do the actual propagation. 466 // Do the actual propagation.
470 // Set initial state of listenerHasValue and listenerValueOrError. These 467 // Set initial state of listenerHasError and listenerValueOrError. These
471 // variables are updated, with the outcome of potential callbacks. 468 // variables are updated with the outcome of potential callbacks.
472 bool listenerHasValue = true; 469 bool listenerHasError = hasError;
473 final sourceValue = hasError ? null : source._value; 470 final sourceResult = source._resultOrListeners;
474 var listenerValueOrError = sourceValue; 471 var listenerValueOrError = sourceResult;
475 // Set to true if a whenComplete needs to wait for a future. 472
476 // The whenComplete action will resume the propagation by itself.
477 bool isPropagationAborted = false;
478 // TODO(floitsch): mark the listener as pending completion. Currently
479 // we can't do this, since the markPendingCompletion verifies that
480 // the future is not already marked (or chained).
481 // Only if we either have an error or callbacks, go into this, somewhat 473 // Only if we either have an error or callbacks, go into this, somewhat
482 // expensive, branch. Here we'll enter/leave the zone. Many futures 474 // expensive, branch. Here we'll enter/leave the zone. Many futures
483 // doesn't have callbacks, so this is a significant optimization. 475 // don't have callbacks, so this is a significant optimization.
484 if (hasError || (listener.handlesValue || listener.handlesComplete)) { 476 if (hasError || listener.handlesValue || listener.handlesComplete) {
485 Zone zone = listener._zone; 477 Zone zone = listener._zone;
486 if (hasError && !source._zone.inSameErrorZone(zone)) { 478 if (hasError && !source._zone.inSameErrorZone(zone)) {
487 // Don't cross zone boundaries with errors. 479 // Don't cross zone boundaries with errors.
488 AsyncError asyncError = source._error; 480 AsyncError asyncError = source._error;
489 source._zone.handleUncaughtError( 481 source._zone.handleUncaughtError(
490 asyncError.error, asyncError.stackTrace); 482 asyncError.error, asyncError.stackTrace);
491 return; 483 return;
492 } 484 }
493 485
494 Zone oldZone; 486 Zone oldZone;
495 if (!identical(Zone.current, zone)) { 487 if (!identical(Zone.current, zone)) {
496 // Change zone if it's not current. 488 // Change zone if it's not current.
497 oldZone = Zone._enter(zone); 489 oldZone = Zone._enter(zone);
498 } 490 }
499 491
500 bool handleValueCallback() { 492 void handleValueCallback() {
493 assert(!hasError);
501 try { 494 try {
502 listenerValueOrError = zone.runUnary(listener._onValue, 495 listenerValueOrError = zone.runUnary(listener._onValue,
503 sourceValue); 496 sourceResult);
504 return true; 497 listenerHasError = false;
505 } catch (e, s) { 498 } catch (e, s) {
506 listenerValueOrError = new AsyncError(e, s); 499 listenerValueOrError = new AsyncError(e, s);
507 return false; 500 listenerHasError = true;
508 } 501 }
509 } 502 }
510 503
511 void handleError() { 504 void handleError() {
512 AsyncError asyncError = source._error; 505 AsyncError asyncError = source._error;
513 bool matchesTest = true; 506 bool matchesTest = true;
514 if (listener.hasErrorTest) { 507 if (listener.hasErrorTest) {
515 _FutureErrorTest test = listener._errorTest; 508 _FutureErrorTest test = listener._errorTest;
516 try { 509 try {
517 matchesTest = zone.runUnary(test, asyncError.error); 510 matchesTest = zone.runUnary(test, asyncError.error);
518 } catch (e, s) { 511 } catch (e, s) {
519 listenerValueOrError = identical(asyncError.error, e) ? 512 listenerValueOrError = identical(asyncError.error, e)
520 asyncError : new AsyncError(e, s); 513 ? asyncError
521 listenerHasValue = false; 514 : new AsyncError(e, s);
515 listenerHasError = true;
522 return; 516 return;
523 } 517 }
524 } 518 }
525 Function errorCallback = listener._onError; 519 Function errorCallback = listener._onError;
526 if (matchesTest && errorCallback != null) { 520 if (matchesTest && errorCallback != null) {
527 try { 521 try {
528 if (errorCallback is ZoneBinaryCallback) { 522 if (errorCallback is ZoneBinaryCallback) {
529 listenerValueOrError = zone.runBinary(errorCallback, 523 listenerValueOrError = zone.runBinary(errorCallback,
530 asyncError.error, 524 asyncError.error,
531 asyncError.stackTrace); 525 asyncError.stackTrace);
532 } else { 526 } else {
533 listenerValueOrError = zone.runUnary(errorCallback, 527 listenerValueOrError = zone.runUnary(errorCallback,
534 asyncError.error); 528 asyncError.error);
535 } 529 }
530 listenerHasError = false;
536 } catch (e, s) { 531 } catch (e, s) {
537 listenerValueOrError = identical(asyncError.error, e) ? 532 listenerValueOrError = identical(asyncError.error, e)
538 asyncError : new AsyncError(e, s); 533 ? asyncError
539 listenerHasValue = false; 534 : new AsyncError(e, s);
540 return; 535 listenerHasError = true;
541 } 536 }
542 listenerHasValue = true;
543 } else {
544 // Copy over the error from the source.
545 listenerValueOrError = asyncError;
546 listenerHasValue = false;
547 } 537 }
548 } 538 }
549 539
550 void handleWhenCompleteCallback() { 540 void handleWhenCompleteCallback() {
541 assert(!listener.handlesError);
551 var completeResult; 542 var completeResult;
552 try { 543 try {
553 completeResult = zone.run(listener._whenCompleteAction); 544 completeResult = zone.run(listener._whenCompleteAction);
554 } catch (e, s) { 545 } catch (e, s) {
555 if (hasError && identical(source._error.error, e)) { 546 if (hasError && identical(source._error.error, e)) {
556 listenerValueOrError = source._error; 547 listenerValueOrError = source._error;
557 } else { 548 } else {
558 listenerValueOrError = new AsyncError(e, s); 549 listenerValueOrError = new AsyncError(e, s);
559 } 550 }
560 listenerHasValue = false; 551 listenerHasError = true;
561 return; 552 return;
562 } 553 }
563 if (completeResult is Future) { 554 if (completeResult is Future) {
564 _Future result = listener.result; 555 if (completeResult is _Future && completeResult._isComplete) {
565 result._isChained = true; 556 if (completeResult._hasError) {
566 isPropagationAborted = true; 557 listenerValueOrError = completeResult._error;
567 completeResult.then((ignored) { 558 listenerHasError = true;
568 _propagateToListeners(source, new _FutureListener.chain(result));
569 }, onError: (error, [stackTrace]) {
570 // When there is an error, we have to make the error the new
571 // result of the current listener.
572 if (completeResult is! _Future) {
573 // This should be a rare case.
574 completeResult = new _Future();
575 completeResult._setError(error, stackTrace);
576 } 559 }
577 _propagateToListeners(completeResult, 560 // Otherwise use the existing result of source.
578 new _FutureListener.chain(result)); 561 return;
579 }); 562 }
563 // We have to wait for the completeResult future to complete
564 // before knowing if it's an error or we should use the result
565 // of source.
566 var originalSource = source;
567 listenerValueOrError = completeResult.then((_) => originalSource);
568 listenerHasError = false;
580 } 569 }
581 } 570 }
582 571
583 if (!hasError) { 572 if (listener.handlesComplete) {
573 // The whenComplete-handler is not combined with normal value/error
574 // handling. This means at most one handleX method is called per
575 // listener.
576 assert(!listener.handlesValue);
577 assert(!listener.handlesError);
578 handleWhenCompleteCallback();
579 } else if (!hasError) {
584 if (listener.handlesValue) { 580 if (listener.handlesValue) {
585 listenerHasValue = handleValueCallback(); 581 handleValueCallback();
586 } 582 }
587 } else { 583 } else {
588 handleError(); 584 if (listener.handlesError) {
585 handleError();
586 }
589 } 587 }
590 if (listener.handlesComplete) { 588
591 handleWhenCompleteCallback();
592 }
593 // If we changed zone, oldZone will not be null. 589 // If we changed zone, oldZone will not be null.
594 if (oldZone != null) Zone._leave(oldZone); 590 if (oldZone != null) Zone._leave(oldZone);
595 591
596 if (isPropagationAborted) return;
597 // If the listener's value is a future we need to chain it. Note that 592 // If the listener's value is a future we need to chain it. Note that
598 // this can only happen if there is a callback. Since 'is' checks 593 // this can only happen if there is a callback.
599 // can be expensive, we're trying to avoid it. 594 if (listenerValueOrError is Future) {
600 if (listenerHasValue &&
601 !identical(sourceValue, listenerValueOrError) &&
602 listenerValueOrError is Future) {
603 Future chainSource = listenerValueOrError; 595 Future chainSource = listenerValueOrError;
604 // Shortcut if the chain-source is already completed. Just continue 596 // Shortcut if the chain-source is already completed. Just continue
605 // the loop. 597 // the loop.
606 _Future result = listener.result; 598 _Future result = listener.result;
607 if (chainSource is _Future) { 599 if (chainSource is _Future) {
608 if (chainSource._isComplete) { 600 if (chainSource._isComplete) {
609 // propagate the value (simulating a tail call). 601 // propagate the value (simulating a tail call).
610 result._isChained = true; 602 result._setChained();
611 source = chainSource; 603 source = chainSource;
612 listeners = new _FutureListener.chain(result); 604 listeners = new _FutureListener.chain(result);
613 continue; 605 continue;
614 } else { 606 } else {
615 _chainCoreFuture(chainSource, result); 607 _chainCoreFuture(chainSource, result);
616 } 608 }
617 } else { 609 } else {
618 _chainForeignFuture(chainSource, result); 610 _chainForeignFuture(chainSource, result);
619 } 611 }
620 return; 612 return;
621 } 613 }
622 } 614 }
623 _Future result = listener.result; 615 _Future result = listener.result;
624 listeners = result._removeListeners(); 616 listeners = result._removeListeners();
625 if (listenerHasValue) { 617 if (!listenerHasError) {
626 result._setValue(listenerValueOrError); 618 result._setValue(listenerValueOrError);
627 } else { 619 } else {
628 AsyncError asyncError = listenerValueOrError; 620 AsyncError asyncError = listenerValueOrError;
629 result._setErrorObject(asyncError); 621 result._setErrorObject(asyncError);
630 } 622 }
631 // Prepare for next round. 623 // Prepare for next round.
632 source = result; 624 source = result;
633 } 625 }
634 } 626 }
635 627
(...skipping 24 matching lines...) Expand all
660 } 652 }
661 }, onError: (e, s) { 653 }, onError: (e, s) {
662 if (timer.isActive) { 654 if (timer.isActive) {
663 timer.cancel(); 655 timer.cancel();
664 result._completeError(e, s); 656 result._completeError(e, s);
665 } 657 }
666 }); 658 });
667 return result; 659 return result;
668 } 660 }
669 } 661 }
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698