Index: sdk/lib/async/future_impl.dart |
diff --git a/sdk/lib/async/future_impl.dart b/sdk/lib/async/future_impl.dart |
index f27b34dce6c77990bee147b542f7e172ba205870..68dbbc041e7f49408f1edec2aa481a3cf0805d93 100644 |
--- a/sdk/lib/async/future_impl.dart |
+++ b/sdk/lib/async/future_impl.dart |
@@ -95,11 +95,6 @@ class _FutureListener { |
errorCallback = null, |
state = STATE_WHENCOMPLETE; |
- _FutureListener.chain(this.result) |
- : callback = null, |
- errorCallback = null, |
- state = STATE_CHAIN; |
- |
Zone get _zone => result._zone; |
bool get handlesValue => (state & MASK_VALUE != 0); |
@@ -133,8 +128,7 @@ class _Future<T> implements Future<T> { |
static const int _PENDING_COMPLETE = 1; |
/// The future has been chained to another future. The result of that |
/// other future becomes the result of this future as well. |
- // TODO(floitsch): we don't really need a special "_CHAINED" state. We could |
- // just use the PENDING_COMPLETE state instead. |
+ /// [resultOrListeners] contains the source future. |
static const int _CHAINED = 2; |
/// The future has been completed with a value result. |
static const int _VALUE = 4; |
@@ -167,11 +161,6 @@ class _Future<T> implements Future<T> { |
* is waiting for the other future to complete, and when it does, this future |
* will complete with the same result. |
* All listeners are forwarded to the other future. |
- * |
- * The cases are disjoint - incomplete and unchained ([_INCOMPLETE]), |
- * incomplete and chained ([_CHAINED]), or completed with value or error |
- * ([_VALUE] or [_ERROR]) - so the field only needs to hold |
- * one value at a time. |
*/ |
var _resultOrListeners; |
@@ -189,14 +178,15 @@ class _Future<T> implements Future<T> { |
bool get _mayComplete => _state == _INCOMPLETE; |
bool get _isPendingComplete => _state == _PENDING_COMPLETE; |
+ bool get _mayAddListener => _state <= _PENDING_COMPLETE; |
bool get _isChained => _state == _CHAINED; |
bool get _isComplete => _state >= _VALUE; |
- bool get _hasValue => _state == _VALUE; |
bool get _hasError => _state == _ERROR; |
- void _setChained() { |
- assert(!_isComplete); |
+ void _setChained(_Future source) { |
+ assert(_mayAddListener); |
_state = _CHAINED; |
+ _resultOrListeners = source; |
} |
Future then(f(T value), { Function onError }) { |
@@ -238,18 +228,18 @@ class _Future<T> implements Future<T> { |
Stream<T> asStream() => new Stream<T>.fromFuture(this); |
- void _markPendingCompletion() { |
- if (!_mayComplete) throw new StateError("Future already completed"); |
+ void _setPendingComplete() { |
+ assert(_mayComplete); |
_state = _PENDING_COMPLETE; |
} |
- T get _value { |
- assert(_isComplete && _hasValue); |
- return _resultOrListeners; |
- } |
- |
AsyncError get _error { |
- assert(_isComplete && _hasError); |
+ assert(_hasError); |
+ return _resultOrListeners; |
+ } |
+ |
+ _Future get _chainSource { |
+ assert(_isChained); |
return _resultOrListeners; |
} |
@@ -270,16 +260,70 @@ class _Future<T> implements Future<T> { |
_setErrorObject(new AsyncError(error, stackTrace)); |
} |
+ /// Copy the completion result of [source] into this future. |
+ /// |
+ /// Used when a chained future notices that its source is completed. |
+ void _cloneResult(_Future source) { |
+ assert(!_isComplete); |
+ assert(source._isComplete); |
+ _state = source._state; |
+ _resultOrListeners = source._resultOrListeners; |
+ } |
+ |
void _addListener(_FutureListener listener) { |
assert(listener._nextListener == null); |
- if (_isComplete) { |
- // Handle late listeners asynchronously. |
- _zone.scheduleMicrotask(() { |
- _propagateToListeners(this, listener); |
- }); |
- } else { |
+ if (_mayAddListener) { |
listener._nextListener = _resultOrListeners; |
_resultOrListeners = listener; |
+ } else { |
+ if (_isChained) { |
+ // Delegate listeners to chained source future. |
+ // If the source is complete, instead copy its values and |
+ // drop the chaining. |
+ _Future source = _chainSource; |
+ if (!source._isComplete) { |
+ source._addListener(listener); |
+ return; |
+ } |
+ _cloneResult(source); |
+ } |
+ assert(_isComplete); |
+ // Handle late listeners asynchronously. |
+ _zone.scheduleMicrotask(() { |
+ _propagateToListeners(this, listener); |
+ }); |
+ } |
+ } |
+ |
+ void _prependListeners(_FutureListener listeners) { |
+ if (listeners == null) return; |
+ if (_mayAddListener) { |
+ _FutureListener existingListeners = _resultOrListeners; |
+ _resultOrListeners = listeners; |
+ if (existingListeners != null) { |
+ _FutureListener cursor = listeners; |
+ while (cursor._nextListener != null) { |
+ cursor = cursor._nextListener; |
+ } |
+ cursor._nextListener = existingListeners; |
+ } |
+ } else { |
+ if (_isChained) { |
+ // Delegate listeners to chained source future. |
+ // If the source is complete, instead copy its values and |
+ // drop the chaining. |
+ _Future source = _chainSource; |
+ if (!source._isComplete) { |
+ source._prependListeners(listeners); |
+ return; |
+ } |
+ _cloneResult(source); |
+ } |
+ assert(_isComplete); |
+ listeners = _reverseListeners(listeners); |
+ _zone.scheduleMicrotask(() { |
+ _propagateToListeners(this, listeners); |
+ }); |
} |
} |
@@ -289,7 +333,12 @@ class _Future<T> implements Future<T> { |
assert(!_isComplete); |
_FutureListener current = _resultOrListeners; |
_resultOrListeners = null; |
+ return _reverseListeners(current); |
+ } |
+ |
+ _FutureListener _reverseListeners(_FutureListener listeners) { |
_FutureListener prev = null; |
+ _FutureListener current = listeners; |
while (current != null) { |
_FutureListener next = current._nextListener; |
current._nextListener = prev; |
@@ -308,10 +357,10 @@ class _Future<T> implements Future<T> { |
assert(source is! _Future); |
// Mark the target as chained (and as such half-completed). |
- target._setChained(); |
+ target._setPendingComplete(); |
try { |
source.then((value) { |
- assert(target._isChained); |
+ assert(target._isPendingComplete); |
target._completeWithValue(value); |
}, |
// TODO(floitsch): eventually we would like to make this non-optional |
@@ -319,7 +368,7 @@ class _Future<T> implements Future<T> { |
// the target future's listeners want to have the stack trace we don't |
// need a trace. |
onError: (error, [stackTrace]) { |
- assert(target._isChained); |
+ assert(target._isPendingComplete); |
target._completeError(error, stackTrace); |
}); |
} catch (e, s) { |
@@ -336,16 +385,18 @@ class _Future<T> implements Future<T> { |
// Take the value (when completed) of source and complete target with that |
// value (or error). This function expects that source is a _Future. |
static void _chainCoreFuture(_Future source, _Future target) { |
- assert(!target._isComplete); |
- assert(source is _Future); |
- |
- // Mark the target as chained (and as such half-completed). |
- target._setChained(); |
- _FutureListener listener = new _FutureListener.chain(target); |
+ assert(target._mayAddListener); // Not completed, not already chained. |
+ while (source._isChained) { |
+ source = source._chainSource; |
+ } |
if (source._isComplete) { |
- _propagateToListeners(source, listener); |
+ _FutureListener listeners = target._removeListeners(); |
+ target._cloneResult(source); |
+ _propagateToListeners(target, listeners); |
} else { |
- source._addListener(listener); |
+ _FutureListener listeners = target._resultOrListeners; |
+ target._setChained(source); |
+ source._prependListeners(listeners); |
} |
} |
@@ -404,7 +455,7 @@ class _Future<T> implements Future<T> { |
if (coreFuture._hasError) { |
// Case 1 from above. Delay completion to enable the user to register |
// callbacks. |
- _markPendingCompletion(); |
+ _setPendingComplete(); |
_zone.scheduleMicrotask(() { |
_chainCoreFuture(coreFuture, this); |
}); |
@@ -420,9 +471,10 @@ class _Future<T> implements Future<T> { |
return; |
} else { |
T typedValue = value; |
+ assert(typedValue is T); // Avoid warning that typedValue is unused. |
} |
- _markPendingCompletion(); |
+ _setPendingComplete(); |
_zone.scheduleMicrotask(() { |
_completeWithValue(value); |
}); |
@@ -431,7 +483,7 @@ class _Future<T> implements Future<T> { |
void _asyncCompleteError(error, StackTrace stackTrace) { |
assert(!_isComplete); |
- _markPendingCompletion(); |
+ _setPendingComplete(); |
_zone.scheduleMicrotask(() { |
_completeError(error, stackTrace); |
}); |
@@ -463,11 +515,15 @@ class _Future<T> implements Future<T> { |
_propagateToListeners(source, listener); |
} |
_FutureListener listener = listeners; |
+ final sourceResult = source._resultOrListeners; |
// Do the actual propagation. |
// Set initial state of listenerHasError and listenerValueOrError. These |
// variables are updated with the outcome of potential callbacks. |
+ // Non-error results, including futures, are stored in |
+ // listenerValueOrError and listenerHasError is set to false. Errors |
+ // are stored in listenerValueOrError as an [AsyncError] and |
+ // listenerHasError is set to true. |
bool listenerHasError = hasError; |
- final sourceResult = source._resultOrListeners; |
var listenerValueOrError = sourceResult; |
// Only if we either have an error or callbacks, go into this, somewhat |
@@ -598,10 +654,9 @@ class _Future<T> implements Future<T> { |
_Future result = listener.result; |
if (chainSource is _Future) { |
if (chainSource._isComplete) { |
- // propagate the value (simulating a tail call). |
- result._setChained(); |
+ listeners = result._removeListeners(); |
+ result._cloneResult(chainSource); |
source = chainSource; |
- listeners = new _FutureListener.chain(result); |
continue; |
} else { |
_chainCoreFuture(chainSource, result); |