OLD | NEW |
| (Empty) |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 part of dart.async; | |
6 | |
7 /** The onValue and onError handlers return either a value or a future */ | |
8 typedef dynamic/*T|Future<T>*/ _FutureOnValue<S, T>(S value); | |
9 /** Test used by [Future.catchError] to handle skip some errors. */ | |
10 typedef bool _FutureErrorTest(var error); | |
11 /** Used by [WhenFuture]. */ | |
12 typedef _FutureAction(); | |
13 | |
14 abstract class _Completer<T> implements Completer<T> { | |
15 final _Future<T> future = new _Future<T>(); | |
16 | |
17 void complete([value]); | |
18 | |
19 void completeError(Object error, [StackTrace stackTrace]) { | |
20 error = _nonNullError(error); | |
21 if (!future._mayComplete) throw new StateError("Future already completed"); | |
22 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | |
23 if (replacement != null) { | |
24 error = _nonNullError(replacement.error); | |
25 stackTrace = replacement.stackTrace; | |
26 } | |
27 _completeError(error, stackTrace); | |
28 } | |
29 | |
30 void _completeError(Object error, StackTrace stackTrace); | |
31 | |
32 // The future's _isComplete doesn't take into account pending completions. | |
33 // We therefore use _mayComplete. | |
34 bool get isCompleted => !future._mayComplete; | |
35 } | |
36 | |
37 class _AsyncCompleter<T> extends _Completer<T> { | |
38 | |
39 void complete([value]) { | |
40 if (!future._mayComplete) throw new StateError("Future already completed"); | |
41 future._asyncComplete(value); | |
42 } | |
43 | |
44 void _completeError(Object error, StackTrace stackTrace) { | |
45 future._asyncCompleteError(error, stackTrace); | |
46 } | |
47 } | |
48 | |
49 class _SyncCompleter<T> extends _Completer<T> { | |
50 void complete([value]) { | |
51 if (!future._mayComplete) throw new StateError("Future already completed"); | |
52 future._complete(value); | |
53 } | |
54 | |
55 void _completeError(Object error, StackTrace stackTrace) { | |
56 future._completeError(error, stackTrace); | |
57 } | |
58 } | |
59 | |
60 class _FutureListener<S, T> { | |
61 static const int MASK_VALUE = 1; | |
62 static const int MASK_ERROR = 2; | |
63 static const int MASK_TEST_ERROR = 4; | |
64 static const int MASK_WHENCOMPLETE = 8; | |
65 static const int STATE_CHAIN = 0; | |
66 static const int STATE_THEN = MASK_VALUE; | |
67 // TODO(johnmccutchan): Remove the hard coded value. See #26030. | |
68 static const int STATE_THEN_ONERROR = 3; // MASK_VALUE | MASK_ERROR. | |
69 static const int STATE_CATCHERROR = MASK_ERROR; | |
70 // TODO(johnmccutchan): Remove the hard coded value. See #26030. | |
71 static const int STATE_CATCHERROR_TEST = 6; // MASK_ERROR | MASK_TEST_ERROR. | |
72 static const int STATE_WHENCOMPLETE = MASK_WHENCOMPLETE; | |
73 // Listeners on the same future are linked through this link. | |
74 _FutureListener _nextListener = null; | |
75 // The future to complete when this listener is activated. | |
76 final _Future<T> result; | |
77 // Which fields means what. | |
78 final int state; | |
79 // Used for then/whenDone callback and error test | |
80 final Function callback; | |
81 // Used for error callbacks. | |
82 final Function errorCallback; | |
83 | |
84 _FutureListener.then(this.result, | |
85 _FutureOnValue<S, T> onValue, Function errorCallback) | |
86 : callback = onValue, | |
87 errorCallback = errorCallback, | |
88 state = (errorCallback == null) ? STATE_THEN : STATE_THEN_ONERROR; | |
89 | |
90 _FutureListener.catchError(this.result, | |
91 this.errorCallback, _FutureErrorTest test) | |
92 : callback = test, | |
93 state = (test == null) ? STATE_CATCHERROR : STATE_CATCHERROR_TEST; | |
94 | |
95 _FutureListener.whenComplete(this.result, _FutureAction onComplete) | |
96 : callback = onComplete, | |
97 errorCallback = null, | |
98 state = STATE_WHENCOMPLETE; | |
99 | |
100 Zone get _zone => result._zone; | |
101 | |
102 bool get handlesValue => (state & MASK_VALUE != 0); | |
103 bool get handlesError => (state & MASK_ERROR != 0); | |
104 bool get hasErrorTest => (state == STATE_CATCHERROR_TEST); | |
105 bool get handlesComplete => (state == STATE_WHENCOMPLETE); | |
106 | |
107 | |
108 _FutureOnValue<S, T> get _onValue { | |
109 assert(handlesValue); | |
110 return callback as Object /*=_FutureOnValue<S, T>*/; | |
111 } | |
112 Function get _onError => errorCallback; | |
113 _FutureErrorTest get _errorTest { | |
114 assert(hasErrorTest); | |
115 return callback as Object /*=_FutureErrorTest*/; | |
116 } | |
117 _FutureAction get _whenCompleteAction { | |
118 assert(handlesComplete); | |
119 return callback as Object /*=_FutureAction*/; | |
120 } | |
121 | |
122 /// Whether this listener has an error callback. | |
123 /// | |
124 /// This function must only be called if the listener [handlesError]. | |
125 bool get hasErrorCallback { | |
126 assert(handlesError); | |
127 return _onError != null; | |
128 } | |
129 | |
130 dynamic/*T|Future<T>*/ handleValue(S sourceResult) { | |
131 return _zone.runUnary<dynamic/*T|Future<T>*/, S>( | |
132 _onValue, sourceResult); | |
133 } | |
134 | |
135 bool matchesErrorTest(AsyncError asyncError) { | |
136 if (!hasErrorTest) return true; | |
137 _FutureErrorTest test = _errorTest; | |
138 return _zone.runUnary<bool, dynamic>(_errorTest, asyncError.error); | |
139 } | |
140 | |
141 dynamic/*T|Future<T>*/ handleError(AsyncError asyncError) { | |
142 assert(handlesError && hasErrorCallback); | |
143 if (errorCallback is ZoneBinaryCallback) { | |
144 var typedErrorCallback = errorCallback as Object | |
145 /*=ZoneBinaryCallback<Object/*T|Future<T>*/, Object, StackTrace>*/; | |
146 return _zone.runBinary(typedErrorCallback, | |
147 asyncError.error, | |
148 asyncError.stackTrace); | |
149 } else { | |
150 return _zone.runUnary<dynamic/*T|Future<T>*/, dynamic>( | |
151 errorCallback, asyncError.error); | |
152 } | |
153 } | |
154 | |
155 dynamic handleWhenComplete() { | |
156 assert(!handlesError); | |
157 return _zone.run(_whenCompleteAction); | |
158 } | |
159 } | |
160 | |
161 class _Future<T> implements Future<T> { | |
162 /// Initial state, waiting for a result. In this state, the | |
163 /// [resultOrListeners] field holds a single-linked list of | |
164 /// [_FutureListener] listeners. | |
165 static const int _INCOMPLETE = 0; | |
166 /// Pending completion. Set when completed using [_asyncComplete] or | |
167 /// [_asyncCompleteError]. It is an error to try to complete it again. | |
168 /// [resultOrListeners] holds listeners. | |
169 static const int _PENDING_COMPLETE = 1; | |
170 /// The future has been chained to another future. The result of that | |
171 /// other future becomes the result of this future as well. | |
172 /// [resultOrListeners] contains the source future. | |
173 static const int _CHAINED = 2; | |
174 /// The future has been completed with a value result. | |
175 static const int _VALUE = 4; | |
176 /// The future has been completed with an error result. | |
177 static const int _ERROR = 8; | |
178 | |
179 /** Whether the future is complete, and as what. */ | |
180 int _state = _INCOMPLETE; | |
181 | |
182 /** | |
183 * Zone that the future was completed from. | |
184 * This is the zone that an error result belongs to. | |
185 * | |
186 * Until the future is completed, the field may hold the zone that | |
187 * listener callbacks used to create this future should be run in. | |
188 */ | |
189 final Zone _zone = Zone.current; | |
190 | |
191 /** | |
192 * Either the result, a list of listeners or another future. | |
193 * | |
194 * The result of the future is either a value or an error. | |
195 * A result is only stored when the future has completed. | |
196 * | |
197 * The listeners is an internally linked list of [_FutureListener]s. | |
198 * Listeners are only remembered while the future is not yet complete, | |
199 * and it is not chained to another future. | |
200 * | |
201 * The future is another future that his future is chained to. This future | |
202 * is waiting for the other future to complete, and when it does, this future | |
203 * will complete with the same result. | |
204 * All listeners are forwarded to the other future. | |
205 */ | |
206 var _resultOrListeners; | |
207 | |
208 // This constructor is used by async/await. | |
209 _Future(); | |
210 | |
211 /// Valid types for value: `T` or `Future<T>`. | |
212 _Future.immediate(value) { | |
213 _asyncComplete(value); | |
214 } | |
215 | |
216 _Future.immediateError(var error, [StackTrace stackTrace]) { | |
217 _asyncCompleteError(error, stackTrace); | |
218 } | |
219 | |
220 bool get _mayComplete => _state == _INCOMPLETE; | |
221 bool get _isPendingComplete => _state == _PENDING_COMPLETE; | |
222 bool get _mayAddListener => _state <= _PENDING_COMPLETE; | |
223 bool get _isChained => _state == _CHAINED; | |
224 bool get _isComplete => _state >= _VALUE; | |
225 bool get _hasError => _state == _ERROR; | |
226 | |
227 void _setChained(_Future source) { | |
228 assert(_mayAddListener); | |
229 _state = _CHAINED; | |
230 _resultOrListeners = source; | |
231 } | |
232 | |
233 Future<E> then<E>( | |
234 FutureOr<E> f(T value), { Function onError }) { | |
235 Zone currentZone = Zone.current; | |
236 ZoneUnaryCallback registered; | |
237 if (!identical(currentZone, _ROOT_ZONE)) { | |
238 f = currentZone.registerUnaryCallback<FutureOr<E>, T>(f); | |
239 if (onError != null) { | |
240 onError = _registerErrorHandler<T>(onError, currentZone); | |
241 } | |
242 } | |
243 return _thenNoZoneRegistration<E>(f, onError); | |
244 } | |
245 | |
246 // This method is used by async/await. | |
247 Future<E> _thenNoZoneRegistration<E>(f(T value), Function onError) { | |
248 _Future<E> result = new _Future<E>(); | |
249 _addListener(new _FutureListener<T, E>.then(result, f, onError)); | |
250 return result; | |
251 } | |
252 | |
253 Future<T> catchError(Function onError, { bool test(error) }) { | |
254 _Future<T> result = new _Future<T>(); | |
255 if (!identical(result._zone, _ROOT_ZONE)) { | |
256 onError = _registerErrorHandler<T>(onError, result._zone); | |
257 if (test != null) test = result._zone.registerUnaryCallback(test); | |
258 } | |
259 _addListener(new _FutureListener<T, T>.catchError( | |
260 result, onError, test)); | |
261 return result; | |
262 } | |
263 | |
264 Future<T> whenComplete(action()) { | |
265 _Future<T> result = new _Future<T>(); | |
266 if (!identical(result._zone, _ROOT_ZONE)) { | |
267 action = result._zone.registerCallback<dynamic>(action); | |
268 } | |
269 _addListener(new _FutureListener<T, T>.whenComplete(result, action)); | |
270 return result; | |
271 } | |
272 | |
273 Stream<T> asStream() => new Stream<T>.fromFuture(this); | |
274 | |
275 void _setPendingComplete() { | |
276 assert(_mayComplete); | |
277 _state = _PENDING_COMPLETE; | |
278 } | |
279 | |
280 void _clearPendingComplete() { | |
281 assert(_isPendingComplete); | |
282 _state = _INCOMPLETE; | |
283 } | |
284 | |
285 AsyncError get _error { | |
286 assert(_hasError); | |
287 return _resultOrListeners; | |
288 } | |
289 | |
290 _Future get _chainSource { | |
291 assert(_isChained); | |
292 return _resultOrListeners; | |
293 } | |
294 | |
295 // This method is used by async/await. | |
296 void _setValue(T value) { | |
297 assert(!_isComplete); // But may have a completion pending. | |
298 _state = _VALUE; | |
299 _resultOrListeners = value; | |
300 } | |
301 | |
302 void _setErrorObject(AsyncError error) { | |
303 assert(!_isComplete); // But may have a completion pending. | |
304 _state = _ERROR; | |
305 _resultOrListeners = error; | |
306 } | |
307 | |
308 void _setError(Object error, StackTrace stackTrace) { | |
309 _setErrorObject(new AsyncError(error, stackTrace)); | |
310 } | |
311 | |
312 /// Copy the completion result of [source] into this future. | |
313 /// | |
314 /// Used when a chained future notices that its source is completed. | |
315 void _cloneResult(_Future source) { | |
316 assert(!_isComplete); | |
317 assert(source._isComplete); | |
318 _state = source._state; | |
319 _resultOrListeners = source._resultOrListeners; | |
320 } | |
321 | |
322 void _addListener(_FutureListener listener) { | |
323 assert(listener._nextListener == null); | |
324 if (_mayAddListener) { | |
325 listener._nextListener = _resultOrListeners; | |
326 _resultOrListeners = listener; | |
327 } else { | |
328 if (_isChained) { | |
329 // Delegate listeners to chained source future. | |
330 // If the source is complete, instead copy its values and | |
331 // drop the chaining. | |
332 _Future source = _chainSource; | |
333 if (!source._isComplete) { | |
334 source._addListener(listener); | |
335 return; | |
336 } | |
337 _cloneResult(source); | |
338 } | |
339 assert(_isComplete); | |
340 // Handle late listeners asynchronously. | |
341 _zone.scheduleMicrotask(() { | |
342 _propagateToListeners(this, listener); | |
343 }); | |
344 } | |
345 } | |
346 | |
347 void _prependListeners(_FutureListener listeners) { | |
348 if (listeners == null) return; | |
349 if (_mayAddListener) { | |
350 _FutureListener existingListeners = _resultOrListeners; | |
351 _resultOrListeners = listeners; | |
352 if (existingListeners != null) { | |
353 _FutureListener cursor = listeners; | |
354 while (cursor._nextListener != null) { | |
355 cursor = cursor._nextListener; | |
356 } | |
357 cursor._nextListener = existingListeners; | |
358 } | |
359 } else { | |
360 if (_isChained) { | |
361 // Delegate listeners to chained source future. | |
362 // If the source is complete, instead copy its values and | |
363 // drop the chaining. | |
364 _Future source = _chainSource; | |
365 if (!source._isComplete) { | |
366 source._prependListeners(listeners); | |
367 return; | |
368 } | |
369 _cloneResult(source); | |
370 } | |
371 assert(_isComplete); | |
372 listeners = _reverseListeners(listeners); | |
373 _zone.scheduleMicrotask(() { | |
374 _propagateToListeners(this, listeners); | |
375 }); | |
376 } | |
377 } | |
378 | |
379 _FutureListener _removeListeners() { | |
380 // Reverse listeners before returning them, so the resulting list is in | |
381 // subscription order. | |
382 assert(!_isComplete); | |
383 _FutureListener current = _resultOrListeners; | |
384 _resultOrListeners = null; | |
385 return _reverseListeners(current); | |
386 } | |
387 | |
388 _FutureListener _reverseListeners(_FutureListener listeners) { | |
389 _FutureListener prev = null; | |
390 _FutureListener current = listeners; | |
391 while (current != null) { | |
392 _FutureListener next = current._nextListener; | |
393 current._nextListener = prev; | |
394 prev = current; | |
395 current = next; | |
396 } | |
397 return prev; | |
398 } | |
399 | |
400 // Take the value (when completed) of source and complete target with that | |
401 // value (or error). This function could chain all Futures, but is slower | |
402 // for _Future than _chainCoreFuture, so you must use _chainCoreFuture | |
403 // in that case. | |
404 static void _chainForeignFuture(Future source, _Future target) { | |
405 assert(!target._isComplete); | |
406 assert(source is! _Future); | |
407 | |
408 // Mark the target as chained (and as such half-completed). | |
409 target._setPendingComplete(); | |
410 try { | |
411 source.then((value) { | |
412 assert(target._isPendingComplete); | |
413 // The "value" may be another future if the foreign future | |
414 // implementation is mis-behaving, | |
415 // so use _complete instead of _completeWithValue. | |
416 target._clearPendingComplete(); // Clear this first, it's set again. | |
417 target._complete(value); | |
418 }, | |
419 // TODO(floitsch): eventually we would like to make this non-optional | |
420 // and dependent on the listeners of the target future. If none of | |
421 // the target future's listeners want to have the stack trace we don't | |
422 // need a trace. | |
423 onError: (error, [stackTrace]) { | |
424 assert(target._isPendingComplete); | |
425 target._completeError(error, stackTrace); | |
426 }); | |
427 } catch (e, s) { | |
428 // This only happens if the `then` call threw synchronously when given | |
429 // valid arguments. | |
430 // That requires a non-conforming implementation of the Future interface, | |
431 // which should, hopefully, never happen. | |
432 scheduleMicrotask(() { | |
433 target._completeError(e, s); | |
434 }); | |
435 } | |
436 } | |
437 | |
438 // Take the value (when completed) of source and complete target with that | |
439 // value (or error). This function expects that source is a _Future. | |
440 static void _chainCoreFuture(_Future source, _Future target) { | |
441 assert(target._mayAddListener); // Not completed, not already chained. | |
442 while (source._isChained) { | |
443 source = source._chainSource; | |
444 } | |
445 if (source._isComplete) { | |
446 _FutureListener listeners = target._removeListeners(); | |
447 target._cloneResult(source); | |
448 _propagateToListeners(target, listeners); | |
449 } else { | |
450 _FutureListener listeners = target._resultOrListeners; | |
451 target._setChained(source); | |
452 source._prependListeners(listeners); | |
453 } | |
454 } | |
455 | |
456 void _complete(value) { | |
457 assert(!_isComplete); | |
458 if (value is Future) { | |
459 if (value is _Future) { | |
460 _chainCoreFuture(value, this); | |
461 } else { | |
462 _chainForeignFuture(value, this); | |
463 } | |
464 } else { | |
465 _FutureListener listeners = _removeListeners(); | |
466 _setValue(value as Object /*=T*/); | |
467 _propagateToListeners(this, listeners); | |
468 } | |
469 } | |
470 | |
471 void _completeWithValue(T value) { | |
472 assert(!_isComplete); | |
473 assert(value is! Future); | |
474 | |
475 _FutureListener listeners = _removeListeners(); | |
476 _setValue(value); | |
477 _propagateToListeners(this, listeners); | |
478 } | |
479 | |
480 void _completeError(error, [StackTrace stackTrace]) { | |
481 assert(!_isComplete); | |
482 | |
483 _FutureListener listeners = _removeListeners(); | |
484 _setError(error, stackTrace); | |
485 _propagateToListeners(this, listeners); | |
486 } | |
487 | |
488 void _asyncComplete(value) { | |
489 assert(!_isComplete); | |
490 // Two corner cases if the value is a future: | |
491 // 1. the future is already completed and an error. | |
492 // 2. the future is not yet completed but might become an error. | |
493 // The first case means that we must not immediately complete the Future, | |
494 // as our code would immediately start propagating the error without | |
495 // giving the time to install error-handlers. | |
496 // However the second case requires us to deal with the value immediately. | |
497 // Otherwise the value could complete with an error and report an | |
498 // unhandled error, even though we know we are already going to listen to | |
499 // it. | |
500 | |
501 if (value is Future) { | |
502 // Assign to typed variables so we get earlier checks in checked mode. | |
503 Future<T> typedFuture = value as Object /*=Future<T>*/; | |
504 if (typedFuture is _Future) { | |
505 _Future<T> coreFuture = typedFuture; | |
506 if (coreFuture._hasError) { | |
507 // Case 1 from above. Delay completion to enable the user to register | |
508 // callbacks. | |
509 _setPendingComplete(); | |
510 _zone.scheduleMicrotask(() { | |
511 _chainCoreFuture(coreFuture, this); | |
512 }); | |
513 } else { | |
514 _chainCoreFuture(coreFuture, this); | |
515 } | |
516 } else { | |
517 // Case 2 from above. Chain the future immediately. | |
518 // Note that we are still completing asynchronously (through | |
519 // _chainForeignFuture). | |
520 _chainForeignFuture(typedFuture, this); | |
521 } | |
522 return; | |
523 } | |
524 T typedValue = value as Object /*=T*/; | |
525 | |
526 _setPendingComplete(); | |
527 _zone.scheduleMicrotask(() { | |
528 _completeWithValue(typedValue); | |
529 }); | |
530 } | |
531 | |
532 void _asyncCompleteError(error, StackTrace stackTrace) { | |
533 assert(!_isComplete); | |
534 | |
535 _setPendingComplete(); | |
536 _zone.scheduleMicrotask(() { | |
537 _completeError(error, stackTrace); | |
538 }); | |
539 } | |
540 | |
541 /** | |
542 * Propagates the value/error of [source] to its [listeners], executing the | |
543 * listeners' callbacks. | |
544 */ | |
545 static void _propagateToListeners(_Future source, _FutureListener listeners) { | |
546 while (true) { | |
547 assert(source._isComplete); | |
548 bool hasError = source._hasError; | |
549 if (listeners == null) { | |
550 if (hasError) { | |
551 AsyncError asyncError = source._error; | |
552 source._zone.handleUncaughtError( | |
553 asyncError.error, asyncError.stackTrace); | |
554 } | |
555 return; | |
556 } | |
557 // Usually futures only have one listener. If they have several, we | |
558 // call handle them separately in recursive calls, continuing | |
559 // here only when there is only one listener left. | |
560 while (listeners._nextListener != null) { | |
561 _FutureListener listener = listeners; | |
562 listeners = listener._nextListener; | |
563 listener._nextListener = null; | |
564 _propagateToListeners(source, listener); | |
565 } | |
566 _FutureListener listener = listeners; | |
567 final sourceResult = source._resultOrListeners; | |
568 // Do the actual propagation. | |
569 // Set initial state of listenerHasError and listenerValueOrError. These | |
570 // variables are updated with the outcome of potential callbacks. | |
571 // Non-error results, including futures, are stored in | |
572 // listenerValueOrError and listenerHasError is set to false. Errors | |
573 // are stored in listenerValueOrError as an [AsyncError] and | |
574 // listenerHasError is set to true. | |
575 bool listenerHasError = hasError; | |
576 var listenerValueOrError = sourceResult; | |
577 | |
578 // Only if we either have an error or callbacks, go into this, somewhat | |
579 // expensive, branch. Here we'll enter/leave the zone. Many futures | |
580 // don't have callbacks, so this is a significant optimization. | |
581 if (hasError || listener.handlesValue || listener.handlesComplete) { | |
582 Zone zone = listener._zone; | |
583 if (hasError && !source._zone.inSameErrorZone(zone)) { | |
584 // Don't cross zone boundaries with errors. | |
585 AsyncError asyncError = source._error; | |
586 source._zone.handleUncaughtError( | |
587 asyncError.error, asyncError.stackTrace); | |
588 return; | |
589 } | |
590 | |
591 Zone oldZone; | |
592 if (!identical(Zone.current, zone)) { | |
593 // Change zone if it's not current. | |
594 oldZone = Zone._enter(zone); | |
595 } | |
596 | |
597 // These callbacks are abstracted to isolate the try/catch blocks | |
598 // from the rest of the code to work around a V8 glass jaw. | |
599 void handleWhenCompleteCallback() { | |
600 // The whenComplete-handler is not combined with normal value/error | |
601 // handling. This means at most one handleX method is called per | |
602 // listener. | |
603 assert(!listener.handlesValue); | |
604 assert(!listener.handlesError); | |
605 var completeResult; | |
606 try { | |
607 completeResult = listener.handleWhenComplete(); | |
608 } catch (e, s) { | |
609 if (hasError && identical(source._error.error, e)) { | |
610 listenerValueOrError = source._error; | |
611 } else { | |
612 listenerValueOrError = new AsyncError(e, s); | |
613 } | |
614 listenerHasError = true; | |
615 return; | |
616 } | |
617 if (completeResult is Future) { | |
618 if (completeResult is _Future && completeResult._isComplete) { | |
619 if (completeResult._hasError) { | |
620 listenerValueOrError = completeResult._error; | |
621 listenerHasError = true; | |
622 } | |
623 // Otherwise use the existing result of source. | |
624 return; | |
625 } | |
626 // We have to wait for the completeResult future to complete | |
627 // before knowing if it's an error or we should use the result | |
628 // of source. | |
629 var originalSource = source; | |
630 listenerValueOrError = completeResult.then((_) => originalSource); | |
631 listenerHasError = false; | |
632 } | |
633 } | |
634 | |
635 void handleValueCallback() { | |
636 try { | |
637 listenerValueOrError = listener.handleValue(sourceResult); | |
638 } catch (e, s) { | |
639 listenerValueOrError = new AsyncError(e, s); | |
640 listenerHasError = true; | |
641 } | |
642 } | |
643 | |
644 void handleError() { | |
645 try { | |
646 AsyncError asyncError = source._error; | |
647 if (listener.matchesErrorTest(asyncError) && | |
648 listener.hasErrorCallback) { | |
649 listenerValueOrError = listener.handleError(asyncError); | |
650 listenerHasError = false; | |
651 } | |
652 } catch (e, s) { | |
653 if (identical(source._error.error, e)) { | |
654 listenerValueOrError = source._error; | |
655 } else { | |
656 listenerValueOrError = new AsyncError(e, s); | |
657 } | |
658 listenerHasError = true; | |
659 } | |
660 } | |
661 | |
662 | |
663 if (listener.handlesComplete) { | |
664 handleWhenCompleteCallback(); | |
665 } else if (!hasError) { | |
666 if (listener.handlesValue) { | |
667 handleValueCallback(); | |
668 } | |
669 } else { | |
670 if (listener.handlesError) { | |
671 handleError(); | |
672 } | |
673 } | |
674 | |
675 // If we changed zone, oldZone will not be null. | |
676 if (oldZone != null) Zone._leave(oldZone); | |
677 | |
678 // If the listener's value is a future we need to chain it. Note that | |
679 // this can only happen if there is a callback. | |
680 if (listenerValueOrError is Future) { | |
681 Future chainSource = listenerValueOrError; | |
682 // Shortcut if the chain-source is already completed. Just continue | |
683 // the loop. | |
684 _Future result = listener.result; | |
685 if (chainSource is _Future) { | |
686 if (chainSource._isComplete) { | |
687 listeners = result._removeListeners(); | |
688 result._cloneResult(chainSource); | |
689 source = chainSource; | |
690 continue; | |
691 } else { | |
692 _chainCoreFuture(chainSource, result); | |
693 } | |
694 } else { | |
695 _chainForeignFuture(chainSource, result); | |
696 } | |
697 return; | |
698 } | |
699 } | |
700 _Future result = listener.result; | |
701 listeners = result._removeListeners(); | |
702 if (!listenerHasError) { | |
703 result._setValue(listenerValueOrError); | |
704 } else { | |
705 AsyncError asyncError = listenerValueOrError; | |
706 result._setErrorObject(asyncError); | |
707 } | |
708 // Prepare for next round. | |
709 source = result; | |
710 } | |
711 } | |
712 | |
713 Future<T> timeout(Duration timeLimit, {onTimeout()}) { | |
714 if (_isComplete) return new _Future.immediate(this); | |
715 _Future<T> result = new _Future<T>(); | |
716 Timer timer; | |
717 if (onTimeout == null) { | |
718 timer = new Timer(timeLimit, () { | |
719 result._completeError(new TimeoutException("Future not completed", | |
720 timeLimit)); | |
721 }); | |
722 } else { | |
723 Zone zone = Zone.current; | |
724 onTimeout = zone.registerCallback(onTimeout); | |
725 timer = new Timer(timeLimit, () { | |
726 try { | |
727 result._complete(zone.run(onTimeout)); | |
728 } catch (e, s) { | |
729 result._completeError(e, s); | |
730 } | |
731 }); | |
732 } | |
733 this.then((T v) { | |
734 if (timer.isActive) { | |
735 timer.cancel(); | |
736 result._completeWithValue(v); | |
737 } | |
738 }, onError: (e, s) { | |
739 if (timer.isActive) { | |
740 timer.cancel(); | |
741 result._completeError(e, s); | |
742 } | |
743 }); | |
744 return result; | |
745 } | |
746 } | |
OLD | NEW |