OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** The onValue and onError handlers return either a value or a future */ | 7 /** The onValue and onError handlers return either a value or a future */ |
8 typedef FutureOr<T> _FutureOnValue<S, T>(S value); | 8 typedef FutureOr<T> _FutureOnValue<S, T>(S value); |
9 /** Test used by [Future.catchError] to handle skip some errors. */ | 9 /** Test used by [Future.catchError] to handle skip some errors. */ |
10 typedef bool _FutureErrorTest(var error); | 10 typedef bool _FutureErrorTest(Object error); |
11 /** Used by [WhenFuture]. */ | 11 /** Used by [WhenFuture]. */ |
12 typedef _FutureAction(); | 12 typedef dynamic _FutureAction(); |
13 | 13 |
14 abstract class _Completer<T> implements Completer<T> { | 14 abstract class _Completer<T> implements Completer<T> { |
15 final _Future<T> future = new _Future<T>(); | 15 final _Future<T> future = new _Future<T>(); |
16 | 16 |
17 void complete([FutureOr<T> value]); | 17 void complete([FutureOr<T> value]); |
18 | 18 |
19 void completeError(Object error, [StackTrace stackTrace]) { | 19 void completeError(Object error, [StackTrace stackTrace]) { |
20 error = _nonNullError(error); | 20 error = _nonNullError(error); |
21 if (!future._mayComplete) throw new StateError("Future already completed"); | 21 if (!future._mayComplete) throw new StateError("Future already completed"); |
22 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | 22 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
56 } | 56 } |
57 } | 57 } |
58 | 58 |
59 class _FutureListener<S, T> { | 59 class _FutureListener<S, T> { |
60 static const int MASK_VALUE = 1; | 60 static const int MASK_VALUE = 1; |
61 static const int MASK_ERROR = 2; | 61 static const int MASK_ERROR = 2; |
62 static const int MASK_TEST_ERROR = 4; | 62 static const int MASK_TEST_ERROR = 4; |
63 static const int MASK_WHENCOMPLETE = 8; | 63 static const int MASK_WHENCOMPLETE = 8; |
64 static const int STATE_CHAIN = 0; | 64 static const int STATE_CHAIN = 0; |
65 static const int STATE_THEN = MASK_VALUE; | 65 static const int STATE_THEN = MASK_VALUE; |
66 // TODO(johnmccutchan): Remove the hard coded value. See #26030. | 66 static const int STATE_THEN_ONERROR = MASK_VALUE | MASK_ERROR; |
67 static const int STATE_THEN_ONERROR = 3; // MASK_VALUE | MASK_ERROR. | |
68 static const int STATE_CATCHERROR = MASK_ERROR; | 67 static const int STATE_CATCHERROR = MASK_ERROR; |
69 // TODO(johnmccutchan): Remove the hard coded value. See #26030. | 68 static const int STATE_CATCHERROR_TEST = MASK_ERROR | MASK_TEST_ERROR; |
70 static const int STATE_CATCHERROR_TEST = 6; // MASK_ERROR | MASK_TEST_ERROR. | |
71 static const int STATE_WHENCOMPLETE = MASK_WHENCOMPLETE; | 69 static const int STATE_WHENCOMPLETE = MASK_WHENCOMPLETE; |
72 // Listeners on the same future are linked through this link. | 70 // Listeners on the same future are linked through this link. |
73 _FutureListener _nextListener = null; | 71 _FutureListener _nextListener = null; |
74 // The future to complete when this listener is activated. | 72 // The future to complete when this listener is activated. |
75 final _Future<T> result; | 73 final _Future<T> result; |
76 // Which fields means what. | 74 // Which fields means what. |
77 final int state; | 75 final int state; |
78 // Used for then/whenDone callback and error test | 76 // Used for then/whenDone callback and error test |
79 final Function callback; | 77 final Function callback; |
80 // Used for error callbacks. | 78 // Used for error callbacks. |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
126 assert(handlesError); | 124 assert(handlesError); |
127 return _onError != null; | 125 return _onError != null; |
128 } | 126 } |
129 | 127 |
130 FutureOr<T> handleValue(S sourceResult) { | 128 FutureOr<T> handleValue(S sourceResult) { |
131 return _zone.runUnary<FutureOr<T>, S>(_onValue, sourceResult); | 129 return _zone.runUnary<FutureOr<T>, S>(_onValue, sourceResult); |
132 } | 130 } |
133 | 131 |
134 bool matchesErrorTest(AsyncError asyncError) { | 132 bool matchesErrorTest(AsyncError asyncError) { |
135 if (!hasErrorTest) return true; | 133 if (!hasErrorTest) return true; |
136 _FutureErrorTest test = _errorTest; | 134 return _zone.runUnary<bool, Object>(_errorTest, asyncError.error); |
137 return _zone.runUnary<bool, dynamic>(_errorTest, asyncError.error); | |
138 } | 135 } |
139 | 136 |
140 FutureOr<T> handleError(AsyncError asyncError) { | 137 FutureOr<T> handleError(AsyncError asyncError) { |
141 assert(handlesError && hasErrorCallback); | 138 assert(handlesError && hasErrorCallback); |
142 if (errorCallback is ZoneBinaryCallback) { | 139 if (errorCallback is ZoneBinaryCallback) { |
143 var typedErrorCallback = errorCallback as Object | 140 var typedErrorCallback = errorCallback as Object |
144 /*=ZoneBinaryCallback<FutureOr<T>, Object, StackTrace>*/; | 141 /*=ZoneBinaryCallback<FutureOr<T>, Object, StackTrace>*/; |
145 return _zone.runBinary( | 142 return _zone.runBinary( |
146 typedErrorCallback, asyncError.error, asyncError.stackTrace); | 143 typedErrorCallback, asyncError.error, asyncError.stackTrace); |
147 } else { | 144 } else { |
148 return _zone.runUnary<FutureOr<T>, dynamic>( | 145 return _zone.runUnary<FutureOr<T>, Object>( |
149 errorCallback, asyncError.error); | 146 errorCallback, asyncError.error); |
150 } | 147 } |
151 } | 148 } |
152 | 149 |
153 dynamic handleWhenComplete() { | 150 dynamic handleWhenComplete() { |
154 assert(!handlesError); | 151 assert(!handlesError); |
155 return _zone.run(_whenCompleteAction); | 152 return _zone.run(_whenCompleteAction); |
156 } | 153 } |
157 } | 154 } |
158 | 155 |
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
227 bool get _hasError => _state == _ERROR; | 224 bool get _hasError => _state == _ERROR; |
228 | 225 |
229 void _setChained(_Future source) { | 226 void _setChained(_Future source) { |
230 assert(_mayAddListener); | 227 assert(_mayAddListener); |
231 _state = _CHAINED; | 228 _state = _CHAINED; |
232 _resultOrListeners = source; | 229 _resultOrListeners = source; |
233 } | 230 } |
234 | 231 |
235 Future<E> then<E>(FutureOr<E> f(T value), {Function onError}) { | 232 Future<E> then<E>(FutureOr<E> f(T value), {Function onError}) { |
236 Zone currentZone = Zone.current; | 233 Zone currentZone = Zone.current; |
237 ZoneUnaryCallback registered; | |
238 if (!identical(currentZone, _ROOT_ZONE)) { | 234 if (!identical(currentZone, _ROOT_ZONE)) { |
239 f = currentZone.registerUnaryCallback<FutureOr<E>, T>(f); | 235 f = currentZone.registerUnaryCallback<FutureOr<E>, T>(f); |
240 if (onError != null) { | 236 if (onError != null) { |
241 onError = _registerErrorHandler<E>(onError, currentZone); | 237 onError = _registerErrorHandler<E>(onError, currentZone); |
242 } | 238 } |
243 } | 239 } |
244 return _thenNoZoneRegistration<E>(f, onError); | 240 return _thenNoZoneRegistration<E>(f, onError); |
245 } | 241 } |
246 | 242 |
247 // This method is used by async/await. | 243 // This method is used by async/await. |
248 Future<E> _thenNoZoneRegistration<E>(f(T value), Function onError) { | 244 Future<E> _thenNoZoneRegistration<E>( |
| 245 FutureOr<E> f(T value), Function onError) { |
249 _Future<E> result = new _Future<E>(); | 246 _Future<E> result = new _Future<E>(); |
250 _addListener(new _FutureListener<T, E>.then(result, f, onError)); | 247 _addListener(new _FutureListener<T, E>.then(result, f, onError)); |
251 return result; | 248 return result; |
252 } | 249 } |
253 | 250 |
254 Future<T> catchError(Function onError, {bool test(error)}) { | 251 Future<T> catchError(Function onError, {bool test(error)}) { |
255 _Future<T> result = new _Future<T>(); | 252 _Future<T> result = new _Future<T>(); |
256 if (!identical(result._zone, _ROOT_ZONE)) { | 253 if (!identical(result._zone, _ROOT_ZONE)) { |
257 onError = _registerErrorHandler<T>(onError, result._zone); | 254 onError = _registerErrorHandler<T>(onError, result._zone); |
258 if (test != null) test = result._zone.registerUnaryCallback(test); | 255 if (test != null) test = result._zone.registerUnaryCallback(test); |
259 } | 256 } |
260 _addListener(new _FutureListener<T, T>.catchError(result, onError, test)); | 257 _addListener(new _FutureListener<T, T>.catchError(result, onError, test)); |
261 return result; | 258 return result; |
262 } | 259 } |
263 | 260 |
264 Future<T> whenComplete(action()) { | 261 Future<T> whenComplete(dynamic action()) { |
265 _Future<T> result = new _Future<T>(); | 262 _Future<T> result = new _Future<T>(); |
266 if (!identical(result._zone, _ROOT_ZONE)) { | 263 if (!identical(result._zone, _ROOT_ZONE)) { |
267 action = result._zone.registerCallback<dynamic>(action); | 264 action = result._zone.registerCallback<dynamic>(action); |
268 } | 265 } |
269 _addListener(new _FutureListener<T, T>.whenComplete(result, action)); | 266 _addListener(new _FutureListener<T, T>.whenComplete(result, action)); |
270 return result; | 267 return result; |
271 } | 268 } |
272 | 269 |
273 Stream<T> asStream() => new Stream<T>.fromFuture(this); | 270 Stream<T> asStream() => new Stream<T>.fromFuture(this); |
274 | 271 |
(...skipping 171 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
446 _FutureListener listeners = target._removeListeners(); | 443 _FutureListener listeners = target._removeListeners(); |
447 target._cloneResult(source); | 444 target._cloneResult(source); |
448 _propagateToListeners(target, listeners); | 445 _propagateToListeners(target, listeners); |
449 } else { | 446 } else { |
450 _FutureListener listeners = target._resultOrListeners; | 447 _FutureListener listeners = target._resultOrListeners; |
451 target._setChained(source); | 448 target._setChained(source); |
452 source._prependListeners(listeners); | 449 source._prependListeners(listeners); |
453 } | 450 } |
454 } | 451 } |
455 | 452 |
456 void _complete(value) { | 453 void _complete(FutureOr<T> value) { |
457 assert(!_isComplete); | 454 assert(!_isComplete); |
458 if (value is Future) { | 455 if (value is Future<T>) { |
459 if (value is _Future) { | 456 if (value is _Future<T>) { |
460 _chainCoreFuture(value, this); | 457 _chainCoreFuture(value, this); |
461 } else { | 458 } else { |
462 _chainForeignFuture(value, this); | 459 _chainForeignFuture(value, this); |
463 } | 460 } |
464 } else { | 461 } else { |
465 _FutureListener listeners = _removeListeners(); | 462 _FutureListener listeners = _removeListeners(); |
466 _setValue(value as Object/*=T*/); | 463 _setValue(value as Object/*=T*/); |
467 _propagateToListeners(this, listeners); | 464 _propagateToListeners(this, listeners); |
468 } | 465 } |
469 } | 466 } |
470 | 467 |
471 void _completeWithValue(T value) { | 468 void _completeWithValue(T value) { |
472 assert(!_isComplete); | 469 assert(!_isComplete); |
473 assert(value is! Future); | 470 assert(value is! Future); |
474 | 471 |
475 _FutureListener listeners = _removeListeners(); | 472 _FutureListener listeners = _removeListeners(); |
476 _setValue(value); | 473 _setValue(value); |
477 _propagateToListeners(this, listeners); | 474 _propagateToListeners(this, listeners); |
478 } | 475 } |
479 | 476 |
480 void _completeError(error, [StackTrace stackTrace]) { | 477 void _completeError(Object error, [StackTrace stackTrace]) { |
481 assert(!_isComplete); | 478 assert(!_isComplete); |
482 | 479 |
483 _FutureListener listeners = _removeListeners(); | 480 _FutureListener listeners = _removeListeners(); |
484 _setError(error, stackTrace); | 481 _setError(error, stackTrace); |
485 _propagateToListeners(this, listeners); | 482 _propagateToListeners(this, listeners); |
486 } | 483 } |
487 | 484 |
488 void _asyncComplete(value) { | 485 void _asyncComplete(FutureOr<T> value) { |
489 assert(!_isComplete); | 486 assert(!_isComplete); |
490 // Two corner cases if the value is a future: | 487 // Two corner cases if the value is a future: |
491 // 1. the future is already completed and an error. | 488 // 1. the future is already completed and an error. |
492 // 2. the future is not yet completed but might become an error. | 489 // 2. the future is not yet completed but might become an error. |
493 // The first case means that we must not immediately complete the Future, | 490 // The first case means that we must not immediately complete the Future, |
494 // as our code would immediately start propagating the error without | 491 // as our code would immediately start propagating the error without |
495 // giving the time to install error-handlers. | 492 // giving the time to install error-handlers. |
496 // However the second case requires us to deal with the value immediately. | 493 // However the second case requires us to deal with the value immediately. |
497 // Otherwise the value could complete with an error and report an | 494 // Otherwise the value could complete with an error and report an |
498 // unhandled error, even though we know we are already going to listen to | 495 // unhandled error, even though we know we are already going to listen to |
499 // it. | 496 // it. |
500 | 497 |
501 if (value is Future) { | 498 if (value is Future<T>) { |
502 // Assign to typed variables so we get earlier checks in checked mode. | 499 if (value is _Future<T>) { |
503 Future<T> typedFuture = value as Object/*=Future<T>*/; | 500 if (value._hasError) { |
504 if (typedFuture is _Future) { | |
505 _Future<T> coreFuture = typedFuture; | |
506 if (coreFuture._hasError) { | |
507 // Case 1 from above. Delay completion to enable the user to register | 501 // Case 1 from above. Delay completion to enable the user to register |
508 // callbacks. | 502 // callbacks. |
509 _setPendingComplete(); | 503 _setPendingComplete(); |
510 _zone.scheduleMicrotask(() { | 504 _zone.scheduleMicrotask(() { |
511 _chainCoreFuture(coreFuture, this); | 505 _chainCoreFuture(value, this); |
512 }); | 506 }); |
513 } else { | 507 } else { |
514 _chainCoreFuture(coreFuture, this); | 508 _chainCoreFuture(value, this); |
515 } | 509 } |
516 } else { | 510 } else { |
517 // Case 2 from above. Chain the future immediately. | 511 // Case 2 from above. Chain the future immediately. |
518 // Note that we are still completing asynchronously (through | 512 // Note that we are still completing asynchronously (through |
519 // _chainForeignFuture). | 513 // _chainForeignFuture). |
520 _chainForeignFuture(typedFuture, this); | 514 _chainForeignFuture(value, this); |
521 } | 515 } |
522 return; | 516 return; |
523 } | 517 } |
524 T typedValue = value as Object/*=T*/; | 518 T typedValue = value as Object/*=T*/; |
525 | 519 |
526 _setPendingComplete(); | 520 _setPendingComplete(); |
527 _zone.scheduleMicrotask(() { | 521 _zone.scheduleMicrotask(() { |
528 _completeWithValue(typedValue); | 522 _completeWithValue(typedValue); |
529 }); | 523 }); |
530 } | 524 } |
(...skipping 171 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
702 result._setValue(listenerValueOrError); | 696 result._setValue(listenerValueOrError); |
703 } else { | 697 } else { |
704 AsyncError asyncError = listenerValueOrError; | 698 AsyncError asyncError = listenerValueOrError; |
705 result._setErrorObject(asyncError); | 699 result._setErrorObject(asyncError); |
706 } | 700 } |
707 // Prepare for next round. | 701 // Prepare for next round. |
708 source = result; | 702 source = result; |
709 } | 703 } |
710 } | 704 } |
711 | 705 |
712 Future<T> timeout(Duration timeLimit, {onTimeout()}) { | 706 Future<T> timeout(Duration timeLimit, {FutureOr<T> onTimeout()}) { |
713 if (_isComplete) return new _Future.immediate(this); | 707 if (_isComplete) return new _Future.immediate(this); |
714 _Future<T> result = new _Future<T>(); | 708 _Future<T> result = new _Future<T>(); |
715 Timer timer; | 709 Timer timer; |
716 if (onTimeout == null) { | 710 if (onTimeout == null) { |
717 timer = new Timer(timeLimit, () { | 711 timer = new Timer(timeLimit, () { |
718 result._completeError( | 712 result._completeError( |
719 new TimeoutException("Future not completed", timeLimit)); | 713 new TimeoutException("Future not completed", timeLimit)); |
720 }); | 714 }); |
721 } else { | 715 } else { |
722 Zone zone = Zone.current; | 716 Zone zone = Zone.current; |
(...skipping 13 matching lines...) Expand all Loading... |
736 } | 730 } |
737 }, onError: (e, s) { | 731 }, onError: (e, s) { |
738 if (timer.isActive) { | 732 if (timer.isActive) { |
739 timer.cancel(); | 733 timer.cancel(); |
740 result._completeError(e, s); | 734 result._completeError(e, s); |
741 } | 735 } |
742 }); | 736 }); |
743 return result; | 737 return result; |
744 } | 738 } |
745 } | 739 } |
OLD | NEW |