Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(167)

Unified Diff: sdk/lib/async/future_impl.dart

Issue 11783009: Big merge from experimental to bleeding edge. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/future.dart ('k') | sdk/lib/async/merge_stream.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/future_impl.dart
diff --git a/sdk/lib/async/future_impl.dart b/sdk/lib/async/future_impl.dart
new file mode 100644
index 0000000000000000000000000000000000000000..bf071e0e7bc60b50e2d59859d06090f20855463f
--- /dev/null
+++ b/sdk/lib/async/future_impl.dart
@@ -0,0 +1,460 @@
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+// part of dart.async;
+
+deprecatedFutureValue(_FutureImpl future) =>
+ future._isComplete ? future._resultOrListeners : null;
+
+
+class _CompleterImpl<T> implements Completer<T> {
+ final Future<T> future;
+ bool _isComplete = false;
+
+ _CompleterImpl() : future = new _FutureImpl<T>();
+
+ void complete(T value) {
+ if (_isComplete) throw new StateError("Future already completed");
+ _isComplete = true;
+ _FutureImpl future = this.future;
+ future._setValue(value);
+ }
+
+ void completeError(Object error, [Object stackTrace = null]) {
+ if (_isComplete) throw new StateError("Future already completed");
+ _isComplete = true;
+ new Timer(0, (_) {
+ // Never complete an error in the same cycle. Otherwise users might
+ // not have a chance to register their error-handlers.
+ _FutureImpl future = this.future;
+ future._setError(new AsyncError(error, stackTrace));
+ });
+ }
+}
+
+/**
+ * A listener on a future.
+ *
+ * When the future completes, the [_sendValue] or [_sendError] method
+ * is invoked with the result.
+ *
+ * Listeners are kept in a linked list.
+ */
+abstract class _FutureListener<T> {
+ _FutureListener _nextListener;
+ factory _FutureListener.wrap(_FutureImpl future) {
+ return new _FutureListenerWrapper(future);
+ }
+ void _sendValue(T value);
+ void _sendError(AsyncError error);
+}
+
+/** Adapter for a [_FutureImpl] to be a future result listener. */
+class _FutureListenerWrapper<T> implements _FutureListener<T> {
+ _FutureImpl future;
+ _FutureListener _nextListener;
+ _FutureListenerWrapper(this.future);
+ _sendValue(T value) { future._setValue(value); }
+ _sendError(AsyncError error) { future._setError(error); }
+}
+
+class _FutureImpl<T> implements Future<T> {
+ static const int _INCOMPLETE = 0;
+ static const int _VALUE = 1;
+ static const int _ERROR = 2;
+
+ /** Whether the future is complete, and as what. */
+ int _state = _INCOMPLETE;
+
+ bool get _isComplete => _state != _INCOMPLETE;
+ bool get _hasValue => _state == _VALUE;
+ bool get _hasError => _state == _ERROR;
+
+ /**
+ * Either the result, or a list of listeners until the future completes.
+ *
+ * The result of the future is either a value or an [AsyncError].
+ * A result is only stored when the future has completed.
+ *
+ * The listeners is an internally linked list of [_FutureListener]s.
+ * Listeners are only remembered while the future is not yet complete.
+ *
+ * Since the result and the listeners cannot occur at the same time,
+ * we can use the same field for both.
+ */
+ var _resultOrListeners;
+
+ _FutureImpl();
+
+ _FutureImpl.immediate(T value) {
+ _state = _VALUE;
+ _resultOrListeners = value;
+ }
+
+ _FutureImpl.immediateError(var error, [Object stackTrace]) {
+ new Timer(0, (_) { _setError(new AsyncError(error, stackTrace)); });
+ }
+
+ factory _FutureImpl.wait(Iterable<Future> futures) {
+ // TODO(ajohnsen): can we do better wrt the generic type T?
+ if (futures.isEmpty) {
+ return new Future<List>.immediate(const []);
+ }
+
+ Completer completer = new Completer<List>();
+ int remaining = futures.length;
+ List values = new List.fixedLength(futures.length);
+
+ // As each future completes, put its value into the corresponding
+ // position in the list of values.
+ int i = 0;
+ for (Future future in futures) {
+ int pos = i++;
+ future.then((Object value) {
+ values[pos] = value;
+ if (--remaining == 0) {
+ completer.complete(values);
+ }
+ });
+ future.catchError((error) {
+ completer.completeError(error.error, error.stackTrace);
+ });
+ }
+
+ return completer.future;
+ }
+
+ Future then(f(T value), { onError(AsyncError error) }) {
+ if (!_isComplete) {
+ if (onError == null) {
+ return new _ThenFuture(f).._subscribeTo(this);
+ }
+ return new _SubscribeFuture(f, onError).._subscribeTo(this);
+ }
+ if (_hasError) {
+ if (onError != null) {
+ return _handleError(onError, null);
+ }
+ // The "f" funtion will never be called, so just return
+ // a future that delegates to this. We don't want to return
+ // this itself to give a signal that the future is complete.
+ return new _FutureWrapper(this);
+ } else {
+ assert(_hasValue);
+ return _handleValue(f);
+ }
+ }
+
+ Future catchError(f(AsyncError asyncError), { bool test(error) }) {
+ if (_hasValue) {
+ return new _FutureWrapper(this);
+ }
+ if (!_isComplete) {
+ return new _CatchErrorFuture(f, test).._subscribeTo(this);
+ } else {
+ return _handleError(f, test);
+ }
+ }
+
+ Future<T> whenComplete(void action()) {
+ _WhenFuture<T> whenFuture = new _WhenFuture<T>(action);
+ if (!_isComplete) {
+ _addListener(whenFuture);
+ } else if (_hasValue) {
+ new Timer(0, (_) {
+ T value = _resultOrListeners;
+ whenFuture._sendValue(value);
+ });
+ } else {
+ assert(_hasError);
+ new Timer(0, (_) {
+ AsyncError error = _resultOrListeners;
+ whenFuture._sendError(error);
+ });
+ }
+ return whenFuture;
+ }
+
+ Future _handleValue(onValue(var value)) {
+ assert(_hasValue);
+ _ThenFuture thenFuture = new _ThenFuture(onValue);
+ T value = _resultOrListeners;
+ new Timer(0, (_) { thenFuture._sendValue(value); });
+ return thenFuture;
+ }
+
+ Future _handleError(onError(AsyncError error), bool test(error)) {
+ assert(_hasError);
+ AsyncError error = _resultOrListeners;
+ _CatchErrorFuture errorFuture = new _CatchErrorFuture(onError, test);
+ new Timer(0, (_) { errorFuture._sendError(error); });
+ return errorFuture;
+ }
+
+ Stream<T> asStream() => new Stream.fromFuture(this);
+
+ void _setValue(T value) {
+ if (_state != _INCOMPLETE) throw new StateError("Future already completed");
+ _FutureListener listeners = _removeListeners();
+ _state = _VALUE;
+ _resultOrListeners = value;
+ while (listeners != null) {
+ _FutureListener listener = listeners;
+ listeners = listener._nextListener;
+ listener._nextListener = null;
+ listener._sendValue(value);
+ }
+ }
+
+ void _setError(AsyncError error) {
+ if (_isComplete) throw new StateError("Future already completed");
+ _FutureListener listeners = _removeListeners();
+ _state = _ERROR;
+ _resultOrListeners = error;
+ if (listeners == null) {
+ error.throwDelayed();
+ return;
+ }
+ while (listeners != null) {
+ _FutureListener listener = listeners;
+ listeners = listener._nextListener;
+ listener._nextListener = null;
+ listener._sendError(error);
+ }
+ }
+
+ void _addListener(_FutureListener listener) {
+ assert(!_isComplete);
+ assert(listener._nextListener == null);
+ listener._nextListener = _resultOrListeners;
+ _resultOrListeners = listener;
+ }
+
+ _FutureListener _removeListeners() {
+ // Reverse listeners before returning them, so the resulting list is in
+ // subscription order.
+ assert(!_isComplete);
+ _FutureListener current = _resultOrListeners;
+ _resultOrListeners = null;
+ _FutureListener prev = null;
+ while (current != null) {
+ _FutureListener next = current._nextListener;
+ current._nextListener = prev;
+ prev = current;
+ current = next;
+ }
+ return prev;
+ }
+
+ /**
+ * Make another [_FutureImpl] receive the result of this one.
+ *
+ * If this future is already complete, the [future] is notified
+ * immediately. This function is only called during event resolution
+ * where it's acceptable to send an event.
+ */
+ void _chain(_FutureImpl future) {
+ if (!_isComplete) {
+ _addListener(future._asListener());
+ } else if (_hasValue) {
+ future._setValue(_resultOrListeners);
+ } else {
+ assert(_hasError);
+ future._setError(_resultOrListeners);
+ }
+ }
+
+ _FutureListener _asListener() => new _FutureListener.wrap(this);
+}
+
+/**
+ * Transforming future base class.
+ *
+ * A transforming future is itself a future and a future listener.
+ * Subclasses override [_sendValue]/[_sendError] to intercept
+ * the results of a previous future.
+ */
+abstract class _TransformFuture<S, T> extends _FutureImpl<T>
+ implements _FutureListener<S> {
+ // _FutureListener implementation.
+ _FutureListener _nextListener;
+
+ void _sendValue(S value);
+
+ void _sendError(AsyncError error);
+
+ void _subscribeTo(_FutureImpl future) {
+ future._addListener(this);
+ }
+
+ /**
+ * Helper function to hand the result of transforming an incoming event.
+ *
+ * If the result is itself a [Future], this future is linked to that
+ * future's output. If not, this future is completed with the result.
+ */
+ void _setOrChainValue(var result) {
+ if (result is Future) {
+ // Result should be a Future<T>.
+ if (result is _FutureImpl) {
+ _FutureImpl chainFuture = result;
+ chainFuture._chain(this);
+ return;
+ } else {
+ Future future = result;
+ future.then(_setValue,
+ onError: _setError);
+ return;
+ }
+ } else {
+ // Result must be of type T.
+ _setValue(result);
+ }
+ }
+}
+
+/** The onValue and onError handlers return either a value or a future */
+typedef dynamic _FutureOnValue<T>(T value);
+typedef dynamic _FutureOnError(AsyncError error);
+/** Test used by [Future.catchError] to handle skip some errors. */
+typedef bool _FutureErrorTest(var error);
+/** Used by [WhenFuture]. */
+typedef void _FutureAction();
+
+/** Future returned by [Future.then] with no [:onError:] parameter. */
+class _ThenFuture<S, T> extends _TransformFuture<S, T> {
+ final _FutureOnValue<S> _onValue;
+
+ _ThenFuture(this._onValue);
+
+ _sendValue(S value) {
+ assert(_onValue != null);
+ var result;
+ try {
+ result = _onValue(value);
+ } catch (e, s) {
+ _setError(new AsyncError(e, s));
+ return;
+ }
+ _setOrChainValue(result);
+ }
+
+ void _sendError(AsyncError error) {
+ _setError(error);
+ }
+}
+
+/** Future returned by [Future.catchError]. */
+class _CatchErrorFuture<T> extends _TransformFuture<T,T> {
+ final _FutureErrorTest _test;
+ final _FutureOnError _onError;
+
+ _CatchErrorFuture(this._onError, this._test);
+
+ _sendValue(T value) {
+ _setValue(value);
+ }
+
+ _sendError(AsyncError error) {
+ assert(_onError != null);
+ // if _test is supplied, check if it returns true, otherwise just
+ // forward the error unmodified.
+ if (_test != null) {
+ bool matchesTest;
+ try {
+ matchesTest = _test(error.error);
+ } catch (e, s) {
+ _setError(new AsyncError.withCause(e, s, error));
+ return;
+ }
+ if (!matchesTest) {
+ _setError(error);
+ return;
+ }
+ }
+ // Act on the error, and use the result as this future's result.
+ var result;
+ try {
+ result = _onError(error);
+ } catch (e, s) {
+ _setError(new AsyncError.withCause(e, s, error));
+ return;
+ }
+ _setOrChainValue(result);
+ }
+}
+
+/** Future returned by [Future.then] with an [:onError:] parameter. */
+class _SubscribeFuture<S, T> extends _ThenFuture<S, T> {
+ final _FutureOnError _onError;
+
+ _SubscribeFuture(onValue(S value), this._onError) : super(onValue);
+
+ // The _sendValue method is inherited from ThenFuture.
+
+ void _sendError(AsyncError error) {
+ assert(_onError != null);
+ var result;
+ try {
+ result = _onError(error);
+ } catch (e, s) {
+ _setError(new AsyncError.withCause(e, s, error));
+ return;
+ }
+ _setOrChainValue(result);
+ }
+}
+
+/** Future returned by [Future.whenComplete]. */
+class _WhenFuture<T> extends _TransformFuture<T, T> {
+ final _FutureAction _action;
+
+ _WhenFuture(this._action);
+
+ void _sendValue(T value) {
+ try {
+ _action();
+ } catch (e, s) {
+ _setError(new AsyncError(e, s));
+ return;
+ }
+ _setValue(value);
+ }
+
+ void _sendError(AsyncError error) {
+ try {
+ _action();
+ } catch (e, s) {
+ error = new AsyncError.withCause(e, s, error);
+ }
+ _setError(error);
+ }
+}
+
+/**
+ * Thin wrapper around a [Future].
+ *
+ * This is used to return a "new" [Future] that effectively work just
+ * as an existing [Future], without making this discoverable by comparing
+ * identities.
+ */
+class _FutureWrapper<T> implements Future<T> {
+ final Future<T> _future;
+
+ _FutureWrapper(this._future);
+
+ Future then(function(T value), { onError(AsyncError error) }) {
+ return _future.then(function, onError: onError);
+ }
+
+ Future catchError(function(AsyncError error), {bool test(var error)}) {
+ return _future.catchError(function, test: test);
+ }
+
+ Future whenComplete(void action()) {
+ return _future.whenComplete(action);
+ }
+
+ Stream<T> asStream() => new Stream.fromFuture(this);
+}
« no previous file with comments | « sdk/lib/async/future.dart ('k') | sdk/lib/async/merge_stream.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698