Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(38)

Side by Side Diff: tool/input_sdk/lib/async/future_impl.dart

Issue 1953153002: Update dart:async to match the Dart repo. (Closed) Base URL: https://github.com/dart-lang/dev_compiler.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** The onValue and onError handlers return either a value or a future */ 7 /** The onValue and onError handlers return either a value or a future */
8 typedef dynamic _FutureOnValue<T>(T value); 8 typedef dynamic/*T|Future<T>*/ _FutureOnValue<S, T>(S value);
9 /** Test used by [Future.catchError] to handle skip some errors. */ 9 /** Test used by [Future.catchError] to handle skip some errors. */
10 typedef bool _FutureErrorTest(var error); 10 typedef bool _FutureErrorTest(var error);
11 /** Used by [WhenFuture]. */ 11 /** Used by [WhenFuture]. */
12 typedef _FutureAction(); 12 typedef _FutureAction();
13 13
14 abstract class _Completer<T> implements Completer<T> { 14 abstract class _Completer<T> implements Completer<T> {
15 final _Future<T> future = new _Future<T>(); 15 final _Future<T> future = new _Future<T>();
16 16
17 void complete([value]); 17 void complete([value]);
18 18
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
50 void complete([value]) { 50 void complete([value]) {
51 if (!future._mayComplete) throw new StateError("Future already completed"); 51 if (!future._mayComplete) throw new StateError("Future already completed");
52 future._complete(value); 52 future._complete(value);
53 } 53 }
54 54
55 void _completeError(Object error, StackTrace stackTrace) { 55 void _completeError(Object error, StackTrace stackTrace) {
56 future._completeError(error, stackTrace); 56 future._completeError(error, stackTrace);
57 } 57 }
58 } 58 }
59 59
60 class _FutureListener { 60 class _FutureListener<S, T> {
61 static const int MASK_VALUE = 1; 61 static const int MASK_VALUE = 1;
62 static const int MASK_ERROR = 2; 62 static const int MASK_ERROR = 2;
63 static const int MASK_TEST_ERROR = 4; 63 static const int MASK_TEST_ERROR = 4;
64 static const int MASK_WHENCOMPLETE = 8; 64 static const int MASK_WHENCOMPLETE = 8;
65 static const int STATE_CHAIN = 0; 65 static const int STATE_CHAIN = 0;
66 static const int STATE_THEN = MASK_VALUE; 66 static const int STATE_THEN = MASK_VALUE;
67 static const int STATE_THEN_ONERROR = MASK_VALUE | MASK_ERROR; 67 // TODO(johnmccutchan): Remove the hard coded value. See #26030.
68 static const int STATE_THEN_ONERROR = 3; // MASK_VALUE | MASK_ERROR.
68 static const int STATE_CATCHERROR = MASK_ERROR; 69 static const int STATE_CATCHERROR = MASK_ERROR;
69 static const int STATE_CATCHERROR_TEST = MASK_ERROR | MASK_TEST_ERROR; 70 // TODO(johnmccutchan): Remove the hard coded value. See #26030.
71 static const int STATE_CATCHERROR_TEST = 6; // MASK_ERROR | MASK_TEST_ERROR.
70 static const int STATE_WHENCOMPLETE = MASK_WHENCOMPLETE; 72 static const int STATE_WHENCOMPLETE = MASK_WHENCOMPLETE;
71 // Listeners on the same future are linked through this link. 73 // Listeners on the same future are linked through this link.
72 _FutureListener _nextListener = null; 74 _FutureListener _nextListener = null;
73 // The future to complete when this listener is activated. 75 // The future to complete when this listener is activated.
74 final _Future result; 76 final _Future<T> result;
75 // Which fields means what. 77 // Which fields means what.
76 final int state; 78 final int state;
77 // Used for then/whenDone callback and error test 79 // Used for then/whenDone callback and error test
78 final Function callback; 80 final Function callback;
79 // Used for error callbacks. 81 // Used for error callbacks.
80 final Function errorCallback; 82 final Function errorCallback;
81 83
82 _FutureListener.then(this.result, 84 _FutureListener.then(this.result,
83 _FutureOnValue onValue, Function errorCallback) 85 _FutureOnValue<S, T> onValue, Function errorCallback)
84 : callback = onValue, 86 : callback = onValue,
85 errorCallback = errorCallback, 87 errorCallback = errorCallback,
86 state = (errorCallback == null) ? STATE_THEN : STATE_THEN_ONERROR; 88 state = (errorCallback == null) ? STATE_THEN : STATE_THEN_ONERROR;
87 89
88 _FutureListener.catchError(this.result, 90 _FutureListener.catchError(this.result,
89 this.errorCallback, _FutureErrorTest test) 91 this.errorCallback, _FutureErrorTest test)
90 : callback = test, 92 : callback = test,
91 state = (test == null) ? STATE_CATCHERROR : STATE_CATCHERROR_TEST; 93 state = (test == null) ? STATE_CATCHERROR : STATE_CATCHERROR_TEST;
92 94
93 _FutureListener.whenComplete(this.result, _FutureAction onComplete) 95 _FutureListener.whenComplete(this.result, _FutureAction onComplete)
94 : callback = onComplete, 96 : callback = onComplete,
95 errorCallback = null, 97 errorCallback = null,
96 state = STATE_WHENCOMPLETE; 98 state = STATE_WHENCOMPLETE;
97 99
98 _FutureListener.chain(this.result)
99 : callback = null,
100 errorCallback = null,
101 state = STATE_CHAIN;
102
103 Zone get _zone => result._zone; 100 Zone get _zone => result._zone;
104 101
105 bool get handlesValue => (state & MASK_VALUE != 0); 102 bool get handlesValue => (state & MASK_VALUE != 0);
106 bool get handlesError => (state & MASK_ERROR != 0); 103 bool get handlesError => (state & MASK_ERROR != 0);
107 bool get hasErrorTest => (state == STATE_CATCHERROR_TEST); 104 bool get hasErrorTest => (state == STATE_CATCHERROR_TEST);
108 bool get handlesComplete => (state == STATE_WHENCOMPLETE); 105 bool get handlesComplete => (state == STATE_WHENCOMPLETE);
109 106
110 _FutureOnValue get _onValue { 107
108 _FutureOnValue<S, T> get _onValue {
111 assert(handlesValue); 109 assert(handlesValue);
112 return callback; 110 return callback as Object /*=_FutureOnValue<S, T>*/;
113 } 111 }
114 Function get _onError => errorCallback; 112 Function get _onError => errorCallback;
115 _FutureErrorTest get _errorTest { 113 _FutureErrorTest get _errorTest {
116 assert(hasErrorTest); 114 assert(hasErrorTest);
117 return callback; 115 return callback as Object /*=_FutureErrorTest*/;
118 } 116 }
119 _FutureAction get _whenCompleteAction { 117 _FutureAction get _whenCompleteAction {
120 assert(handlesComplete); 118 assert(handlesComplete);
121 return callback; 119 return callback as Object /*=_FutureAction*/;
120 }
121
122 /// Whether this listener has an error callback.
123 ///
124 /// This function must only be called if the listener [handlesError].
125 bool get hasErrorCallback {
126 assert(handlesError);
127 return _onError != null;
128 }
129
130 dynamic/*T|Future<T>*/ handleValue(S sourceResult) {
131 return _zone.runUnary/*<dynamic/*T|Future<T>*/, S>*/(
132 _onValue, sourceResult);
133 }
134
135 bool matchesErrorTest(AsyncError asyncError) {
136 if (!hasErrorTest) return true;
137 _FutureErrorTest test = _errorTest;
138 return _zone.runUnary/*<bool, dynamic>*/(_errorTest, asyncError.error);
139 }
140
141 dynamic/*T|Future<T>*/ handleError(AsyncError asyncError) {
142 assert(handlesError && hasErrorCallback);
143 if (errorCallback is ZoneBinaryCallback) {
144 var typedErrorCallback = errorCallback as Object
145 /*=ZoneBinaryCallback<Object/*T|Future<T>*/, Object, StackTrace>*/;
146 return _zone.runBinary(typedErrorCallback,
147 asyncError.error,
148 asyncError.stackTrace);
149 } else {
150 return _zone.runUnary/*<dynamic/*T|Future<T>*/, dynamic>*/(
151 errorCallback, asyncError.error);
152 }
153 }
154
155 dynamic handleWhenComplete() {
156 assert(!handlesError);
157 return _zone.run(_whenCompleteAction);
122 } 158 }
123 } 159 }
124 160
125 class _Future<T> implements Future<T> { 161 class _Future<T> implements Future<T> {
126 /// Initial state, waiting for a result. In this state, the 162 /// Initial state, waiting for a result. In this state, the
127 /// [resultOrListeners] field holds a single-linked list of 163 /// [resultOrListeners] field holds a single-linked list of
128 /// [_FutureListener] listeners. 164 /// [_FutureListener] listeners.
129 static const int _INCOMPLETE = 0; 165 static const int _INCOMPLETE = 0;
130 /// Pending completion. Set when completed using [_asyncComplete] or 166 /// Pending completion. Set when completed using [_asyncComplete] or
131 /// [_asyncCompleteError]. It is an error to try to complete it again. 167 /// [_asyncCompleteError]. It is an error to try to complete it again.
132 /// [resultOrListeners] holds listeners. 168 /// [resultOrListeners] holds listeners.
133 static const int _PENDING_COMPLETE = 1; 169 static const int _PENDING_COMPLETE = 1;
134 /// The future has been chained to another future. The result of that 170 /// The future has been chained to another future. The result of that
135 /// other future becomes the result of this future as well. 171 /// other future becomes the result of this future as well.
136 // TODO(floitsch): we don't really need a special "_CHAINED" state. We could 172 /// [resultOrListeners] contains the source future.
137 // just use the PENDING_COMPLETE state instead.
138 static const int _CHAINED = 2; 173 static const int _CHAINED = 2;
139 /// The future has been completed with a value result. 174 /// The future has been completed with a value result.
140 static const int _VALUE = 4; 175 static const int _VALUE = 4;
141 /// The future has been completed with an error result. 176 /// The future has been completed with an error result.
142 static const int _ERROR = 8; 177 static const int _ERROR = 8;
143 178
144 /** Whether the future is complete, and as what. */ 179 /** Whether the future is complete, and as what. */
145 int _state = _INCOMPLETE; 180 int _state = _INCOMPLETE;
146 181
147 /** 182 /**
(...skipping 12 matching lines...) Expand all
160 * A result is only stored when the future has completed. 195 * A result is only stored when the future has completed.
161 * 196 *
162 * The listeners is an internally linked list of [_FutureListener]s. 197 * The listeners is an internally linked list of [_FutureListener]s.
163 * Listeners are only remembered while the future is not yet complete, 198 * Listeners are only remembered while the future is not yet complete,
164 * and it is not chained to another future. 199 * and it is not chained to another future.
165 * 200 *
166 * The future is another future that his future is chained to. This future 201 * The future is another future that his future is chained to. This future
167 * is waiting for the other future to complete, and when it does, this future 202 * is waiting for the other future to complete, and when it does, this future
168 * will complete with the same result. 203 * will complete with the same result.
169 * All listeners are forwarded to the other future. 204 * All listeners are forwarded to the other future.
170 *
171 * The cases are disjoint - incomplete and unchained ([_INCOMPLETE]),
172 * incomplete and chained ([_CHAINED]), or completed with value or error
173 * ([_VALUE] or [_ERROR]) - so the field only needs to hold
174 * one value at a time.
175 */ 205 */
176 var _resultOrListeners; 206 var _resultOrListeners;
177 207
208 // This constructor is used by async/await.
178 _Future(); 209 _Future();
179 210
180 /// Valid types for value: `T` or `Future<T>`. 211 /// Valid types for value: `T` or `Future<T>`.
181 _Future.immediate(value) { 212 _Future.immediate(value) {
182 _asyncComplete(value); 213 _asyncComplete(value);
183 } 214 }
184 215
185 _Future.immediateError(var error, [StackTrace stackTrace]) { 216 _Future.immediateError(var error, [StackTrace stackTrace]) {
186 _asyncCompleteError(error, stackTrace); 217 _asyncCompleteError(error, stackTrace);
187 } 218 }
188 219
189 bool get _mayComplete => _state == _INCOMPLETE; 220 bool get _mayComplete => _state == _INCOMPLETE;
190 bool get _isChained => _state == _CHAINED; 221 bool get _isPendingComplete => _state == _PENDING_COMPLETE;
191 bool get _isComplete => _state >= _VALUE; 222 bool get _mayAddListener => _state <= _PENDING_COMPLETE;
192 bool get _hasValue => _state == _VALUE; 223 bool get _isChained => _state == _CHAINED;
193 bool get _hasError => _state == _ERROR; 224 bool get _isComplete => _state >= _VALUE;
225 bool get _hasError => _state == _ERROR;
194 226
195 set _isChained(bool value) { 227 void _setChained(_Future source) {
196 if (value) { 228 assert(_mayAddListener);
197 assert(!_isComplete); 229 _state = _CHAINED;
198 _state = _CHAINED; 230 _resultOrListeners = source;
199 } else {
200 assert(_isChained);
201 _state = _INCOMPLETE;
202 }
203 } 231 }
204 232
205 Future/*<S>*/ then/*<S>*/(/*=S*/ f(T value), { Function onError }) { 233 Future/*<E>*/ then/*<E>*/(
206 _Future/*<S>*/ result = new _Future(); 234 /*=dynamic/*E|Future<E>*/*/ f(T value), { Function onError }) {
207 if (!identical(result._zone, _ROOT_ZONE)) { 235 Zone currentZone = Zone.current;
208 f = result._zone.registerUnaryCallback(f); 236 ZoneUnaryCallback registered;
237 if (!identical(currentZone, _ROOT_ZONE)) {
238 f = currentZone.registerUnaryCallback/*<dynamic, T>*/(f);
209 if (onError != null) { 239 if (onError != null) {
210 onError = _registerErrorHandler(onError, result._zone); 240 onError = _registerErrorHandler/*<T>*/(onError, currentZone);
211 } 241 }
212 } 242 }
213 _addListener(new _FutureListener.then(result, f, onError)); 243 return _thenNoZoneRegistration/*<E>*/(f, onError);
244 }
245
246 // This method is used by async/await.
247 Future/*<E>*/ _thenNoZoneRegistration/*<E>*/(f(T value), Function onError) {
248 _Future/*<E>*/ result = new _Future/*<E>*/();
249 _addListener(new _FutureListener/*<T, E>*/.then(result, f, onError));
214 return result; 250 return result;
215 } 251 }
216 252
217 Future catchError(Function onError, { bool test(error) }) { 253 Future/*<T>*/ catchError(Function onError, { bool test(error) }) {
218 _Future result = new _Future(); 254 _Future/*<T>*/ result = new _Future/*<T>*/();
219 if (!identical(result._zone, _ROOT_ZONE)) { 255 if (!identical(result._zone, _ROOT_ZONE)) {
220 onError = _registerErrorHandler(onError, result._zone); 256 onError = _registerErrorHandler/*<T>*/(onError, result._zone);
221 if (test != null) test = result._zone.registerUnaryCallback(test); 257 if (test != null) test = result._zone.registerUnaryCallback(test);
222 } 258 }
223 _addListener(new _FutureListener.catchError(result, onError, test)); 259 _addListener(new _FutureListener/*<T, T>*/.catchError(
260 result, onError, test));
224 return result; 261 return result;
225 } 262 }
226 263
227 Future<T> whenComplete(action()) { 264 Future<T> whenComplete(action()) {
228 _Future result = new _Future<T>(); 265 _Future<T> result = new _Future<T>();
229 if (!identical(result._zone, _ROOT_ZONE)) { 266 if (!identical(result._zone, _ROOT_ZONE)) {
230 action = result._zone.registerCallback(action); 267 action = result._zone.registerCallback/*<dynamic>*/(action);
231 } 268 }
232 _addListener(new _FutureListener.whenComplete(result, action)); 269 _addListener(new _FutureListener/*<T, T>*/.whenComplete(result, action));
233 return result; 270 return result;
234 } 271 }
235 272
236 Stream<T> asStream() => new Stream.fromFuture(this); 273 Stream<T> asStream() => new Stream<T>.fromFuture(this);
237 274
238 void _markPendingCompletion() { 275 void _setPendingComplete() {
239 if (!_mayComplete) throw new StateError("Future already completed"); 276 assert(_mayComplete);
240 _state = _PENDING_COMPLETE; 277 _state = _PENDING_COMPLETE;
241 } 278 }
242 279
243 T get _value { 280 AsyncError get _error {
244 assert(_isComplete && _hasValue); 281 assert(_hasError);
245 return _resultOrListeners; 282 return _resultOrListeners;
246 } 283 }
247 284
248 AsyncError get _error { 285 _Future get _chainSource {
249 assert(_isComplete && _hasError); 286 assert(_isChained);
250 return _resultOrListeners; 287 return _resultOrListeners;
251 } 288 }
252 289
290 // This method is used by async/await.
253 void _setValue(T value) { 291 void _setValue(T value) {
254 assert(!_isComplete); // But may have a completion pending. 292 assert(!_isComplete); // But may have a completion pending.
255 _state = _VALUE; 293 _state = _VALUE;
256 _resultOrListeners = value; 294 _resultOrListeners = value;
257 } 295 }
258 296
259 void _setErrorObject(AsyncError error) { 297 void _setErrorObject(AsyncError error) {
260 assert(!_isComplete); // But may have a completion pending. 298 assert(!_isComplete); // But may have a completion pending.
261 _state = _ERROR; 299 _state = _ERROR;
262 _resultOrListeners = error; 300 _resultOrListeners = error;
263 } 301 }
264 302
265 void _setError(Object error, StackTrace stackTrace) { 303 void _setError(Object error, StackTrace stackTrace) {
266 _setErrorObject(new AsyncError(error, stackTrace)); 304 _setErrorObject(new AsyncError(error, stackTrace));
267 } 305 }
268 306
307 /// Copy the completion result of [source] into this future.
308 ///
309 /// Used when a chained future notices that its source is completed.
310 void _cloneResult(_Future source) {
311 assert(!_isComplete);
312 assert(source._isComplete);
313 _state = source._state;
314 _resultOrListeners = source._resultOrListeners;
315 }
316
269 void _addListener(_FutureListener listener) { 317 void _addListener(_FutureListener listener) {
270 assert(listener._nextListener == null); 318 assert(listener._nextListener == null);
271 if (_isComplete) { 319 if (_mayAddListener) {
320 listener._nextListener = _resultOrListeners;
321 _resultOrListeners = listener;
322 } else {
323 if (_isChained) {
324 // Delegate listeners to chained source future.
325 // If the source is complete, instead copy its values and
326 // drop the chaining.
327 _Future source = _chainSource;
328 if (!source._isComplete) {
329 source._addListener(listener);
330 return;
331 }
332 _cloneResult(source);
333 }
334 assert(_isComplete);
272 // Handle late listeners asynchronously. 335 // Handle late listeners asynchronously.
273 _zone.scheduleMicrotask(() { 336 _zone.scheduleMicrotask(() {
274 _propagateToListeners(this, listener); 337 _propagateToListeners(this, listener);
275 }); 338 });
276 } else {
277 listener._nextListener = _resultOrListeners;
278 _resultOrListeners = listener;
279 } 339 }
280 } 340 }
281 341
342 void _prependListeners(_FutureListener listeners) {
343 if (listeners == null) return;
344 if (_mayAddListener) {
345 _FutureListener existingListeners = _resultOrListeners;
346 _resultOrListeners = listeners;
347 if (existingListeners != null) {
348 _FutureListener cursor = listeners;
349 while (cursor._nextListener != null) {
350 cursor = cursor._nextListener;
351 }
352 cursor._nextListener = existingListeners;
353 }
354 } else {
355 if (_isChained) {
356 // Delegate listeners to chained source future.
357 // If the source is complete, instead copy its values and
358 // drop the chaining.
359 _Future source = _chainSource;
360 if (!source._isComplete) {
361 source._prependListeners(listeners);
362 return;
363 }
364 _cloneResult(source);
365 }
366 assert(_isComplete);
367 listeners = _reverseListeners(listeners);
368 _zone.scheduleMicrotask(() {
369 _propagateToListeners(this, listeners);
370 });
371 }
372 }
373
282 _FutureListener _removeListeners() { 374 _FutureListener _removeListeners() {
283 // Reverse listeners before returning them, so the resulting list is in 375 // Reverse listeners before returning them, so the resulting list is in
284 // subscription order. 376 // subscription order.
285 assert(!_isComplete); 377 assert(!_isComplete);
286 _FutureListener current = _resultOrListeners; 378 _FutureListener current = _resultOrListeners;
287 _resultOrListeners = null; 379 _resultOrListeners = null;
380 return _reverseListeners(current);
381 }
382
383 _FutureListener _reverseListeners(_FutureListener listeners) {
288 _FutureListener prev = null; 384 _FutureListener prev = null;
385 _FutureListener current = listeners;
289 while (current != null) { 386 while (current != null) {
290 _FutureListener next = current._nextListener; 387 _FutureListener next = current._nextListener;
291 current._nextListener = prev; 388 current._nextListener = prev;
292 prev = current; 389 prev = current;
293 current = next; 390 current = next;
294 } 391 }
295 return prev; 392 return prev;
296 } 393 }
297 394
298 // Take the value (when completed) of source and complete target with that 395 // Take the value (when completed) of source and complete target with that
299 // value (or error). This function can chain all Futures, but is slower 396 // value (or error). This function could chain all Futures, but is slower
300 // for _Future than _chainCoreFuture - Use _chainCoreFuture in that case. 397 // for _Future than _chainCoreFuture, so you must use _chainCoreFuture
398 // in that case.
301 static void _chainForeignFuture(Future source, _Future target) { 399 static void _chainForeignFuture(Future source, _Future target) {
302 assert(!target._isComplete); 400 assert(!target._isComplete);
303 assert(source is! _Future); 401 assert(source is! _Future);
304 402
305 // Mark the target as chained (and as such half-completed). 403 // Mark the target as chained (and as such half-completed).
306 target._isChained = true; 404 target._setPendingComplete();
307 source.then((value) { 405 try {
308 assert(target._isChained); 406 source.then((value) {
309 target._completeWithValue(value); 407 assert(target._isPendingComplete);
310 }, 408 target._completeWithValue(value);
311 // TODO(floitsch): eventually we would like to make this non-optional 409 },
312 // and dependent on the listeners of the target future. If none of 410 // TODO(floitsch): eventually we would like to make this non-optional
313 // the target future's listeners want to have the stack trace we don't 411 // and dependent on the listeners of the target future. If none of
314 // need a trace. 412 // the target future's listeners want to have the stack trace we don't
315 onError: (error, [stackTrace]) { 413 // need a trace.
316 assert(target._isChained); 414 onError: (error, [stackTrace]) {
317 target._completeError(error, stackTrace); 415 assert(target._isPendingComplete);
416 target._completeError(error, stackTrace);
417 });
418 } catch (e, s) {
419 // This only happens if the `then` call threw synchronously when given
420 // valid arguments.
421 // That requires a non-conforming implementation of the Future interface,
422 // which should, hopefully, never happen.
423 scheduleMicrotask(() {
424 target._completeError(e, s);
318 }); 425 });
426 }
319 } 427 }
320 428
321 // Take the value (when completed) of source and complete target with that 429 // Take the value (when completed) of source and complete target with that
322 // value (or error). This function expects that source is a _Future. 430 // value (or error). This function expects that source is a _Future.
323 static void _chainCoreFuture(_Future source, _Future target) { 431 static void _chainCoreFuture(_Future source, _Future target) {
324 assert(!target._isComplete); 432 assert(target._mayAddListener); // Not completed, not already chained.
325 assert(source is _Future); 433 while (source._isChained) {
326 434 source = source._chainSource;
327 // Mark the target as chained (and as such half-completed). 435 }
328 target._isChained = true;
329 _FutureListener listener = new _FutureListener.chain(target);
330 if (source._isComplete) { 436 if (source._isComplete) {
331 _propagateToListeners(source, listener); 437 _FutureListener listeners = target._removeListeners();
438 target._cloneResult(source);
439 _propagateToListeners(target, listeners);
332 } else { 440 } else {
333 source._addListener(listener); 441 _FutureListener listeners = target._resultOrListeners;
442 target._setChained(source);
443 source._prependListeners(listeners);
334 } 444 }
335 } 445 }
336 446
337 void _complete(value) { 447 void _complete(value) {
338 assert(!_isComplete); 448 assert(!_isComplete);
339 if (value is Future) { 449 if (value is Future) {
340 if (value is _Future) { 450 if (value is _Future) {
341 _chainCoreFuture(value, this); 451 _chainCoreFuture(value, this);
342 } else { 452 } else {
343 _chainForeignFuture(value, this); 453 _chainForeignFuture(value, this);
344 } 454 }
345 } else { 455 } else {
346 _FutureListener listeners = _removeListeners(); 456 _FutureListener listeners = _removeListeners();
347 _setValue(value); 457 _setValue(value as Object /*=T*/);
348 _propagateToListeners(this, listeners); 458 _propagateToListeners(this, listeners);
349 } 459 }
350 } 460 }
351 461
352 void _completeWithValue(value) { 462 void _completeWithValue(T value) {
353 assert(!_isComplete); 463 assert(!_isComplete);
354 assert(value is! Future); 464 assert(value is! Future);
355 465
356 _FutureListener listeners = _removeListeners(); 466 _FutureListener listeners = _removeListeners();
357 _setValue(value); 467 _setValue(value);
358 _propagateToListeners(this, listeners); 468 _propagateToListeners(this, listeners);
359 } 469 }
360 470
361 void _completeError(error, [StackTrace stackTrace]) { 471 void _completeError(error, [StackTrace stackTrace]) {
362 assert(!_isComplete); 472 assert(!_isComplete);
363 473
364 _FutureListener listeners = _removeListeners(); 474 _FutureListener listeners = _removeListeners();
365 _setError(error, stackTrace); 475 _setError(error, stackTrace);
366 _propagateToListeners(this, listeners); 476 _propagateToListeners(this, listeners);
367 } 477 }
368 478
369 void _asyncComplete(value) { 479 void _asyncComplete(value) {
370 assert(!_isComplete); 480 assert(!_isComplete);
371 // Two corner cases if the value is a future: 481 // Two corner cases if the value is a future:
372 // 1. the future is already completed and an error. 482 // 1. the future is already completed and an error.
373 // 2. the future is not yet completed but might become an error. 483 // 2. the future is not yet completed but might become an error.
374 // The first case means that we must not immediately complete the Future, 484 // The first case means that we must not immediately complete the Future,
375 // as our code would immediately start propagating the error without 485 // as our code would immediately start propagating the error without
376 // giving the time to install error-handlers. 486 // giving the time to install error-handlers.
377 // However the second case requires us to deal with the value immediately. 487 // However the second case requires us to deal with the value immediately.
378 // Otherwise the value could complete with an error and report an 488 // Otherwise the value could complete with an error and report an
379 // unhandled error, even though we know we are already going to listen to 489 // unhandled error, even though we know we are already going to listen to
380 // it. 490 // it.
381 491
382 if (value == null) { 492 if (value is Future) {
383 // No checks for `null`.
384 } else if (value is Future) {
385 // Assign to typed variables so we get earlier checks in checked mode. 493 // Assign to typed variables so we get earlier checks in checked mode.
386 Future<T> typedFuture = value; 494 Future<T> typedFuture = value as Object /*=Future<T>*/;
387 if (typedFuture is _Future) { 495 if (typedFuture is _Future) {
388 _Future<T> coreFuture = typedFuture; 496 _Future<T> coreFuture = typedFuture;
389 if (coreFuture._isComplete && coreFuture._hasError) { 497 if (coreFuture._hasError) {
390 // Case 1 from above. Delay completion to enable the user to register 498 // Case 1 from above. Delay completion to enable the user to register
391 // callbacks. 499 // callbacks.
392 _markPendingCompletion(); 500 _setPendingComplete();
393 _zone.scheduleMicrotask(() { 501 _zone.scheduleMicrotask(() {
394 _chainCoreFuture(coreFuture, this); 502 _chainCoreFuture(coreFuture, this);
395 }); 503 });
396 } else { 504 } else {
397 _chainCoreFuture(coreFuture, this); 505 _chainCoreFuture(coreFuture, this);
398 } 506 }
399 } else { 507 } else {
400 // Case 2 from above. Chain the future immidiately. 508 // Case 2 from above. Chain the future immediately.
401 // Note that we are still completing asynchronously (through 509 // Note that we are still completing asynchronously (through
402 // _chainForeignFuture).. 510 // _chainForeignFuture).
403 _chainForeignFuture(typedFuture, this); 511 _chainForeignFuture(typedFuture, this);
404 } 512 }
405 return; 513 return;
406 } else {
407 T typedValue = value;
408 } 514 }
515 T typedValue = value as Object /*=T*/;
409 516
410 _markPendingCompletion(); 517 _setPendingComplete();
411 _zone.scheduleMicrotask(() { 518 _zone.scheduleMicrotask(() {
412 _completeWithValue(value); 519 _completeWithValue(typedValue);
413 }); 520 });
414 } 521 }
415 522
416 void _asyncCompleteError(error, StackTrace stackTrace) { 523 void _asyncCompleteError(error, StackTrace stackTrace) {
417 assert(!_isComplete); 524 assert(!_isComplete);
418 525
419 _markPendingCompletion(); 526 _setPendingComplete();
420 _zone.scheduleMicrotask(() { 527 _zone.scheduleMicrotask(() {
421 _completeError(error, stackTrace); 528 _completeError(error, stackTrace);
422 }); 529 });
423 } 530 }
424 531
425 /** 532 /**
426 * Propagates the value/error of [source] to its [listeners], executing the 533 * Propagates the value/error of [source] to its [listeners], executing the
427 * listeners' callbacks. 534 * listeners' callbacks.
428 */ 535 */
429 static void _propagateToListeners(_Future source, _FutureListener listeners) { 536 static void _propagateToListeners(_Future source, _FutureListener listeners) {
(...skipping 11 matching lines...) Expand all
441 // Usually futures only have one listener. If they have several, we 548 // Usually futures only have one listener. If they have several, we
442 // call handle them separately in recursive calls, continuing 549 // call handle them separately in recursive calls, continuing
443 // here only when there is only one listener left. 550 // here only when there is only one listener left.
444 while (listeners._nextListener != null) { 551 while (listeners._nextListener != null) {
445 _FutureListener listener = listeners; 552 _FutureListener listener = listeners;
446 listeners = listener._nextListener; 553 listeners = listener._nextListener;
447 listener._nextListener = null; 554 listener._nextListener = null;
448 _propagateToListeners(source, listener); 555 _propagateToListeners(source, listener);
449 } 556 }
450 _FutureListener listener = listeners; 557 _FutureListener listener = listeners;
558 final sourceResult = source._resultOrListeners;
451 // Do the actual propagation. 559 // Do the actual propagation.
452 // Set initial state of listenerHasValue and listenerValueOrError. These 560 // Set initial state of listenerHasError and listenerValueOrError. These
453 // variables are updated, with the outcome of potential callbacks. 561 // variables are updated with the outcome of potential callbacks.
454 bool listenerHasValue = true; 562 // Non-error results, including futures, are stored in
455 final sourceValue = hasError ? null : source._value; 563 // listenerValueOrError and listenerHasError is set to false. Errors
456 var listenerValueOrError = sourceValue; 564 // are stored in listenerValueOrError as an [AsyncError] and
457 // Set to true if a whenComplete needs to wait for a future. 565 // listenerHasError is set to true.
458 // The whenComplete action will resume the propagation by itself. 566 bool listenerHasError = hasError;
459 bool isPropagationAborted = false; 567 var listenerValueOrError = sourceResult;
460 // TODO(floitsch): mark the listener as pending completion. Currently 568
461 // we can't do this, since the markPendingCompletion verifies that
462 // the future is not already marked (or chained).
463 // Only if we either have an error or callbacks, go into this, somewhat 569 // Only if we either have an error or callbacks, go into this, somewhat
464 // expensive, branch. Here we'll enter/leave the zone. Many futures 570 // expensive, branch. Here we'll enter/leave the zone. Many futures
465 // doesn't have callbacks, so this is a significant optimization. 571 // don't have callbacks, so this is a significant optimization.
466 if (hasError || (listener.handlesValue || listener.handlesComplete)) { 572 if (hasError || listener.handlesValue || listener.handlesComplete) {
467 Zone zone = listener._zone; 573 Zone zone = listener._zone;
468 if (hasError && !source._zone.inSameErrorZone(zone)) { 574 if (hasError && !source._zone.inSameErrorZone(zone)) {
469 // Don't cross zone boundaries with errors. 575 // Don't cross zone boundaries with errors.
470 AsyncError asyncError = source._error; 576 AsyncError asyncError = source._error;
471 source._zone.handleUncaughtError( 577 source._zone.handleUncaughtError(
472 asyncError.error, asyncError.stackTrace); 578 asyncError.error, asyncError.stackTrace);
473 return; 579 return;
474 } 580 }
475 581
476 Zone oldZone; 582 Zone oldZone;
477 if (!identical(Zone.current, zone)) { 583 if (!identical(Zone.current, zone)) {
478 // Change zone if it's not current. 584 // Change zone if it's not current.
479 oldZone = Zone._enter(zone); 585 oldZone = Zone._enter(zone);
480 } 586 }
481 587
482 bool handleValueCallback() { 588 // These callbacks are abstracted to isolate the try/catch blocks
483 try { 589 // from the rest of the code to work around a V8 glass jaw.
484 listenerValueOrError = zone.runUnary(listener._onValue,
485 sourceValue);
486 return true;
487 } catch (e, s) {
488 listenerValueOrError = new AsyncError(e, s);
489 return false;
490 }
491 }
492
493 void handleError() {
494 AsyncError asyncError = source._error;
495 bool matchesTest = true;
496 if (listener.hasErrorTest) {
497 _FutureErrorTest test = listener._errorTest;
498 try {
499 matchesTest = zone.runUnary(test, asyncError.error);
500 } catch (e, s) {
501 listenerValueOrError = identical(asyncError.error, e) ?
502 asyncError : new AsyncError(e, s);
503 listenerHasValue = false;
504 return;
505 }
506 }
507 Function errorCallback = listener._onError;
508 if (matchesTest && errorCallback != null) {
509 try {
510 if (errorCallback is ZoneBinaryCallback) {
511 listenerValueOrError = zone.runBinary(errorCallback,
512 asyncError.error,
513 asyncError.stackTrace);
514 } else {
515 listenerValueOrError = zone.runUnary(errorCallback,
516 asyncError.error);
517 }
518 } catch (e, s) {
519 listenerValueOrError = identical(asyncError.error, e) ?
520 asyncError : new AsyncError(e, s);
521 listenerHasValue = false;
522 return;
523 }
524 listenerHasValue = true;
525 } else {
526 // Copy over the error from the source.
527 listenerValueOrError = asyncError;
528 listenerHasValue = false;
529 }
530 }
531
532 void handleWhenCompleteCallback() { 590 void handleWhenCompleteCallback() {
591 // The whenComplete-handler is not combined with normal value/error
592 // handling. This means at most one handleX method is called per
593 // listener.
594 assert(!listener.handlesValue);
595 assert(!listener.handlesError);
533 var completeResult; 596 var completeResult;
534 try { 597 try {
535 completeResult = zone.run(listener._whenCompleteAction); 598 completeResult = listener.handleWhenComplete();
536 } catch (e, s) { 599 } catch (e, s) {
537 if (hasError && identical(source._error.error, e)) { 600 if (hasError && identical(source._error.error, e)) {
538 listenerValueOrError = source._error; 601 listenerValueOrError = source._error;
539 } else { 602 } else {
540 listenerValueOrError = new AsyncError(e, s); 603 listenerValueOrError = new AsyncError(e, s);
541 } 604 }
542 listenerHasValue = false; 605 listenerHasError = true;
543 return; 606 return;
544 } 607 }
545 if (completeResult is Future) { 608 if (completeResult is Future) {
546 _Future result = listener.result; 609 if (completeResult is _Future && completeResult._isComplete) {
547 result._isChained = true; 610 if (completeResult._hasError) {
548 isPropagationAborted = true; 611 listenerValueOrError = completeResult._error;
549 completeResult.then((ignored) { 612 listenerHasError = true;
550 _propagateToListeners(source, new _FutureListener.chain(result));
551 }, onError: (error, [stackTrace]) {
552 // When there is an error, we have to make the error the new
553 // result of the current listener.
554 if (completeResult is! _Future) {
555 // This should be a rare case.
556 completeResult = new _Future();
557 completeResult._setError(error, stackTrace);
558 } 613 }
559 _propagateToListeners(completeResult, 614 // Otherwise use the existing result of source.
560 new _FutureListener.chain(result)); 615 return;
561 }); 616 }
617 // We have to wait for the completeResult future to complete
618 // before knowing if it's an error or we should use the result
619 // of source.
620 var originalSource = source;
621 listenerValueOrError = completeResult.then((_) => originalSource);
622 listenerHasError = false;
562 } 623 }
563 } 624 }
564 625
565 if (!hasError) { 626 void handleValueCallback() {
627 try {
628 listenerValueOrError = listener.handleValue(sourceResult);
629 } catch (e, s) {
630 listenerValueOrError = new AsyncError(e, s);
631 listenerHasError = true;
632 }
633 }
634
635 void handleError() {
636 try {
637 AsyncError asyncError = source._error;
638 if (listener.matchesErrorTest(asyncError) &&
639 listener.hasErrorCallback) {
640 listenerValueOrError = listener.handleError(asyncError);
641 listenerHasError = false;
642 }
643 } catch (e, s) {
644 if (identical(source._error.error, e)) {
645 listenerValueOrError = source._error;
646 } else {
647 listenerValueOrError = new AsyncError(e, s);
648 }
649 listenerHasError = true;
650 }
651 }
652
653
654 if (listener.handlesComplete) {
655 handleWhenCompleteCallback();
656 } else if (!hasError) {
566 if (listener.handlesValue) { 657 if (listener.handlesValue) {
567 listenerHasValue = handleValueCallback(); 658 handleValueCallback();
568 } 659 }
569 } else { 660 } else {
570 handleError(); 661 if (listener.handlesError) {
662 handleError();
663 }
571 } 664 }
572 if (listener.handlesComplete) { 665
573 handleWhenCompleteCallback();
574 }
575 // If we changed zone, oldZone will not be null. 666 // If we changed zone, oldZone will not be null.
576 if (oldZone != null) Zone._leave(oldZone); 667 if (oldZone != null) Zone._leave(oldZone);
577 668
578 if (isPropagationAborted) return;
579 // If the listener's value is a future we need to chain it. Note that 669 // If the listener's value is a future we need to chain it. Note that
580 // this can only happen if there is a callback. Since 'is' checks 670 // this can only happen if there is a callback.
581 // can be expensive, we're trying to avoid it. 671 if (listenerValueOrError is Future) {
582 if (listenerHasValue &&
583 !identical(sourceValue, listenerValueOrError) &&
584 listenerValueOrError is Future) {
585 Future chainSource = listenerValueOrError; 672 Future chainSource = listenerValueOrError;
586 // Shortcut if the chain-source is already completed. Just continue 673 // Shortcut if the chain-source is already completed. Just continue
587 // the loop. 674 // the loop.
588 _Future result = listener.result; 675 _Future result = listener.result;
589 if (chainSource is _Future) { 676 if (chainSource is _Future) {
590 if (chainSource._isComplete) { 677 if (chainSource._isComplete) {
591 // propagate the value (simulating a tail call). 678 listeners = result._removeListeners();
592 result._isChained = true; 679 result._cloneResult(chainSource);
593 source = chainSource; 680 source = chainSource;
594 listeners = new _FutureListener.chain(result);
595 continue; 681 continue;
596 } else { 682 } else {
597 _chainCoreFuture(chainSource, result); 683 _chainCoreFuture(chainSource, result);
598 } 684 }
599 } else { 685 } else {
600 _chainForeignFuture(chainSource, result); 686 _chainForeignFuture(chainSource, result);
601 } 687 }
602 return; 688 return;
603 } 689 }
604 } 690 }
605 _Future result = listener.result; 691 _Future result = listener.result;
606 listeners = result._removeListeners(); 692 listeners = result._removeListeners();
607 if (listenerHasValue) { 693 if (!listenerHasError) {
608 result._setValue(listenerValueOrError); 694 result._setValue(listenerValueOrError);
609 } else { 695 } else {
610 AsyncError asyncError = listenerValueOrError; 696 AsyncError asyncError = listenerValueOrError;
611 result._setErrorObject(asyncError); 697 result._setErrorObject(asyncError);
612 } 698 }
613 // Prepare for next round. 699 // Prepare for next round.
614 source = result; 700 source = result;
615 } 701 }
616 } 702 }
617 703
618 Future timeout(Duration timeLimit, {onTimeout()}) { 704 Future<T> timeout(Duration timeLimit, {onTimeout()}) {
619 if (_isComplete) return new _Future.immediate(this); 705 if (_isComplete) return new _Future.immediate(this);
620 _Future result = new _Future(); 706 _Future<T> result = new _Future<T>();
621 Timer timer; 707 Timer timer;
622 if (onTimeout == null) { 708 if (onTimeout == null) {
623 timer = new Timer(timeLimit, () { 709 timer = new Timer(timeLimit, () {
624 result._completeError(new TimeoutException("Future not completed", 710 result._completeError(new TimeoutException("Future not completed",
625 timeLimit)); 711 timeLimit));
626 }); 712 });
627 } else { 713 } else {
628 Zone zone = Zone.current; 714 Zone zone = Zone.current;
629 onTimeout = zone.registerCallback(onTimeout); 715 onTimeout = zone.registerCallback(onTimeout);
630 timer = new Timer(timeLimit, () { 716 timer = new Timer(timeLimit, () {
(...skipping 11 matching lines...) Expand all
642 } 728 }
643 }, onError: (e, s) { 729 }, onError: (e, s) {
644 if (timer.isActive) { 730 if (timer.isActive) {
645 timer.cancel(); 731 timer.cancel();
646 result._completeError(e, s); 732 result._completeError(e, s);
647 } 733 }
648 }); 734 });
649 return result; 735 return result;
650 } 736 }
651 } 737 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698