OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 // part of dart.async; |
| 6 |
| 7 deprecatedFutureValue(_FutureImpl future) => |
| 8 future._isComplete ? future._resultOrListeners : null; |
| 9 |
| 10 |
| 11 class _CompleterImpl<T> implements Completer<T> { |
| 12 final Future<T> future; |
| 13 bool _isComplete = false; |
| 14 |
| 15 _CompleterImpl() : future = new _FutureImpl<T>(); |
| 16 |
| 17 void complete(T value) { |
| 18 if (_isComplete) throw new StateError("Future already completed"); |
| 19 _isComplete = true; |
| 20 _FutureImpl future = this.future; |
| 21 future._setValue(value); |
| 22 } |
| 23 |
| 24 void completeError(Object error, [Object stackTrace = null]) { |
| 25 if (_isComplete) throw new StateError("Future already completed"); |
| 26 _isComplete = true; |
| 27 new Timer(0, (_) { |
| 28 // Never complete an error in the same cycle. Otherwise users might |
| 29 // not have a chance to register their error-handlers. |
| 30 _FutureImpl future = this.future; |
| 31 future._setError(new AsyncError(error, stackTrace)); |
| 32 }); |
| 33 } |
| 34 } |
| 35 |
| 36 /** |
| 37 * A listener on a future. |
| 38 * |
| 39 * When the future completes, the [_sendValue] or [_sendError] method |
| 40 * is invoked with the result. |
| 41 * |
| 42 * Listeners are kept in a linked list. |
| 43 */ |
| 44 abstract class _FutureListener<T> { |
| 45 _FutureListener _nextListener; |
| 46 factory _FutureListener.wrap(_FutureImpl future) { |
| 47 return new _FutureListenerWrapper(future); |
| 48 } |
| 49 void _sendValue(T value); |
| 50 void _sendError(AsyncError error); |
| 51 } |
| 52 |
| 53 /** Adapter for a [_FutureImpl] to be a future result listener. */ |
| 54 class _FutureListenerWrapper<T> implements _FutureListener<T> { |
| 55 _FutureImpl future; |
| 56 _FutureListener _nextListener; |
| 57 _FutureListenerWrapper(this.future); |
| 58 _sendValue(T value) { future._setValue(value); } |
| 59 _sendError(AsyncError error) { future._setError(error); } |
| 60 } |
| 61 |
| 62 class _FutureImpl<T> implements Future<T> { |
| 63 static const int _INCOMPLETE = 0; |
| 64 static const int _VALUE = 1; |
| 65 static const int _ERROR = 2; |
| 66 |
| 67 /** Whether the future is complete, and as what. */ |
| 68 int _state = _INCOMPLETE; |
| 69 |
| 70 bool get _isComplete => _state != _INCOMPLETE; |
| 71 bool get _hasValue => _state == _VALUE; |
| 72 bool get _hasError => _state == _ERROR; |
| 73 |
| 74 /** |
| 75 * Either the result, or a list of listeners until the future completes. |
| 76 * |
| 77 * The result of the future is either a value or an [AsyncError]. |
| 78 * A result is only stored when the future has completed. |
| 79 * |
| 80 * The listeners is an internally linked list of [_FutureListener]s. |
| 81 * Listeners are only remembered while the future is not yet complete. |
| 82 * |
| 83 * Since the result and the listeners cannot occur at the same time, |
| 84 * we can use the same field for both. |
| 85 */ |
| 86 var _resultOrListeners; |
| 87 |
| 88 _FutureImpl(); |
| 89 |
| 90 _FutureImpl.immediate(T value) { |
| 91 _state = _VALUE; |
| 92 _resultOrListeners = value; |
| 93 } |
| 94 |
| 95 _FutureImpl.immediateError(var error, [Object stackTrace]) { |
| 96 new Timer(0, (_) { _setError(new AsyncError(error, stackTrace)); }); |
| 97 } |
| 98 |
| 99 factory _FutureImpl.wait(Iterable<Future> futures) { |
| 100 // TODO(ajohnsen): can we do better wrt the generic type T? |
| 101 if (futures.isEmpty) { |
| 102 return new Future<List>.immediate(const []); |
| 103 } |
| 104 |
| 105 Completer completer = new Completer<List>(); |
| 106 int remaining = futures.length; |
| 107 List values = new List.fixedLength(futures.length); |
| 108 |
| 109 // As each future completes, put its value into the corresponding |
| 110 // position in the list of values. |
| 111 int i = 0; |
| 112 for (Future future in futures) { |
| 113 int pos = i++; |
| 114 future.then((Object value) { |
| 115 values[pos] = value; |
| 116 if (--remaining == 0) { |
| 117 completer.complete(values); |
| 118 } |
| 119 }); |
| 120 future.catchError((error) { |
| 121 completer.completeError(error.error, error.stackTrace); |
| 122 }); |
| 123 } |
| 124 |
| 125 return completer.future; |
| 126 } |
| 127 |
| 128 Future then(f(T value), { onError(AsyncError error) }) { |
| 129 if (!_isComplete) { |
| 130 if (onError == null) { |
| 131 return new _ThenFuture(f).._subscribeTo(this); |
| 132 } |
| 133 return new _SubscribeFuture(f, onError).._subscribeTo(this); |
| 134 } |
| 135 if (_hasError) { |
| 136 if (onError != null) { |
| 137 return _handleError(onError, null); |
| 138 } |
| 139 // The "f" funtion will never be called, so just return |
| 140 // a future that delegates to this. We don't want to return |
| 141 // this itself to give a signal that the future is complete. |
| 142 return new _FutureWrapper(this); |
| 143 } else { |
| 144 assert(_hasValue); |
| 145 return _handleValue(f); |
| 146 } |
| 147 } |
| 148 |
| 149 Future catchError(f(AsyncError asyncError), { bool test(error) }) { |
| 150 if (_hasValue) { |
| 151 return new _FutureWrapper(this); |
| 152 } |
| 153 if (!_isComplete) { |
| 154 return new _CatchErrorFuture(f, test).._subscribeTo(this); |
| 155 } else { |
| 156 return _handleError(f, test); |
| 157 } |
| 158 } |
| 159 |
| 160 Future<T> whenComplete(void action()) { |
| 161 _WhenFuture<T> whenFuture = new _WhenFuture<T>(action); |
| 162 if (!_isComplete) { |
| 163 _addListener(whenFuture); |
| 164 } else if (_hasValue) { |
| 165 new Timer(0, (_) { |
| 166 T value = _resultOrListeners; |
| 167 whenFuture._sendValue(value); |
| 168 }); |
| 169 } else { |
| 170 assert(_hasError); |
| 171 new Timer(0, (_) { |
| 172 AsyncError error = _resultOrListeners; |
| 173 whenFuture._sendError(error); |
| 174 }); |
| 175 } |
| 176 return whenFuture; |
| 177 } |
| 178 |
| 179 Future _handleValue(onValue(var value)) { |
| 180 assert(_hasValue); |
| 181 _ThenFuture thenFuture = new _ThenFuture(onValue); |
| 182 T value = _resultOrListeners; |
| 183 new Timer(0, (_) { thenFuture._sendValue(value); }); |
| 184 return thenFuture; |
| 185 } |
| 186 |
| 187 Future _handleError(onError(AsyncError error), bool test(error)) { |
| 188 assert(_hasError); |
| 189 AsyncError error = _resultOrListeners; |
| 190 _CatchErrorFuture errorFuture = new _CatchErrorFuture(onError, test); |
| 191 new Timer(0, (_) { errorFuture._sendError(error); }); |
| 192 return errorFuture; |
| 193 } |
| 194 |
| 195 Stream<T> asStream() => new Stream.fromFuture(this); |
| 196 |
| 197 void _setValue(T value) { |
| 198 if (_state != _INCOMPLETE) throw new StateError("Future already completed"); |
| 199 _FutureListener listeners = _removeListeners(); |
| 200 _state = _VALUE; |
| 201 _resultOrListeners = value; |
| 202 while (listeners != null) { |
| 203 _FutureListener listener = listeners; |
| 204 listeners = listener._nextListener; |
| 205 listener._nextListener = null; |
| 206 listener._sendValue(value); |
| 207 } |
| 208 } |
| 209 |
| 210 void _setError(AsyncError error) { |
| 211 if (_isComplete) throw new StateError("Future already completed"); |
| 212 _FutureListener listeners = _removeListeners(); |
| 213 _state = _ERROR; |
| 214 _resultOrListeners = error; |
| 215 if (listeners == null) { |
| 216 error.throwDelayed(); |
| 217 return; |
| 218 } |
| 219 while (listeners != null) { |
| 220 _FutureListener listener = listeners; |
| 221 listeners = listener._nextListener; |
| 222 listener._nextListener = null; |
| 223 listener._sendError(error); |
| 224 } |
| 225 } |
| 226 |
| 227 void _addListener(_FutureListener listener) { |
| 228 assert(!_isComplete); |
| 229 assert(listener._nextListener == null); |
| 230 listener._nextListener = _resultOrListeners; |
| 231 _resultOrListeners = listener; |
| 232 } |
| 233 |
| 234 _FutureListener _removeListeners() { |
| 235 // Reverse listeners before returning them, so the resulting list is in |
| 236 // subscription order. |
| 237 assert(!_isComplete); |
| 238 _FutureListener current = _resultOrListeners; |
| 239 _resultOrListeners = null; |
| 240 _FutureListener prev = null; |
| 241 while (current != null) { |
| 242 _FutureListener next = current._nextListener; |
| 243 current._nextListener = prev; |
| 244 prev = current; |
| 245 current = next; |
| 246 } |
| 247 return prev; |
| 248 } |
| 249 |
| 250 /** |
| 251 * Make another [_FutureImpl] receive the result of this one. |
| 252 * |
| 253 * If this future is already complete, the [future] is notified |
| 254 * immediately. This function is only called during event resolution |
| 255 * where it's acceptable to send an event. |
| 256 */ |
| 257 void _chain(_FutureImpl future) { |
| 258 if (!_isComplete) { |
| 259 _addListener(future._asListener()); |
| 260 } else if (_hasValue) { |
| 261 future._setValue(_resultOrListeners); |
| 262 } else { |
| 263 assert(_hasError); |
| 264 future._setError(_resultOrListeners); |
| 265 } |
| 266 } |
| 267 |
| 268 _FutureListener _asListener() => new _FutureListener.wrap(this); |
| 269 } |
| 270 |
| 271 /** |
| 272 * Transforming future base class. |
| 273 * |
| 274 * A transforming future is itself a future and a future listener. |
| 275 * Subclasses override [_sendValue]/[_sendError] to intercept |
| 276 * the results of a previous future. |
| 277 */ |
| 278 abstract class _TransformFuture<S, T> extends _FutureImpl<T> |
| 279 implements _FutureListener<S> { |
| 280 // _FutureListener implementation. |
| 281 _FutureListener _nextListener; |
| 282 |
| 283 void _sendValue(S value); |
| 284 |
| 285 void _sendError(AsyncError error); |
| 286 |
| 287 void _subscribeTo(_FutureImpl future) { |
| 288 future._addListener(this); |
| 289 } |
| 290 |
| 291 /** |
| 292 * Helper function to hand the result of transforming an incoming event. |
| 293 * |
| 294 * If the result is itself a [Future], this future is linked to that |
| 295 * future's output. If not, this future is completed with the result. |
| 296 */ |
| 297 void _setOrChainValue(var result) { |
| 298 if (result is Future) { |
| 299 // Result should be a Future<T>. |
| 300 if (result is _FutureImpl) { |
| 301 _FutureImpl chainFuture = result; |
| 302 chainFuture._chain(this); |
| 303 return; |
| 304 } else { |
| 305 Future future = result; |
| 306 future.then(_setValue, |
| 307 onError: _setError); |
| 308 return; |
| 309 } |
| 310 } else { |
| 311 // Result must be of type T. |
| 312 _setValue(result); |
| 313 } |
| 314 } |
| 315 } |
| 316 |
| 317 /** The onValue and onError handlers return either a value or a future */ |
| 318 typedef dynamic _FutureOnValue<T>(T value); |
| 319 typedef dynamic _FutureOnError(AsyncError error); |
| 320 /** Test used by [Future.catchError] to handle skip some errors. */ |
| 321 typedef bool _FutureErrorTest(var error); |
| 322 /** Used by [WhenFuture]. */ |
| 323 typedef void _FutureAction(); |
| 324 |
| 325 /** Future returned by [Future.then] with no [:onError:] parameter. */ |
| 326 class _ThenFuture<S, T> extends _TransformFuture<S, T> { |
| 327 final _FutureOnValue<S> _onValue; |
| 328 |
| 329 _ThenFuture(this._onValue); |
| 330 |
| 331 _sendValue(S value) { |
| 332 assert(_onValue != null); |
| 333 var result; |
| 334 try { |
| 335 result = _onValue(value); |
| 336 } catch (e, s) { |
| 337 _setError(new AsyncError(e, s)); |
| 338 return; |
| 339 } |
| 340 _setOrChainValue(result); |
| 341 } |
| 342 |
| 343 void _sendError(AsyncError error) { |
| 344 _setError(error); |
| 345 } |
| 346 } |
| 347 |
| 348 /** Future returned by [Future.catchError]. */ |
| 349 class _CatchErrorFuture<T> extends _TransformFuture<T,T> { |
| 350 final _FutureErrorTest _test; |
| 351 final _FutureOnError _onError; |
| 352 |
| 353 _CatchErrorFuture(this._onError, this._test); |
| 354 |
| 355 _sendValue(T value) { |
| 356 _setValue(value); |
| 357 } |
| 358 |
| 359 _sendError(AsyncError error) { |
| 360 assert(_onError != null); |
| 361 // if _test is supplied, check if it returns true, otherwise just |
| 362 // forward the error unmodified. |
| 363 if (_test != null) { |
| 364 bool matchesTest; |
| 365 try { |
| 366 matchesTest = _test(error.error); |
| 367 } catch (e, s) { |
| 368 _setError(new AsyncError.withCause(e, s, error)); |
| 369 return; |
| 370 } |
| 371 if (!matchesTest) { |
| 372 _setError(error); |
| 373 return; |
| 374 } |
| 375 } |
| 376 // Act on the error, and use the result as this future's result. |
| 377 var result; |
| 378 try { |
| 379 result = _onError(error); |
| 380 } catch (e, s) { |
| 381 _setError(new AsyncError.withCause(e, s, error)); |
| 382 return; |
| 383 } |
| 384 _setOrChainValue(result); |
| 385 } |
| 386 } |
| 387 |
| 388 /** Future returned by [Future.then] with an [:onError:] parameter. */ |
| 389 class _SubscribeFuture<S, T> extends _ThenFuture<S, T> { |
| 390 final _FutureOnError _onError; |
| 391 |
| 392 _SubscribeFuture(onValue(S value), this._onError) : super(onValue); |
| 393 |
| 394 // The _sendValue method is inherited from ThenFuture. |
| 395 |
| 396 void _sendError(AsyncError error) { |
| 397 assert(_onError != null); |
| 398 var result; |
| 399 try { |
| 400 result = _onError(error); |
| 401 } catch (e, s) { |
| 402 _setError(new AsyncError.withCause(e, s, error)); |
| 403 return; |
| 404 } |
| 405 _setOrChainValue(result); |
| 406 } |
| 407 } |
| 408 |
| 409 /** Future returned by [Future.whenComplete]. */ |
| 410 class _WhenFuture<T> extends _TransformFuture<T, T> { |
| 411 final _FutureAction _action; |
| 412 |
| 413 _WhenFuture(this._action); |
| 414 |
| 415 void _sendValue(T value) { |
| 416 try { |
| 417 _action(); |
| 418 } catch (e, s) { |
| 419 _setError(new AsyncError(e, s)); |
| 420 return; |
| 421 } |
| 422 _setValue(value); |
| 423 } |
| 424 |
| 425 void _sendError(AsyncError error) { |
| 426 try { |
| 427 _action(); |
| 428 } catch (e, s) { |
| 429 error = new AsyncError.withCause(e, s, error); |
| 430 } |
| 431 _setError(error); |
| 432 } |
| 433 } |
| 434 |
| 435 /** |
| 436 * Thin wrapper around a [Future]. |
| 437 * |
| 438 * This is used to return a "new" [Future] that effectively work just |
| 439 * as an existing [Future], without making this discoverable by comparing |
| 440 * identities. |
| 441 */ |
| 442 class _FutureWrapper<T> implements Future<T> { |
| 443 final Future<T> _future; |
| 444 |
| 445 _FutureWrapper(this._future); |
| 446 |
| 447 Future then(function(T value), { onError(AsyncError error) }) { |
| 448 return _future.then(function, onError: onError); |
| 449 } |
| 450 |
| 451 Future catchError(function(AsyncError error), {bool test(var error)}) { |
| 452 return _future.catchError(function, test: test); |
| 453 } |
| 454 |
| 455 Future whenComplete(void action()) { |
| 456 return _future.whenComplete(action); |
| 457 } |
| 458 |
| 459 Stream<T> asStream() => new Stream.fromFuture(this); |
| 460 } |
OLD | NEW |