OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** The onValue and onError handlers return either a value or a future */ | 7 /** The onValue and onError handlers return either a value or a future */ |
8 typedef dynamic _FutureOnValue<T>(T value); | 8 typedef dynamic _FutureOnValue<T>(T value); |
9 typedef dynamic _FutureOnError(error); | |
10 /** Test used by [Future.catchError] to handle skip some errors. */ | 9 /** Test used by [Future.catchError] to handle skip some errors. */ |
11 typedef bool _FutureErrorTest(var error); | 10 typedef bool _FutureErrorTest(var error); |
12 /** Used by [WhenFuture]. */ | 11 /** Used by [WhenFuture]. */ |
13 typedef _FutureAction(); | 12 typedef _FutureAction(); |
14 | 13 |
15 abstract class _Completer<T> implements Completer<T> { | 14 abstract class _Completer<T> implements Completer<T> { |
16 final _Future<T> future = new _Future<T>(); | 15 final _Future<T> future = new _Future<T>(); |
17 | 16 |
18 void complete([T value]); | 17 void complete([T value]); |
19 | 18 |
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
123 // TODO(floitsch): since single listeners are the common case we should | 122 // TODO(floitsch): since single listeners are the common case we should |
124 // use a bit to indicate that the _resultOrListeners contains a container. | 123 // use a bit to indicate that the _resultOrListeners contains a container. |
125 _Future _nextListener; | 124 _Future _nextListener; |
126 | 125 |
127 // TODO(floitsch): we only need two closure fields to store the callbacks. | 126 // TODO(floitsch): we only need two closure fields to store the callbacks. |
128 // If we store the type of a closure in the state field (where there are | 127 // If we store the type of a closure in the state field (where there are |
129 // still bits left), we can just store two closures instead of using 4 | 128 // still bits left), we can just store two closures instead of using 4 |
130 // fields of which 2 are always null. | 129 // fields of which 2 are always null. |
131 final _FutureOnValue _onValueCallback; | 130 final _FutureOnValue _onValueCallback; |
132 final _FutureErrorTest _errorTestCallback; | 131 final _FutureErrorTest _errorTestCallback; |
133 final _FutureOnError _onErrorCallback; | 132 final Function _onErrorCallback; |
134 final _FutureAction _whenCompleteActionCallback; | 133 final _FutureAction _whenCompleteActionCallback; |
135 | 134 |
136 _FutureOnValue get _onValue => _isChained ? null : _onValueCallback; | 135 _FutureOnValue get _onValue => _isChained ? null : _onValueCallback; |
137 _FutureErrorTest get _errorTest => _isChained ? null : _errorTestCallback; | 136 _FutureErrorTest get _errorTest => _isChained ? null : _errorTestCallback; |
138 _FutureOnError get _onError => _isChained ? null : _onErrorCallback; | 137 Function get _onError => _isChained ? null : _onErrorCallback; |
139 _FutureAction get _whenCompleteAction | 138 _FutureAction get _whenCompleteAction |
140 => _isChained ? null : _whenCompleteActionCallback; | 139 => _isChained ? null : _whenCompleteActionCallback; |
141 | 140 |
142 _Future() | 141 _Future() |
143 : _zone = Zone.current, | 142 : _zone = Zone.current, |
144 _onValueCallback = null, _errorTestCallback = null, | 143 _onValueCallback = null, _errorTestCallback = null, |
145 _onErrorCallback = null, _whenCompleteActionCallback = null; | 144 _onErrorCallback = null, _whenCompleteActionCallback = null; |
146 | 145 |
147 /// Valid types for value: `T` or `Future<T>`. | 146 /// Valid types for value: `T` or `Future<T>`. |
148 _Future.immediate(value) | 147 _Future.immediate(value) |
149 : _zone = Zone.current, | 148 : _zone = Zone.current, |
150 _onValueCallback = null, _errorTestCallback = null, | 149 _onValueCallback = null, _errorTestCallback = null, |
151 _onErrorCallback = null, _whenCompleteActionCallback = null { | 150 _onErrorCallback = null, _whenCompleteActionCallback = null { |
152 _asyncComplete(value); | 151 _asyncComplete(value); |
153 } | 152 } |
154 | 153 |
155 _Future.immediateError(var error, [Object stackTrace]) | 154 _Future.immediateError(var error, [Object stackTrace]) |
156 : _zone = Zone.current, | 155 : _zone = Zone.current, |
157 _onValueCallback = null, _errorTestCallback = null, | 156 _onValueCallback = null, _errorTestCallback = null, |
158 _onErrorCallback = null, _whenCompleteActionCallback = null { | 157 _onErrorCallback = null, _whenCompleteActionCallback = null { |
159 _asyncCompleteError(error, stackTrace); | 158 _asyncCompleteError(error, stackTrace); |
160 } | 159 } |
161 | 160 |
162 _Future._then(onValueCallback(value), onErrorCallback(e)) | 161 _Future._then(onValueCallback(value), Function onErrorCallback) |
163 : _zone = Zone.current, | 162 : _zone = Zone.current, |
164 _onValueCallback = Zone.current.registerUnaryCallback(onValueCallback), | 163 _onValueCallback = Zone.current.registerUnaryCallback(onValueCallback), |
165 _onErrorCallback = Zone.current.registerUnaryCallback(onErrorCallback), | 164 _onErrorCallback = _registerErrorHandler(onErrorCallback, Zone.current), |
166 _errorTestCallback = null, | 165 _errorTestCallback = null, |
167 _whenCompleteActionCallback = null; | 166 _whenCompleteActionCallback = null; |
168 | 167 |
169 _Future._catchError(onErrorCallback(e), bool errorTestCallback(e)) | 168 _Future._catchError(Function onErrorCallback, bool errorTestCallback(e)) |
170 : _zone = Zone.current, | 169 : _zone = Zone.current, |
171 _onErrorCallback = Zone.current.registerUnaryCallback(onErrorCallback), | 170 _onErrorCallback = _registerErrorHandler(onErrorCallback, Zone.current), |
172 _errorTestCallback = Zone.current.registerUnaryCallback(errorTestCallback)
, | 171 _errorTestCallback = |
| 172 Zone.current.registerUnaryCallback(errorTestCallback), |
173 _onValueCallback = null, | 173 _onValueCallback = null, |
174 _whenCompleteActionCallback = null; | 174 _whenCompleteActionCallback = null; |
175 | 175 |
176 _Future._whenComplete(whenCompleteActionCallback()) | 176 _Future._whenComplete(whenCompleteActionCallback()) |
177 : _zone = Zone.current, | 177 : _zone = Zone.current, |
178 _whenCompleteActionCallback = | 178 _whenCompleteActionCallback = |
179 Zone.current.registerCallback(whenCompleteActionCallback), | 179 Zone.current.registerCallback(whenCompleteActionCallback), |
180 _onValueCallback = null, | 180 _onValueCallback = null, |
181 _errorTestCallback = null, | 181 _errorTestCallback = null, |
182 _onErrorCallback = null; | 182 _onErrorCallback = null; |
183 | 183 |
184 Future then(f(T value), { onError(error) }) { | 184 Future then(f(T value), { Function onError }) { |
185 _Future result; | 185 _Future result; |
186 result = new _Future._then(f, onError); | 186 result = new _Future._then(f, onError); |
187 _addListener(result); | 187 _addListener(result); |
188 return result; | 188 return result; |
189 } | 189 } |
190 | 190 |
191 Future catchError(f(error), { bool test(error) }) { | 191 Future catchError(Function onError, { bool test(error) }) { |
192 _Future result = new _Future._catchError(f, test); | 192 _Future result = new _Future._catchError(onError, test); |
193 _addListener(result); | 193 _addListener(result); |
194 return result; | 194 return result; |
195 } | 195 } |
196 | 196 |
197 Future<T> whenComplete(action()) { | 197 Future<T> whenComplete(action()) { |
198 _Future result = new _Future<T>._whenComplete(action); | 198 _Future result = new _Future<T>._whenComplete(action); |
199 _addListener(result); | 199 _addListener(result); |
200 return result; | 200 return result; |
201 } | 201 } |
202 | 202 |
203 Stream<T> asStream() => new Stream.fromFuture(this); | 203 Stream<T> asStream() => new Stream.fromFuture(this); |
204 | 204 |
205 void _markPendingCompletion() { | 205 void _markPendingCompletion() { |
206 if (!_mayComplete) throw new StateError("Future already completed"); | 206 if (!_mayComplete) throw new StateError("Future already completed"); |
207 _state = _PENDING_COMPLETE; | 207 _state = _PENDING_COMPLETE; |
208 } | 208 } |
209 | 209 |
210 T get _value { | 210 T get _value { |
211 assert(_isComplete && _hasValue); | 211 assert(_isComplete && _hasValue); |
212 return _resultOrListeners; | 212 return _resultOrListeners; |
213 } | 213 } |
214 | 214 |
215 Object get _error { | 215 _AsyncError get _error { |
216 assert(_isComplete && _hasError); | 216 assert(_isComplete && _hasError); |
217 return _resultOrListeners; | 217 return _resultOrListeners; |
218 } | 218 } |
219 | 219 |
220 void _setValue(T value) { | 220 void _setValue(T value) { |
221 assert(!_isComplete); // But may have a completion pending. | 221 assert(!_isComplete); // But may have a completion pending. |
222 _state = _VALUE; | 222 _state = _VALUE; |
223 _resultOrListeners = value; | 223 _resultOrListeners = value; |
224 } | 224 } |
225 | 225 |
226 void _setError(Object error) { | 226 void _setError(Object error, StackTrace stackTrace) { |
227 assert(!_isComplete); // But may have a completion pending. | 227 assert(!_isComplete); // But may have a completion pending. |
228 _state = _ERROR; | 228 _state = _ERROR; |
229 _resultOrListeners = error; | 229 _resultOrListeners = new _AsyncError(error, stackTrace); |
230 } | 230 } |
231 | 231 |
232 void _addListener(_Future listener) { | 232 void _addListener(_Future listener) { |
233 assert(listener._nextListener == null); | 233 assert(listener._nextListener == null); |
234 if (_isComplete) { | 234 if (_isComplete) { |
235 // Handle late listeners asynchronously. | 235 // Handle late listeners asynchronously. |
236 _zone.scheduleMicrotask(() { | 236 _zone.scheduleMicrotask(() { |
237 _propagateToListeners(this, listener); | 237 _propagateToListeners(this, listener); |
238 }); | 238 }); |
239 } else { | 239 } else { |
(...skipping 28 matching lines...) Expand all Loading... |
268 if (internalFuture._isComplete) { | 268 if (internalFuture._isComplete) { |
269 _propagateToListeners(internalFuture, target); | 269 _propagateToListeners(internalFuture, target); |
270 } else { | 270 } else { |
271 internalFuture._addListener(target); | 271 internalFuture._addListener(target); |
272 } | 272 } |
273 } else { | 273 } else { |
274 source.then((value) { | 274 source.then((value) { |
275 assert(target._isChained); | 275 assert(target._isChained); |
276 target._complete(value); | 276 target._complete(value); |
277 }, | 277 }, |
278 onError: (error) { | 278 // TODO(floitsch): eventually we would like to make this non-optional |
| 279 // and dependent on the listeners of the target future. If none of |
| 280 // the target future's listeners want to have the stack trace we don't |
| 281 // need a trace. |
| 282 onError: (error, [stackTrace]) { |
279 assert(target._isChained); | 283 assert(target._isChained); |
280 target._completeError(error); | 284 target._completeError(error, stackTrace); |
281 }); | 285 }); |
282 } | 286 } |
283 } | 287 } |
284 | 288 |
285 void _complete(value) { | 289 void _complete(value) { |
286 assert(!_isComplete); | 290 assert(!_isComplete); |
287 assert(_onValue == null); | 291 assert(_onValue == null); |
288 assert(_onError == null); | 292 assert(_onError == null); |
289 assert(_whenCompleteAction == null); | 293 assert(_whenCompleteAction == null); |
290 assert(_errorTest == null); | 294 assert(_errorTest == null); |
(...skipping 13 matching lines...) Expand all Loading... |
304 assert(_onError == null); | 308 assert(_onError == null); |
305 assert(_whenCompleteAction == null); | 309 assert(_whenCompleteAction == null); |
306 assert(_errorTest == null); | 310 assert(_errorTest == null); |
307 | 311 |
308 if (stackTrace != null) { | 312 if (stackTrace != null) { |
309 // Force the stack trace onto the error, even if it already had one. | 313 // Force the stack trace onto the error, even if it already had one. |
310 _attachStackTrace(error, stackTrace); | 314 _attachStackTrace(error, stackTrace); |
311 } | 315 } |
312 | 316 |
313 _Future listeners = _isChained ? null : _removeListeners(); | 317 _Future listeners = _isChained ? null : _removeListeners(); |
314 _setError(error); | 318 _setError(error, stackTrace); |
315 _propagateToListeners(this, listeners); | 319 _propagateToListeners(this, listeners); |
316 } | 320 } |
317 | 321 |
318 void _asyncComplete(value) { | 322 void _asyncComplete(value) { |
319 assert(!_isComplete); | 323 assert(!_isComplete); |
320 assert(_onValue == null); | 324 assert(_onValue == null); |
321 assert(_onError == null); | 325 assert(_onError == null); |
322 assert(_whenCompleteAction == null); | 326 assert(_whenCompleteAction == null); |
323 assert(_errorTest == null); | 327 assert(_errorTest == null); |
324 // Two corner cases if the value is a future: | 328 // Two corner cases if the value is a future: |
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
391 * | 395 * |
392 * If [runCallback] is true (which should be the default) it executes | 396 * If [runCallback] is true (which should be the default) it executes |
393 * the registered action of listeners. If it is `false` then the callback is | 397 * the registered action of listeners. If it is `false` then the callback is |
394 * skipped. This is used to complete futures with chained futures. | 398 * skipped. This is used to complete futures with chained futures. |
395 */ | 399 */ |
396 static void _propagateToListeners(_Future source, _Future listeners) { | 400 static void _propagateToListeners(_Future source, _Future listeners) { |
397 while (true) { | 401 while (true) { |
398 if (!source._isComplete) return; // Chained future. | 402 if (!source._isComplete) return; // Chained future. |
399 bool hasError = source._hasError; | 403 bool hasError = source._hasError; |
400 if (hasError && listeners == null) { | 404 if (hasError && listeners == null) { |
401 source._zone.handleUncaughtError(source._error); | 405 _AsyncError asyncError = source._error; |
| 406 source._zone.handleUncaughtError( |
| 407 asyncError.error, asyncError.stackTrace); |
402 return; | 408 return; |
403 } | 409 } |
404 if (listeners == null) return; | 410 if (listeners == null) return; |
405 _Future listener = listeners; | 411 _Future listener = listeners; |
406 if (listener._nextListener != null) { | 412 if (listener._nextListener != null) { |
407 // Usually futures only have one listener. If they have several, we | 413 // Usually futures only have one listener. If they have several, we |
408 // handle them specially. | 414 // handle them specially. |
409 _propagateMultipleListeners(source, listeners); | 415 _propagateMultipleListeners(source, listeners); |
410 return; | 416 return; |
411 } | 417 } |
412 if (hasError && !source._zone.inSameErrorZone(listener._zone)) { | 418 if (hasError && !source._zone.inSameErrorZone(listener._zone)) { |
413 // Don't cross zone boundaries with errors. | 419 // Don't cross zone boundaries with errors. |
414 source._zone.handleUncaughtError(source._error); | 420 _AsyncError asyncError = source._error; |
| 421 source._zone.handleUncaughtError( |
| 422 asyncError.error, asyncError.stackTrace); |
415 return; | 423 return; |
416 } | 424 } |
417 if (!identical(Zone.current, listener._zone)) { | 425 if (!identical(Zone.current, listener._zone)) { |
418 // Run the propagation in the listener's zone to avoid | 426 // Run the propagation in the listener's zone to avoid |
419 // zone transitions. The idea is that many chained futures will | 427 // zone transitions. The idea is that many chained futures will |
420 // be in the same zone. | 428 // be in the same zone. |
421 listener._zone.run(() { | 429 listener._zone.run(() { |
422 _propagateToListeners(source, listener); | 430 _propagateToListeners(source, listener); |
423 }); | 431 }); |
424 return; | 432 return; |
(...skipping 22 matching lines...) Expand all Loading... |
447 var value = source._value; | 455 var value = source._value; |
448 if (listener._onValue != null) { | 456 if (listener._onValue != null) { |
449 listenerValueOrError = listener._onValue(value); | 457 listenerValueOrError = listener._onValue(value); |
450 listenerHasValue = true; | 458 listenerHasValue = true; |
451 } else { | 459 } else { |
452 // Copy over the value from the source. | 460 // Copy over the value from the source. |
453 listenerValueOrError = value; | 461 listenerValueOrError = value; |
454 listenerHasValue = true; | 462 listenerHasValue = true; |
455 } | 463 } |
456 } else { | 464 } else { |
457 Object error = source._error; | 465 _AsyncError asyncError = source._error; |
458 _FutureErrorTest test = listener._errorTest; | 466 _FutureErrorTest test = listener._errorTest; |
459 bool matchesTest = true; | 467 bool matchesTest = true; |
460 if (test != null) { | 468 if (test != null) { |
461 matchesTest = test(error); | 469 matchesTest = test(asyncError.error); |
462 } | 470 } |
463 if (matchesTest && listener._onError != null) { | 471 if (matchesTest && listener._onError != null) { |
464 listenerValueOrError = listener._onError(error); | 472 Function errorCallback = listener._onError; |
| 473 listenerValueOrError = _invokeErrorHandler(errorCallback, |
| 474 asyncError.error, |
| 475 asyncError.stackTrace); |
465 listenerHasValue = true; | 476 listenerHasValue = true; |
466 } else { | 477 } else { |
467 // Copy over the error from the source. | 478 // Copy over the error from the source. |
468 listenerValueOrError = error; | 479 listenerValueOrError = asyncError; |
469 listenerHasValue = false; | 480 listenerHasValue = false; |
470 } | 481 } |
471 } | 482 } |
472 | 483 |
473 if (listener._whenCompleteAction != null) { | 484 if (listener._whenCompleteAction != null) { |
474 var completeResult = listener._whenCompleteAction(); | 485 var completeResult = listener._whenCompleteAction(); |
475 if (completeResult is Future) { | 486 if (completeResult is Future) { |
476 listener._isChained = true; | 487 listener._isChained = true; |
477 completeResult.then((ignored) { | 488 completeResult.then((ignored) { |
478 // Try again, but this time don't run the whenComplete callback. | 489 // Try again, but this time don't run the whenComplete callback. |
479 _propagateToListeners(source, listener); | 490 _propagateToListeners(source, listener); |
480 }, onError: (error) { | 491 }, onError: (error, [stackTrace]) { |
481 // When there is an error, we have to make the error the new | 492 // When there is an error, we have to make the error the new |
482 // result of the current listener. | 493 // result of the current listener. |
483 if (completeResult is! _Future) { | 494 if (completeResult is! _Future) { |
484 // This should be a rare case. | 495 // This should be a rare case. |
485 completeResult = new _Future(); | 496 completeResult = new _Future(); |
486 completeResult._setError(error); | 497 completeResult._setError(error, stackTrace); |
487 } | 498 } |
488 _propagateToListeners(completeResult, listener); | 499 _propagateToListeners(completeResult, listener); |
489 }); | 500 }); |
490 isPropagationAborted = true; | 501 isPropagationAborted = true; |
491 } | 502 } |
492 } | 503 } |
493 } catch (e, s) { | 504 } catch (e, s) { |
494 // Set the exception as error. | 505 // Set the exception as error unless the error is the same as the |
495 listenerValueOrError = _asyncError(e, s); | 506 // original one. |
| 507 if (hasError && identical(source._error.error, e)) { |
| 508 listenerValueOrError = source._error; |
| 509 } else { |
| 510 listenerValueOrError = new _AsyncError(_asyncError(e, s), s); |
| 511 } |
496 listenerHasValue = false; | 512 listenerHasValue = false; |
497 } | 513 } |
498 }); | 514 }); |
499 if (isPropagationAborted) return; | 515 if (isPropagationAborted) return; |
500 // If the listener's value is a future we need to chain it. | 516 // If the listener's value is a future we need to chain it. |
501 if (listenerHasValue && listenerValueOrError is Future) { | 517 if (listenerHasValue && listenerValueOrError is Future) { |
502 Future chainSource = listenerValueOrError; | 518 Future chainSource = listenerValueOrError; |
503 // Shortcut if the chain-source is already completed. Just continue the | 519 // Shortcut if the chain-source is already completed. Just continue the |
504 // loop. | 520 // loop. |
505 if (chainSource is _Future && (chainSource as _Future)._isComplete) { | 521 if (chainSource is _Future && (chainSource as _Future)._isComplete) { |
506 // propagate the value (simulating a tail call). | 522 // propagate the value (simulating a tail call). |
507 listener._isChained = true; | 523 listener._isChained = true; |
508 source = chainSource; | 524 source = chainSource; |
509 listeners = listener; | 525 listeners = listener; |
510 continue; | 526 continue; |
511 } | 527 } |
512 _chainFutures(chainSource, listener); | 528 _chainFutures(chainSource, listener); |
513 return; | 529 return; |
514 } | 530 } |
515 | 531 |
516 if (listenerHasValue) { | 532 if (listenerHasValue) { |
517 listeners = listener._removeListeners(); | 533 listeners = listener._removeListeners(); |
518 listener._setValue(listenerValueOrError); | 534 listener._setValue(listenerValueOrError); |
519 } else { | 535 } else { |
520 listeners = listener._removeListeners(); | 536 listeners = listener._removeListeners(); |
521 listener._setError(listenerValueOrError); | 537 _AsyncError asyncError = listenerValueOrError; |
| 538 listener._setError(asyncError.error, asyncError.stackTrace); |
522 } | 539 } |
523 // Prepare for next round. | 540 // Prepare for next round. |
524 source = listener; | 541 source = listener; |
525 } | 542 } |
526 } | 543 } |
527 } | 544 } |
OLD | NEW |