| Index: sdk/lib/async/future_impl.dart
|
| diff --git a/sdk/lib/async/future_impl.dart b/sdk/lib/async/future_impl.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..bf071e0e7bc60b50e2d59859d06090f20855463f
|
| --- /dev/null
|
| +++ b/sdk/lib/async/future_impl.dart
|
| @@ -0,0 +1,460 @@
|
| +// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +// part of dart.async;
|
| +
|
| +deprecatedFutureValue(_FutureImpl future) =>
|
| + future._isComplete ? future._resultOrListeners : null;
|
| +
|
| +
|
| +class _CompleterImpl<T> implements Completer<T> {
|
| + final Future<T> future;
|
| + bool _isComplete = false;
|
| +
|
| + _CompleterImpl() : future = new _FutureImpl<T>();
|
| +
|
| + void complete(T value) {
|
| + if (_isComplete) throw new StateError("Future already completed");
|
| + _isComplete = true;
|
| + _FutureImpl future = this.future;
|
| + future._setValue(value);
|
| + }
|
| +
|
| + void completeError(Object error, [Object stackTrace = null]) {
|
| + if (_isComplete) throw new StateError("Future already completed");
|
| + _isComplete = true;
|
| + new Timer(0, (_) {
|
| + // Never complete an error in the same cycle. Otherwise users might
|
| + // not have a chance to register their error-handlers.
|
| + _FutureImpl future = this.future;
|
| + future._setError(new AsyncError(error, stackTrace));
|
| + });
|
| + }
|
| +}
|
| +
|
| +/**
|
| + * A listener on a future.
|
| + *
|
| + * When the future completes, the [_sendValue] or [_sendError] method
|
| + * is invoked with the result.
|
| + *
|
| + * Listeners are kept in a linked list.
|
| + */
|
| +abstract class _FutureListener<T> {
|
| + _FutureListener _nextListener;
|
| + factory _FutureListener.wrap(_FutureImpl future) {
|
| + return new _FutureListenerWrapper(future);
|
| + }
|
| + void _sendValue(T value);
|
| + void _sendError(AsyncError error);
|
| +}
|
| +
|
| +/** Adapter for a [_FutureImpl] to be a future result listener. */
|
| +class _FutureListenerWrapper<T> implements _FutureListener<T> {
|
| + _FutureImpl future;
|
| + _FutureListener _nextListener;
|
| + _FutureListenerWrapper(this.future);
|
| + _sendValue(T value) { future._setValue(value); }
|
| + _sendError(AsyncError error) { future._setError(error); }
|
| +}
|
| +
|
| +class _FutureImpl<T> implements Future<T> {
|
| + static const int _INCOMPLETE = 0;
|
| + static const int _VALUE = 1;
|
| + static const int _ERROR = 2;
|
| +
|
| + /** Whether the future is complete, and as what. */
|
| + int _state = _INCOMPLETE;
|
| +
|
| + bool get _isComplete => _state != _INCOMPLETE;
|
| + bool get _hasValue => _state == _VALUE;
|
| + bool get _hasError => _state == _ERROR;
|
| +
|
| + /**
|
| + * Either the result, or a list of listeners until the future completes.
|
| + *
|
| + * The result of the future is either a value or an [AsyncError].
|
| + * A result is only stored when the future has completed.
|
| + *
|
| + * The listeners is an internally linked list of [_FutureListener]s.
|
| + * Listeners are only remembered while the future is not yet complete.
|
| + *
|
| + * Since the result and the listeners cannot occur at the same time,
|
| + * we can use the same field for both.
|
| + */
|
| + var _resultOrListeners;
|
| +
|
| + _FutureImpl();
|
| +
|
| + _FutureImpl.immediate(T value) {
|
| + _state = _VALUE;
|
| + _resultOrListeners = value;
|
| + }
|
| +
|
| + _FutureImpl.immediateError(var error, [Object stackTrace]) {
|
| + new Timer(0, (_) { _setError(new AsyncError(error, stackTrace)); });
|
| + }
|
| +
|
| + factory _FutureImpl.wait(Iterable<Future> futures) {
|
| + // TODO(ajohnsen): can we do better wrt the generic type T?
|
| + if (futures.isEmpty) {
|
| + return new Future<List>.immediate(const []);
|
| + }
|
| +
|
| + Completer completer = new Completer<List>();
|
| + int remaining = futures.length;
|
| + List values = new List.fixedLength(futures.length);
|
| +
|
| + // As each future completes, put its value into the corresponding
|
| + // position in the list of values.
|
| + int i = 0;
|
| + for (Future future in futures) {
|
| + int pos = i++;
|
| + future.then((Object value) {
|
| + values[pos] = value;
|
| + if (--remaining == 0) {
|
| + completer.complete(values);
|
| + }
|
| + });
|
| + future.catchError((error) {
|
| + completer.completeError(error.error, error.stackTrace);
|
| + });
|
| + }
|
| +
|
| + return completer.future;
|
| + }
|
| +
|
| + Future then(f(T value), { onError(AsyncError error) }) {
|
| + if (!_isComplete) {
|
| + if (onError == null) {
|
| + return new _ThenFuture(f).._subscribeTo(this);
|
| + }
|
| + return new _SubscribeFuture(f, onError).._subscribeTo(this);
|
| + }
|
| + if (_hasError) {
|
| + if (onError != null) {
|
| + return _handleError(onError, null);
|
| + }
|
| + // The "f" funtion will never be called, so just return
|
| + // a future that delegates to this. We don't want to return
|
| + // this itself to give a signal that the future is complete.
|
| + return new _FutureWrapper(this);
|
| + } else {
|
| + assert(_hasValue);
|
| + return _handleValue(f);
|
| + }
|
| + }
|
| +
|
| + Future catchError(f(AsyncError asyncError), { bool test(error) }) {
|
| + if (_hasValue) {
|
| + return new _FutureWrapper(this);
|
| + }
|
| + if (!_isComplete) {
|
| + return new _CatchErrorFuture(f, test).._subscribeTo(this);
|
| + } else {
|
| + return _handleError(f, test);
|
| + }
|
| + }
|
| +
|
| + Future<T> whenComplete(void action()) {
|
| + _WhenFuture<T> whenFuture = new _WhenFuture<T>(action);
|
| + if (!_isComplete) {
|
| + _addListener(whenFuture);
|
| + } else if (_hasValue) {
|
| + new Timer(0, (_) {
|
| + T value = _resultOrListeners;
|
| + whenFuture._sendValue(value);
|
| + });
|
| + } else {
|
| + assert(_hasError);
|
| + new Timer(0, (_) {
|
| + AsyncError error = _resultOrListeners;
|
| + whenFuture._sendError(error);
|
| + });
|
| + }
|
| + return whenFuture;
|
| + }
|
| +
|
| + Future _handleValue(onValue(var value)) {
|
| + assert(_hasValue);
|
| + _ThenFuture thenFuture = new _ThenFuture(onValue);
|
| + T value = _resultOrListeners;
|
| + new Timer(0, (_) { thenFuture._sendValue(value); });
|
| + return thenFuture;
|
| + }
|
| +
|
| + Future _handleError(onError(AsyncError error), bool test(error)) {
|
| + assert(_hasError);
|
| + AsyncError error = _resultOrListeners;
|
| + _CatchErrorFuture errorFuture = new _CatchErrorFuture(onError, test);
|
| + new Timer(0, (_) { errorFuture._sendError(error); });
|
| + return errorFuture;
|
| + }
|
| +
|
| + Stream<T> asStream() => new Stream.fromFuture(this);
|
| +
|
| + void _setValue(T value) {
|
| + if (_state != _INCOMPLETE) throw new StateError("Future already completed");
|
| + _FutureListener listeners = _removeListeners();
|
| + _state = _VALUE;
|
| + _resultOrListeners = value;
|
| + while (listeners != null) {
|
| + _FutureListener listener = listeners;
|
| + listeners = listener._nextListener;
|
| + listener._nextListener = null;
|
| + listener._sendValue(value);
|
| + }
|
| + }
|
| +
|
| + void _setError(AsyncError error) {
|
| + if (_isComplete) throw new StateError("Future already completed");
|
| + _FutureListener listeners = _removeListeners();
|
| + _state = _ERROR;
|
| + _resultOrListeners = error;
|
| + if (listeners == null) {
|
| + error.throwDelayed();
|
| + return;
|
| + }
|
| + while (listeners != null) {
|
| + _FutureListener listener = listeners;
|
| + listeners = listener._nextListener;
|
| + listener._nextListener = null;
|
| + listener._sendError(error);
|
| + }
|
| + }
|
| +
|
| + void _addListener(_FutureListener listener) {
|
| + assert(!_isComplete);
|
| + assert(listener._nextListener == null);
|
| + listener._nextListener = _resultOrListeners;
|
| + _resultOrListeners = listener;
|
| + }
|
| +
|
| + _FutureListener _removeListeners() {
|
| + // Reverse listeners before returning them, so the resulting list is in
|
| + // subscription order.
|
| + assert(!_isComplete);
|
| + _FutureListener current = _resultOrListeners;
|
| + _resultOrListeners = null;
|
| + _FutureListener prev = null;
|
| + while (current != null) {
|
| + _FutureListener next = current._nextListener;
|
| + current._nextListener = prev;
|
| + prev = current;
|
| + current = next;
|
| + }
|
| + return prev;
|
| + }
|
| +
|
| + /**
|
| + * Make another [_FutureImpl] receive the result of this one.
|
| + *
|
| + * If this future is already complete, the [future] is notified
|
| + * immediately. This function is only called during event resolution
|
| + * where it's acceptable to send an event.
|
| + */
|
| + void _chain(_FutureImpl future) {
|
| + if (!_isComplete) {
|
| + _addListener(future._asListener());
|
| + } else if (_hasValue) {
|
| + future._setValue(_resultOrListeners);
|
| + } else {
|
| + assert(_hasError);
|
| + future._setError(_resultOrListeners);
|
| + }
|
| + }
|
| +
|
| + _FutureListener _asListener() => new _FutureListener.wrap(this);
|
| +}
|
| +
|
| +/**
|
| + * Transforming future base class.
|
| + *
|
| + * A transforming future is itself a future and a future listener.
|
| + * Subclasses override [_sendValue]/[_sendError] to intercept
|
| + * the results of a previous future.
|
| + */
|
| +abstract class _TransformFuture<S, T> extends _FutureImpl<T>
|
| + implements _FutureListener<S> {
|
| + // _FutureListener implementation.
|
| + _FutureListener _nextListener;
|
| +
|
| + void _sendValue(S value);
|
| +
|
| + void _sendError(AsyncError error);
|
| +
|
| + void _subscribeTo(_FutureImpl future) {
|
| + future._addListener(this);
|
| + }
|
| +
|
| + /**
|
| + * Helper function to hand the result of transforming an incoming event.
|
| + *
|
| + * If the result is itself a [Future], this future is linked to that
|
| + * future's output. If not, this future is completed with the result.
|
| + */
|
| + void _setOrChainValue(var result) {
|
| + if (result is Future) {
|
| + // Result should be a Future<T>.
|
| + if (result is _FutureImpl) {
|
| + _FutureImpl chainFuture = result;
|
| + chainFuture._chain(this);
|
| + return;
|
| + } else {
|
| + Future future = result;
|
| + future.then(_setValue,
|
| + onError: _setError);
|
| + return;
|
| + }
|
| + } else {
|
| + // Result must be of type T.
|
| + _setValue(result);
|
| + }
|
| + }
|
| +}
|
| +
|
| +/** The onValue and onError handlers return either a value or a future */
|
| +typedef dynamic _FutureOnValue<T>(T value);
|
| +typedef dynamic _FutureOnError(AsyncError error);
|
| +/** Test used by [Future.catchError] to handle skip some errors. */
|
| +typedef bool _FutureErrorTest(var error);
|
| +/** Used by [WhenFuture]. */
|
| +typedef void _FutureAction();
|
| +
|
| +/** Future returned by [Future.then] with no [:onError:] parameter. */
|
| +class _ThenFuture<S, T> extends _TransformFuture<S, T> {
|
| + final _FutureOnValue<S> _onValue;
|
| +
|
| + _ThenFuture(this._onValue);
|
| +
|
| + _sendValue(S value) {
|
| + assert(_onValue != null);
|
| + var result;
|
| + try {
|
| + result = _onValue(value);
|
| + } catch (e, s) {
|
| + _setError(new AsyncError(e, s));
|
| + return;
|
| + }
|
| + _setOrChainValue(result);
|
| + }
|
| +
|
| + void _sendError(AsyncError error) {
|
| + _setError(error);
|
| + }
|
| +}
|
| +
|
| +/** Future returned by [Future.catchError]. */
|
| +class _CatchErrorFuture<T> extends _TransformFuture<T,T> {
|
| + final _FutureErrorTest _test;
|
| + final _FutureOnError _onError;
|
| +
|
| + _CatchErrorFuture(this._onError, this._test);
|
| +
|
| + _sendValue(T value) {
|
| + _setValue(value);
|
| + }
|
| +
|
| + _sendError(AsyncError error) {
|
| + assert(_onError != null);
|
| + // if _test is supplied, check if it returns true, otherwise just
|
| + // forward the error unmodified.
|
| + if (_test != null) {
|
| + bool matchesTest;
|
| + try {
|
| + matchesTest = _test(error.error);
|
| + } catch (e, s) {
|
| + _setError(new AsyncError.withCause(e, s, error));
|
| + return;
|
| + }
|
| + if (!matchesTest) {
|
| + _setError(error);
|
| + return;
|
| + }
|
| + }
|
| + // Act on the error, and use the result as this future's result.
|
| + var result;
|
| + try {
|
| + result = _onError(error);
|
| + } catch (e, s) {
|
| + _setError(new AsyncError.withCause(e, s, error));
|
| + return;
|
| + }
|
| + _setOrChainValue(result);
|
| + }
|
| +}
|
| +
|
| +/** Future returned by [Future.then] with an [:onError:] parameter. */
|
| +class _SubscribeFuture<S, T> extends _ThenFuture<S, T> {
|
| + final _FutureOnError _onError;
|
| +
|
| + _SubscribeFuture(onValue(S value), this._onError) : super(onValue);
|
| +
|
| + // The _sendValue method is inherited from ThenFuture.
|
| +
|
| + void _sendError(AsyncError error) {
|
| + assert(_onError != null);
|
| + var result;
|
| + try {
|
| + result = _onError(error);
|
| + } catch (e, s) {
|
| + _setError(new AsyncError.withCause(e, s, error));
|
| + return;
|
| + }
|
| + _setOrChainValue(result);
|
| + }
|
| +}
|
| +
|
| +/** Future returned by [Future.whenComplete]. */
|
| +class _WhenFuture<T> extends _TransformFuture<T, T> {
|
| + final _FutureAction _action;
|
| +
|
| + _WhenFuture(this._action);
|
| +
|
| + void _sendValue(T value) {
|
| + try {
|
| + _action();
|
| + } catch (e, s) {
|
| + _setError(new AsyncError(e, s));
|
| + return;
|
| + }
|
| + _setValue(value);
|
| + }
|
| +
|
| + void _sendError(AsyncError error) {
|
| + try {
|
| + _action();
|
| + } catch (e, s) {
|
| + error = new AsyncError.withCause(e, s, error);
|
| + }
|
| + _setError(error);
|
| + }
|
| +}
|
| +
|
| +/**
|
| + * Thin wrapper around a [Future].
|
| + *
|
| + * This is used to return a "new" [Future] that effectively work just
|
| + * as an existing [Future], without making this discoverable by comparing
|
| + * identities.
|
| + */
|
| +class _FutureWrapper<T> implements Future<T> {
|
| + final Future<T> _future;
|
| +
|
| + _FutureWrapper(this._future);
|
| +
|
| + Future then(function(T value), { onError(AsyncError error) }) {
|
| + return _future.then(function, onError: onError);
|
| + }
|
| +
|
| + Future catchError(function(AsyncError error), {bool test(var error)}) {
|
| + return _future.catchError(function, test: test);
|
| + }
|
| +
|
| + Future whenComplete(void action()) {
|
| + return _future.whenComplete(action);
|
| + }
|
| +
|
| + Stream<T> asStream() => new Stream.fromFuture(this);
|
| +}
|
|
|