OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** The onValue and onError handlers return either a value or a future */ | 7 /** The onValue and onError handlers return either a value or a future */ |
8 typedef dynamic _FutureOnValue<T>(T value); | 8 typedef dynamic/*T|Future<T>*/ _FutureOnValue<S, T>(S value); |
9 /** Test used by [Future.catchError] to handle skip some errors. */ | 9 /** Test used by [Future.catchError] to handle skip some errors. */ |
10 typedef bool _FutureErrorTest(var error); | 10 typedef bool _FutureErrorTest(var error); |
11 /** Used by [WhenFuture]. */ | 11 /** Used by [WhenFuture]. */ |
12 typedef _FutureAction(); | 12 typedef _FutureAction(); |
13 | 13 |
14 abstract class _Completer<T> implements Completer<T> { | 14 abstract class _Completer<T> implements Completer<T> { |
15 final _Future<T> future = new _Future<T>(); | 15 final _Future<T> future = new _Future<T>(); |
16 | 16 |
17 void complete([value]); | 17 void complete([value]); |
18 | 18 |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
50 void complete([value]) { | 50 void complete([value]) { |
51 if (!future._mayComplete) throw new StateError("Future already completed"); | 51 if (!future._mayComplete) throw new StateError("Future already completed"); |
52 future._complete(value); | 52 future._complete(value); |
53 } | 53 } |
54 | 54 |
55 void _completeError(Object error, StackTrace stackTrace) { | 55 void _completeError(Object error, StackTrace stackTrace) { |
56 future._completeError(error, stackTrace); | 56 future._completeError(error, stackTrace); |
57 } | 57 } |
58 } | 58 } |
59 | 59 |
60 class _FutureListener { | 60 class _FutureListener<S, T> { |
61 static const int MASK_VALUE = 1; | 61 static const int MASK_VALUE = 1; |
62 static const int MASK_ERROR = 2; | 62 static const int MASK_ERROR = 2; |
63 static const int MASK_TEST_ERROR = 4; | 63 static const int MASK_TEST_ERROR = 4; |
64 static const int MASK_WHENCOMPLETE = 8; | 64 static const int MASK_WHENCOMPLETE = 8; |
65 static const int STATE_CHAIN = 0; | 65 static const int STATE_CHAIN = 0; |
66 static const int STATE_THEN = MASK_VALUE; | 66 static const int STATE_THEN = MASK_VALUE; |
67 static const int STATE_THEN_ONERROR = MASK_VALUE | MASK_ERROR; | 67 // TODO(johnmccutchan): Remove the hard coded value. See #26030. |
| 68 static const int STATE_THEN_ONERROR = 3; // MASK_VALUE | MASK_ERROR. |
68 static const int STATE_CATCHERROR = MASK_ERROR; | 69 static const int STATE_CATCHERROR = MASK_ERROR; |
69 static const int STATE_CATCHERROR_TEST = MASK_ERROR | MASK_TEST_ERROR; | 70 // TODO(johnmccutchan): Remove the hard coded value. See #26030. |
| 71 static const int STATE_CATCHERROR_TEST = 6; // MASK_ERROR | MASK_TEST_ERROR. |
70 static const int STATE_WHENCOMPLETE = MASK_WHENCOMPLETE; | 72 static const int STATE_WHENCOMPLETE = MASK_WHENCOMPLETE; |
71 // Listeners on the same future are linked through this link. | 73 // Listeners on the same future are linked through this link. |
72 _FutureListener _nextListener = null; | 74 _FutureListener _nextListener = null; |
73 // The future to complete when this listener is activated. | 75 // The future to complete when this listener is activated. |
74 final _Future result; | 76 final _Future<T> result; |
75 // Which fields means what. | 77 // Which fields means what. |
76 final int state; | 78 final int state; |
77 // Used for then/whenDone callback and error test | 79 // Used for then/whenDone callback and error test |
78 final Function callback; | 80 final Function callback; |
79 // Used for error callbacks. | 81 // Used for error callbacks. |
80 final Function errorCallback; | 82 final Function errorCallback; |
81 | 83 |
82 _FutureListener.then(this.result, | 84 _FutureListener.then(this.result, |
83 _FutureOnValue onValue, Function errorCallback) | 85 _FutureOnValue<S, T> onValue, Function errorCallback) |
84 : callback = onValue, | 86 : callback = onValue, |
85 errorCallback = errorCallback, | 87 errorCallback = errorCallback, |
86 state = (errorCallback == null) ? STATE_THEN : STATE_THEN_ONERROR; | 88 state = (errorCallback == null) ? STATE_THEN : STATE_THEN_ONERROR; |
87 | 89 |
88 _FutureListener.catchError(this.result, | 90 _FutureListener.catchError(this.result, |
89 this.errorCallback, _FutureErrorTest test) | 91 this.errorCallback, _FutureErrorTest test) |
90 : callback = test, | 92 : callback = test, |
91 state = (test == null) ? STATE_CATCHERROR : STATE_CATCHERROR_TEST; | 93 state = (test == null) ? STATE_CATCHERROR : STATE_CATCHERROR_TEST; |
92 | 94 |
93 _FutureListener.whenComplete(this.result, _FutureAction onComplete) | 95 _FutureListener.whenComplete(this.result, _FutureAction onComplete) |
94 : callback = onComplete, | 96 : callback = onComplete, |
95 errorCallback = null, | 97 errorCallback = null, |
96 state = STATE_WHENCOMPLETE; | 98 state = STATE_WHENCOMPLETE; |
97 | 99 |
98 _FutureListener.chain(this.result) | |
99 : callback = null, | |
100 errorCallback = null, | |
101 state = STATE_CHAIN; | |
102 | |
103 Zone get _zone => result._zone; | 100 Zone get _zone => result._zone; |
104 | 101 |
105 bool get handlesValue => (state & MASK_VALUE != 0); | 102 bool get handlesValue => (state & MASK_VALUE != 0); |
106 bool get handlesError => (state & MASK_ERROR != 0); | 103 bool get handlesError => (state & MASK_ERROR != 0); |
107 bool get hasErrorTest => (state == STATE_CATCHERROR_TEST); | 104 bool get hasErrorTest => (state == STATE_CATCHERROR_TEST); |
108 bool get handlesComplete => (state == STATE_WHENCOMPLETE); | 105 bool get handlesComplete => (state == STATE_WHENCOMPLETE); |
109 | 106 |
110 _FutureOnValue get _onValue { | 107 |
| 108 _FutureOnValue<S, T> get _onValue { |
111 assert(handlesValue); | 109 assert(handlesValue); |
112 return callback; | 110 return callback as Object /*=_FutureOnValue<S, T>*/; |
113 } | 111 } |
114 Function get _onError => errorCallback; | 112 Function get _onError => errorCallback; |
115 _FutureErrorTest get _errorTest { | 113 _FutureErrorTest get _errorTest { |
116 assert(hasErrorTest); | 114 assert(hasErrorTest); |
117 return callback; | 115 return callback as Object /*=_FutureErrorTest*/; |
118 } | 116 } |
119 _FutureAction get _whenCompleteAction { | 117 _FutureAction get _whenCompleteAction { |
120 assert(handlesComplete); | 118 assert(handlesComplete); |
121 return callback; | 119 return callback as Object /*=_FutureAction*/; |
| 120 } |
| 121 |
| 122 /// Whether this listener has an error callback. |
| 123 /// |
| 124 /// This function must only be called if the listener [handlesError]. |
| 125 bool get hasErrorCallback { |
| 126 assert(handlesError); |
| 127 return _onError != null; |
| 128 } |
| 129 |
| 130 dynamic/*T|Future<T>*/ handleValue(S sourceResult) { |
| 131 return _zone.runUnary/*<dynamic/*T|Future<T>*/, S>*/( |
| 132 _onValue, sourceResult); |
| 133 } |
| 134 |
| 135 bool matchesErrorTest(AsyncError asyncError) { |
| 136 if (!hasErrorTest) return true; |
| 137 _FutureErrorTest test = _errorTest; |
| 138 return _zone.runUnary/*<bool, dynamic>*/(_errorTest, asyncError.error); |
| 139 } |
| 140 |
| 141 dynamic/*T|Future<T>*/ handleError(AsyncError asyncError) { |
| 142 assert(handlesError && hasErrorCallback); |
| 143 if (errorCallback is ZoneBinaryCallback) { |
| 144 var typedErrorCallback = errorCallback as Object |
| 145 /*=ZoneBinaryCallback<Object/*T|Future<T>*/, Object, StackTrace>*/; |
| 146 return _zone.runBinary(typedErrorCallback, |
| 147 asyncError.error, |
| 148 asyncError.stackTrace); |
| 149 } else { |
| 150 return _zone.runUnary/*<dynamic/*T|Future<T>*/, dynamic>*/( |
| 151 errorCallback, asyncError.error); |
| 152 } |
| 153 } |
| 154 |
| 155 dynamic handleWhenComplete() { |
| 156 assert(!handlesError); |
| 157 return _zone.run(_whenCompleteAction); |
122 } | 158 } |
123 } | 159 } |
124 | 160 |
125 class _Future<T> implements Future<T> { | 161 class _Future<T> implements Future<T> { |
126 /// Initial state, waiting for a result. In this state, the | 162 /// Initial state, waiting for a result. In this state, the |
127 /// [resultOrListeners] field holds a single-linked list of | 163 /// [resultOrListeners] field holds a single-linked list of |
128 /// [_FutureListener] listeners. | 164 /// [_FutureListener] listeners. |
129 static const int _INCOMPLETE = 0; | 165 static const int _INCOMPLETE = 0; |
130 /// Pending completion. Set when completed using [_asyncComplete] or | 166 /// Pending completion. Set when completed using [_asyncComplete] or |
131 /// [_asyncCompleteError]. It is an error to try to complete it again. | 167 /// [_asyncCompleteError]. It is an error to try to complete it again. |
132 /// [resultOrListeners] holds listeners. | 168 /// [resultOrListeners] holds listeners. |
133 static const int _PENDING_COMPLETE = 1; | 169 static const int _PENDING_COMPLETE = 1; |
134 /// The future has been chained to another future. The result of that | 170 /// The future has been chained to another future. The result of that |
135 /// other future becomes the result of this future as well. | 171 /// other future becomes the result of this future as well. |
136 // TODO(floitsch): we don't really need a special "_CHAINED" state. We could | 172 /// [resultOrListeners] contains the source future. |
137 // just use the PENDING_COMPLETE state instead. | |
138 static const int _CHAINED = 2; | 173 static const int _CHAINED = 2; |
139 /// The future has been completed with a value result. | 174 /// The future has been completed with a value result. |
140 static const int _VALUE = 4; | 175 static const int _VALUE = 4; |
141 /// The future has been completed with an error result. | 176 /// The future has been completed with an error result. |
142 static const int _ERROR = 8; | 177 static const int _ERROR = 8; |
143 | 178 |
144 /** Whether the future is complete, and as what. */ | 179 /** Whether the future is complete, and as what. */ |
145 int _state = _INCOMPLETE; | 180 int _state = _INCOMPLETE; |
146 | 181 |
147 /** | 182 /** |
(...skipping 12 matching lines...) Expand all Loading... |
160 * A result is only stored when the future has completed. | 195 * A result is only stored when the future has completed. |
161 * | 196 * |
162 * The listeners is an internally linked list of [_FutureListener]s. | 197 * The listeners is an internally linked list of [_FutureListener]s. |
163 * Listeners are only remembered while the future is not yet complete, | 198 * Listeners are only remembered while the future is not yet complete, |
164 * and it is not chained to another future. | 199 * and it is not chained to another future. |
165 * | 200 * |
166 * The future is another future that his future is chained to. This future | 201 * The future is another future that his future is chained to. This future |
167 * is waiting for the other future to complete, and when it does, this future | 202 * is waiting for the other future to complete, and when it does, this future |
168 * will complete with the same result. | 203 * will complete with the same result. |
169 * All listeners are forwarded to the other future. | 204 * All listeners are forwarded to the other future. |
170 * | |
171 * The cases are disjoint - incomplete and unchained ([_INCOMPLETE]), | |
172 * incomplete and chained ([_CHAINED]), or completed with value or error | |
173 * ([_VALUE] or [_ERROR]) - so the field only needs to hold | |
174 * one value at a time. | |
175 */ | 205 */ |
176 var _resultOrListeners; | 206 var _resultOrListeners; |
177 | 207 |
| 208 // This constructor is used by async/await. |
178 _Future(); | 209 _Future(); |
179 | 210 |
180 /// Valid types for value: `T` or `Future<T>`. | 211 /// Valid types for value: `T` or `Future<T>`. |
181 _Future.immediate(value) { | 212 _Future.immediate(value) { |
182 _asyncComplete(value); | 213 _asyncComplete(value); |
183 } | 214 } |
184 | 215 |
185 _Future.immediateError(var error, [StackTrace stackTrace]) { | 216 _Future.immediateError(var error, [StackTrace stackTrace]) { |
186 _asyncCompleteError(error, stackTrace); | 217 _asyncCompleteError(error, stackTrace); |
187 } | 218 } |
188 | 219 |
189 bool get _mayComplete => _state == _INCOMPLETE; | 220 bool get _mayComplete => _state == _INCOMPLETE; |
190 bool get _isChained => _state == _CHAINED; | 221 bool get _isPendingComplete => _state == _PENDING_COMPLETE; |
191 bool get _isComplete => _state >= _VALUE; | 222 bool get _mayAddListener => _state <= _PENDING_COMPLETE; |
192 bool get _hasValue => _state == _VALUE; | 223 bool get _isChained => _state == _CHAINED; |
193 bool get _hasError => _state == _ERROR; | 224 bool get _isComplete => _state >= _VALUE; |
| 225 bool get _hasError => _state == _ERROR; |
194 | 226 |
195 set _isChained(bool value) { | 227 void _setChained(_Future source) { |
196 if (value) { | 228 assert(_mayAddListener); |
197 assert(!_isComplete); | 229 _state = _CHAINED; |
198 _state = _CHAINED; | 230 _resultOrListeners = source; |
199 } else { | |
200 assert(_isChained); | |
201 _state = _INCOMPLETE; | |
202 } | |
203 } | 231 } |
204 | 232 |
205 Future/*<S>*/ then/*<S>*/(/*=S*/ f(T value), { Function onError }) { | 233 Future/*<E>*/ then/*<E>*/( |
206 _Future/*<S>*/ result = new _Future(); | 234 /*=dynamic/*E|Future<E>*/*/ f(T value), { Function onError }) { |
207 if (!identical(result._zone, _ROOT_ZONE)) { | 235 Zone currentZone = Zone.current; |
208 f = result._zone.registerUnaryCallback(f); | 236 ZoneUnaryCallback registered; |
| 237 if (!identical(currentZone, _ROOT_ZONE)) { |
| 238 f = currentZone.registerUnaryCallback/*<dynamic, T>*/(f); |
209 if (onError != null) { | 239 if (onError != null) { |
210 onError = _registerErrorHandler(onError, result._zone); | 240 onError = _registerErrorHandler/*<T>*/(onError, currentZone); |
211 } | 241 } |
212 } | 242 } |
213 _addListener(new _FutureListener.then(result, f, onError)); | 243 return _thenNoZoneRegistration/*<E>*/(f, onError); |
| 244 } |
| 245 |
| 246 // This method is used by async/await. |
| 247 Future/*<E>*/ _thenNoZoneRegistration/*<E>*/(f(T value), Function onError) { |
| 248 _Future/*<E>*/ result = new _Future/*<E>*/(); |
| 249 _addListener(new _FutureListener/*<T, E>*/.then(result, f, onError)); |
214 return result; | 250 return result; |
215 } | 251 } |
216 | 252 |
217 Future catchError(Function onError, { bool test(error) }) { | 253 Future/*<T>*/ catchError(Function onError, { bool test(error) }) { |
218 _Future result = new _Future(); | 254 _Future/*<T>*/ result = new _Future/*<T>*/(); |
219 if (!identical(result._zone, _ROOT_ZONE)) { | 255 if (!identical(result._zone, _ROOT_ZONE)) { |
220 onError = _registerErrorHandler(onError, result._zone); | 256 onError = _registerErrorHandler/*<T>*/(onError, result._zone); |
221 if (test != null) test = result._zone.registerUnaryCallback(test); | 257 if (test != null) test = result._zone.registerUnaryCallback(test); |
222 } | 258 } |
223 _addListener(new _FutureListener.catchError(result, onError, test)); | 259 _addListener(new _FutureListener/*<T, T>*/.catchError( |
| 260 result, onError, test)); |
224 return result; | 261 return result; |
225 } | 262 } |
226 | 263 |
227 Future<T> whenComplete(action()) { | 264 Future<T> whenComplete(action()) { |
228 _Future result = new _Future<T>(); | 265 _Future<T> result = new _Future<T>(); |
229 if (!identical(result._zone, _ROOT_ZONE)) { | 266 if (!identical(result._zone, _ROOT_ZONE)) { |
230 action = result._zone.registerCallback(action); | 267 action = result._zone.registerCallback/*<dynamic>*/(action); |
231 } | 268 } |
232 _addListener(new _FutureListener.whenComplete(result, action)); | 269 _addListener(new _FutureListener/*<T, T>*/.whenComplete(result, action)); |
233 return result; | 270 return result; |
234 } | 271 } |
235 | 272 |
236 Stream<T> asStream() => new Stream.fromFuture(this); | 273 Stream<T> asStream() => new Stream<T>.fromFuture(this); |
237 | 274 |
238 void _markPendingCompletion() { | 275 void _setPendingComplete() { |
239 if (!_mayComplete) throw new StateError("Future already completed"); | 276 assert(_mayComplete); |
240 _state = _PENDING_COMPLETE; | 277 _state = _PENDING_COMPLETE; |
241 } | 278 } |
242 | 279 |
243 T get _value { | 280 AsyncError get _error { |
244 assert(_isComplete && _hasValue); | 281 assert(_hasError); |
245 return _resultOrListeners; | 282 return _resultOrListeners; |
246 } | 283 } |
247 | 284 |
248 AsyncError get _error { | 285 _Future get _chainSource { |
249 assert(_isComplete && _hasError); | 286 assert(_isChained); |
250 return _resultOrListeners; | 287 return _resultOrListeners; |
251 } | 288 } |
252 | 289 |
| 290 // This method is used by async/await. |
253 void _setValue(T value) { | 291 void _setValue(T value) { |
254 assert(!_isComplete); // But may have a completion pending. | 292 assert(!_isComplete); // But may have a completion pending. |
255 _state = _VALUE; | 293 _state = _VALUE; |
256 _resultOrListeners = value; | 294 _resultOrListeners = value; |
257 } | 295 } |
258 | 296 |
259 void _setErrorObject(AsyncError error) { | 297 void _setErrorObject(AsyncError error) { |
260 assert(!_isComplete); // But may have a completion pending. | 298 assert(!_isComplete); // But may have a completion pending. |
261 _state = _ERROR; | 299 _state = _ERROR; |
262 _resultOrListeners = error; | 300 _resultOrListeners = error; |
263 } | 301 } |
264 | 302 |
265 void _setError(Object error, StackTrace stackTrace) { | 303 void _setError(Object error, StackTrace stackTrace) { |
266 _setErrorObject(new AsyncError(error, stackTrace)); | 304 _setErrorObject(new AsyncError(error, stackTrace)); |
267 } | 305 } |
268 | 306 |
| 307 /// Copy the completion result of [source] into this future. |
| 308 /// |
| 309 /// Used when a chained future notices that its source is completed. |
| 310 void _cloneResult(_Future source) { |
| 311 assert(!_isComplete); |
| 312 assert(source._isComplete); |
| 313 _state = source._state; |
| 314 _resultOrListeners = source._resultOrListeners; |
| 315 } |
| 316 |
269 void _addListener(_FutureListener listener) { | 317 void _addListener(_FutureListener listener) { |
270 assert(listener._nextListener == null); | 318 assert(listener._nextListener == null); |
271 if (_isComplete) { | 319 if (_mayAddListener) { |
| 320 listener._nextListener = _resultOrListeners; |
| 321 _resultOrListeners = listener; |
| 322 } else { |
| 323 if (_isChained) { |
| 324 // Delegate listeners to chained source future. |
| 325 // If the source is complete, instead copy its values and |
| 326 // drop the chaining. |
| 327 _Future source = _chainSource; |
| 328 if (!source._isComplete) { |
| 329 source._addListener(listener); |
| 330 return; |
| 331 } |
| 332 _cloneResult(source); |
| 333 } |
| 334 assert(_isComplete); |
272 // Handle late listeners asynchronously. | 335 // Handle late listeners asynchronously. |
273 _zone.scheduleMicrotask(() { | 336 _zone.scheduleMicrotask(() { |
274 _propagateToListeners(this, listener); | 337 _propagateToListeners(this, listener); |
275 }); | 338 }); |
276 } else { | |
277 listener._nextListener = _resultOrListeners; | |
278 _resultOrListeners = listener; | |
279 } | 339 } |
280 } | 340 } |
281 | 341 |
| 342 void _prependListeners(_FutureListener listeners) { |
| 343 if (listeners == null) return; |
| 344 if (_mayAddListener) { |
| 345 _FutureListener existingListeners = _resultOrListeners; |
| 346 _resultOrListeners = listeners; |
| 347 if (existingListeners != null) { |
| 348 _FutureListener cursor = listeners; |
| 349 while (cursor._nextListener != null) { |
| 350 cursor = cursor._nextListener; |
| 351 } |
| 352 cursor._nextListener = existingListeners; |
| 353 } |
| 354 } else { |
| 355 if (_isChained) { |
| 356 // Delegate listeners to chained source future. |
| 357 // If the source is complete, instead copy its values and |
| 358 // drop the chaining. |
| 359 _Future source = _chainSource; |
| 360 if (!source._isComplete) { |
| 361 source._prependListeners(listeners); |
| 362 return; |
| 363 } |
| 364 _cloneResult(source); |
| 365 } |
| 366 assert(_isComplete); |
| 367 listeners = _reverseListeners(listeners); |
| 368 _zone.scheduleMicrotask(() { |
| 369 _propagateToListeners(this, listeners); |
| 370 }); |
| 371 } |
| 372 } |
| 373 |
282 _FutureListener _removeListeners() { | 374 _FutureListener _removeListeners() { |
283 // Reverse listeners before returning them, so the resulting list is in | 375 // Reverse listeners before returning them, so the resulting list is in |
284 // subscription order. | 376 // subscription order. |
285 assert(!_isComplete); | 377 assert(!_isComplete); |
286 _FutureListener current = _resultOrListeners; | 378 _FutureListener current = _resultOrListeners; |
287 _resultOrListeners = null; | 379 _resultOrListeners = null; |
| 380 return _reverseListeners(current); |
| 381 } |
| 382 |
| 383 _FutureListener _reverseListeners(_FutureListener listeners) { |
288 _FutureListener prev = null; | 384 _FutureListener prev = null; |
| 385 _FutureListener current = listeners; |
289 while (current != null) { | 386 while (current != null) { |
290 _FutureListener next = current._nextListener; | 387 _FutureListener next = current._nextListener; |
291 current._nextListener = prev; | 388 current._nextListener = prev; |
292 prev = current; | 389 prev = current; |
293 current = next; | 390 current = next; |
294 } | 391 } |
295 return prev; | 392 return prev; |
296 } | 393 } |
297 | 394 |
298 // Take the value (when completed) of source and complete target with that | 395 // Take the value (when completed) of source and complete target with that |
299 // value (or error). This function can chain all Futures, but is slower | 396 // value (or error). This function could chain all Futures, but is slower |
300 // for _Future than _chainCoreFuture - Use _chainCoreFuture in that case. | 397 // for _Future than _chainCoreFuture, so you must use _chainCoreFuture |
| 398 // in that case. |
301 static void _chainForeignFuture(Future source, _Future target) { | 399 static void _chainForeignFuture(Future source, _Future target) { |
302 assert(!target._isComplete); | 400 assert(!target._isComplete); |
303 assert(source is! _Future); | 401 assert(source is! _Future); |
304 | 402 |
305 // Mark the target as chained (and as such half-completed). | 403 // Mark the target as chained (and as such half-completed). |
306 target._isChained = true; | 404 target._setPendingComplete(); |
307 source.then((value) { | 405 try { |
308 assert(target._isChained); | 406 source.then((value) { |
309 target._completeWithValue(value); | 407 assert(target._isPendingComplete); |
310 }, | 408 target._completeWithValue(value); |
311 // TODO(floitsch): eventually we would like to make this non-optional | 409 }, |
312 // and dependent on the listeners of the target future. If none of | 410 // TODO(floitsch): eventually we would like to make this non-optional |
313 // the target future's listeners want to have the stack trace we don't | 411 // and dependent on the listeners of the target future. If none of |
314 // need a trace. | 412 // the target future's listeners want to have the stack trace we don't |
315 onError: (error, [stackTrace]) { | 413 // need a trace. |
316 assert(target._isChained); | 414 onError: (error, [stackTrace]) { |
317 target._completeError(error, stackTrace); | 415 assert(target._isPendingComplete); |
| 416 target._completeError(error, stackTrace); |
| 417 }); |
| 418 } catch (e, s) { |
| 419 // This only happens if the `then` call threw synchronously when given |
| 420 // valid arguments. |
| 421 // That requires a non-conforming implementation of the Future interface, |
| 422 // which should, hopefully, never happen. |
| 423 scheduleMicrotask(() { |
| 424 target._completeError(e, s); |
318 }); | 425 }); |
| 426 } |
319 } | 427 } |
320 | 428 |
321 // Take the value (when completed) of source and complete target with that | 429 // Take the value (when completed) of source and complete target with that |
322 // value (or error). This function expects that source is a _Future. | 430 // value (or error). This function expects that source is a _Future. |
323 static void _chainCoreFuture(_Future source, _Future target) { | 431 static void _chainCoreFuture(_Future source, _Future target) { |
324 assert(!target._isComplete); | 432 assert(target._mayAddListener); // Not completed, not already chained. |
325 assert(source is _Future); | 433 while (source._isChained) { |
326 | 434 source = source._chainSource; |
327 // Mark the target as chained (and as such half-completed). | 435 } |
328 target._isChained = true; | |
329 _FutureListener listener = new _FutureListener.chain(target); | |
330 if (source._isComplete) { | 436 if (source._isComplete) { |
331 _propagateToListeners(source, listener); | 437 _FutureListener listeners = target._removeListeners(); |
| 438 target._cloneResult(source); |
| 439 _propagateToListeners(target, listeners); |
332 } else { | 440 } else { |
333 source._addListener(listener); | 441 _FutureListener listeners = target._resultOrListeners; |
| 442 target._setChained(source); |
| 443 source._prependListeners(listeners); |
334 } | 444 } |
335 } | 445 } |
336 | 446 |
337 void _complete(value) { | 447 void _complete(value) { |
338 assert(!_isComplete); | 448 assert(!_isComplete); |
339 if (value is Future) { | 449 if (value is Future) { |
340 if (value is _Future) { | 450 if (value is _Future) { |
341 _chainCoreFuture(value, this); | 451 _chainCoreFuture(value, this); |
342 } else { | 452 } else { |
343 _chainForeignFuture(value, this); | 453 _chainForeignFuture(value, this); |
344 } | 454 } |
345 } else { | 455 } else { |
346 _FutureListener listeners = _removeListeners(); | 456 _FutureListener listeners = _removeListeners(); |
347 _setValue(value); | 457 _setValue(value as Object /*=T*/); |
348 _propagateToListeners(this, listeners); | 458 _propagateToListeners(this, listeners); |
349 } | 459 } |
350 } | 460 } |
351 | 461 |
352 void _completeWithValue(value) { | 462 void _completeWithValue(T value) { |
353 assert(!_isComplete); | 463 assert(!_isComplete); |
354 assert(value is! Future); | 464 assert(value is! Future); |
355 | 465 |
356 _FutureListener listeners = _removeListeners(); | 466 _FutureListener listeners = _removeListeners(); |
357 _setValue(value); | 467 _setValue(value); |
358 _propagateToListeners(this, listeners); | 468 _propagateToListeners(this, listeners); |
359 } | 469 } |
360 | 470 |
361 void _completeError(error, [StackTrace stackTrace]) { | 471 void _completeError(error, [StackTrace stackTrace]) { |
362 assert(!_isComplete); | 472 assert(!_isComplete); |
363 | 473 |
364 _FutureListener listeners = _removeListeners(); | 474 _FutureListener listeners = _removeListeners(); |
365 _setError(error, stackTrace); | 475 _setError(error, stackTrace); |
366 _propagateToListeners(this, listeners); | 476 _propagateToListeners(this, listeners); |
367 } | 477 } |
368 | 478 |
369 void _asyncComplete(value) { | 479 void _asyncComplete(value) { |
370 assert(!_isComplete); | 480 assert(!_isComplete); |
371 // Two corner cases if the value is a future: | 481 // Two corner cases if the value is a future: |
372 // 1. the future is already completed and an error. | 482 // 1. the future is already completed and an error. |
373 // 2. the future is not yet completed but might become an error. | 483 // 2. the future is not yet completed but might become an error. |
374 // The first case means that we must not immediately complete the Future, | 484 // The first case means that we must not immediately complete the Future, |
375 // as our code would immediately start propagating the error without | 485 // as our code would immediately start propagating the error without |
376 // giving the time to install error-handlers. | 486 // giving the time to install error-handlers. |
377 // However the second case requires us to deal with the value immediately. | 487 // However the second case requires us to deal with the value immediately. |
378 // Otherwise the value could complete with an error and report an | 488 // Otherwise the value could complete with an error and report an |
379 // unhandled error, even though we know we are already going to listen to | 489 // unhandled error, even though we know we are already going to listen to |
380 // it. | 490 // it. |
381 | 491 |
382 if (value == null) { | 492 if (value is Future) { |
383 // No checks for `null`. | |
384 } else if (value is Future) { | |
385 // Assign to typed variables so we get earlier checks in checked mode. | 493 // Assign to typed variables so we get earlier checks in checked mode. |
386 Future<T> typedFuture = value; | 494 Future<T> typedFuture = value as Object /*=Future<T>*/; |
387 if (typedFuture is _Future) { | 495 if (typedFuture is _Future) { |
388 _Future<T> coreFuture = typedFuture; | 496 _Future<T> coreFuture = typedFuture; |
389 if (coreFuture._isComplete && coreFuture._hasError) { | 497 if (coreFuture._hasError) { |
390 // Case 1 from above. Delay completion to enable the user to register | 498 // Case 1 from above. Delay completion to enable the user to register |
391 // callbacks. | 499 // callbacks. |
392 _markPendingCompletion(); | 500 _setPendingComplete(); |
393 _zone.scheduleMicrotask(() { | 501 _zone.scheduleMicrotask(() { |
394 _chainCoreFuture(coreFuture, this); | 502 _chainCoreFuture(coreFuture, this); |
395 }); | 503 }); |
396 } else { | 504 } else { |
397 _chainCoreFuture(coreFuture, this); | 505 _chainCoreFuture(coreFuture, this); |
398 } | 506 } |
399 } else { | 507 } else { |
400 // Case 2 from above. Chain the future immidiately. | 508 // Case 2 from above. Chain the future immediately. |
401 // Note that we are still completing asynchronously (through | 509 // Note that we are still completing asynchronously (through |
402 // _chainForeignFuture).. | 510 // _chainForeignFuture). |
403 _chainForeignFuture(typedFuture, this); | 511 _chainForeignFuture(typedFuture, this); |
404 } | 512 } |
405 return; | 513 return; |
406 } else { | |
407 T typedValue = value; | |
408 } | 514 } |
| 515 T typedValue = value as Object /*=T*/; |
409 | 516 |
410 _markPendingCompletion(); | 517 _setPendingComplete(); |
411 _zone.scheduleMicrotask(() { | 518 _zone.scheduleMicrotask(() { |
412 _completeWithValue(value); | 519 _completeWithValue(typedValue); |
413 }); | 520 }); |
414 } | 521 } |
415 | 522 |
416 void _asyncCompleteError(error, StackTrace stackTrace) { | 523 void _asyncCompleteError(error, StackTrace stackTrace) { |
417 assert(!_isComplete); | 524 assert(!_isComplete); |
418 | 525 |
419 _markPendingCompletion(); | 526 _setPendingComplete(); |
420 _zone.scheduleMicrotask(() { | 527 _zone.scheduleMicrotask(() { |
421 _completeError(error, stackTrace); | 528 _completeError(error, stackTrace); |
422 }); | 529 }); |
423 } | 530 } |
424 | 531 |
425 /** | 532 /** |
426 * Propagates the value/error of [source] to its [listeners], executing the | 533 * Propagates the value/error of [source] to its [listeners], executing the |
427 * listeners' callbacks. | 534 * listeners' callbacks. |
428 */ | 535 */ |
429 static void _propagateToListeners(_Future source, _FutureListener listeners) { | 536 static void _propagateToListeners(_Future source, _FutureListener listeners) { |
(...skipping 11 matching lines...) Expand all Loading... |
441 // Usually futures only have one listener. If they have several, we | 548 // Usually futures only have one listener. If they have several, we |
442 // call handle them separately in recursive calls, continuing | 549 // call handle them separately in recursive calls, continuing |
443 // here only when there is only one listener left. | 550 // here only when there is only one listener left. |
444 while (listeners._nextListener != null) { | 551 while (listeners._nextListener != null) { |
445 _FutureListener listener = listeners; | 552 _FutureListener listener = listeners; |
446 listeners = listener._nextListener; | 553 listeners = listener._nextListener; |
447 listener._nextListener = null; | 554 listener._nextListener = null; |
448 _propagateToListeners(source, listener); | 555 _propagateToListeners(source, listener); |
449 } | 556 } |
450 _FutureListener listener = listeners; | 557 _FutureListener listener = listeners; |
| 558 final sourceResult = source._resultOrListeners; |
451 // Do the actual propagation. | 559 // Do the actual propagation. |
452 // Set initial state of listenerHasValue and listenerValueOrError. These | 560 // Set initial state of listenerHasError and listenerValueOrError. These |
453 // variables are updated, with the outcome of potential callbacks. | 561 // variables are updated with the outcome of potential callbacks. |
454 bool listenerHasValue = true; | 562 // Non-error results, including futures, are stored in |
455 final sourceValue = hasError ? null : source._value; | 563 // listenerValueOrError and listenerHasError is set to false. Errors |
456 var listenerValueOrError = sourceValue; | 564 // are stored in listenerValueOrError as an [AsyncError] and |
457 // Set to true if a whenComplete needs to wait for a future. | 565 // listenerHasError is set to true. |
458 // The whenComplete action will resume the propagation by itself. | 566 bool listenerHasError = hasError; |
459 bool isPropagationAborted = false; | 567 var listenerValueOrError = sourceResult; |
460 // TODO(floitsch): mark the listener as pending completion. Currently | 568 |
461 // we can't do this, since the markPendingCompletion verifies that | |
462 // the future is not already marked (or chained). | |
463 // Only if we either have an error or callbacks, go into this, somewhat | 569 // Only if we either have an error or callbacks, go into this, somewhat |
464 // expensive, branch. Here we'll enter/leave the zone. Many futures | 570 // expensive, branch. Here we'll enter/leave the zone. Many futures |
465 // doesn't have callbacks, so this is a significant optimization. | 571 // don't have callbacks, so this is a significant optimization. |
466 if (hasError || (listener.handlesValue || listener.handlesComplete)) { | 572 if (hasError || listener.handlesValue || listener.handlesComplete) { |
467 Zone zone = listener._zone; | 573 Zone zone = listener._zone; |
468 if (hasError && !source._zone.inSameErrorZone(zone)) { | 574 if (hasError && !source._zone.inSameErrorZone(zone)) { |
469 // Don't cross zone boundaries with errors. | 575 // Don't cross zone boundaries with errors. |
470 AsyncError asyncError = source._error; | 576 AsyncError asyncError = source._error; |
471 source._zone.handleUncaughtError( | 577 source._zone.handleUncaughtError( |
472 asyncError.error, asyncError.stackTrace); | 578 asyncError.error, asyncError.stackTrace); |
473 return; | 579 return; |
474 } | 580 } |
475 | 581 |
476 Zone oldZone; | 582 Zone oldZone; |
477 if (!identical(Zone.current, zone)) { | 583 if (!identical(Zone.current, zone)) { |
478 // Change zone if it's not current. | 584 // Change zone if it's not current. |
479 oldZone = Zone._enter(zone); | 585 oldZone = Zone._enter(zone); |
480 } | 586 } |
481 | 587 |
482 bool handleValueCallback() { | 588 // These callbacks are abstracted to isolate the try/catch blocks |
483 try { | 589 // from the rest of the code to work around a V8 glass jaw. |
484 listenerValueOrError = zone.runUnary(listener._onValue, | |
485 sourceValue); | |
486 return true; | |
487 } catch (e, s) { | |
488 listenerValueOrError = new AsyncError(e, s); | |
489 return false; | |
490 } | |
491 } | |
492 | |
493 void handleError() { | |
494 AsyncError asyncError = source._error; | |
495 bool matchesTest = true; | |
496 if (listener.hasErrorTest) { | |
497 _FutureErrorTest test = listener._errorTest; | |
498 try { | |
499 matchesTest = zone.runUnary(test, asyncError.error); | |
500 } catch (e, s) { | |
501 listenerValueOrError = identical(asyncError.error, e) ? | |
502 asyncError : new AsyncError(e, s); | |
503 listenerHasValue = false; | |
504 return; | |
505 } | |
506 } | |
507 Function errorCallback = listener._onError; | |
508 if (matchesTest && errorCallback != null) { | |
509 try { | |
510 if (errorCallback is ZoneBinaryCallback) { | |
511 listenerValueOrError = zone.runBinary(errorCallback, | |
512 asyncError.error, | |
513 asyncError.stackTrace); | |
514 } else { | |
515 listenerValueOrError = zone.runUnary(errorCallback, | |
516 asyncError.error); | |
517 } | |
518 } catch (e, s) { | |
519 listenerValueOrError = identical(asyncError.error, e) ? | |
520 asyncError : new AsyncError(e, s); | |
521 listenerHasValue = false; | |
522 return; | |
523 } | |
524 listenerHasValue = true; | |
525 } else { | |
526 // Copy over the error from the source. | |
527 listenerValueOrError = asyncError; | |
528 listenerHasValue = false; | |
529 } | |
530 } | |
531 | |
532 void handleWhenCompleteCallback() { | 590 void handleWhenCompleteCallback() { |
| 591 // The whenComplete-handler is not combined with normal value/error |
| 592 // handling. This means at most one handleX method is called per |
| 593 // listener. |
| 594 assert(!listener.handlesValue); |
| 595 assert(!listener.handlesError); |
533 var completeResult; | 596 var completeResult; |
534 try { | 597 try { |
535 completeResult = zone.run(listener._whenCompleteAction); | 598 completeResult = listener.handleWhenComplete(); |
536 } catch (e, s) { | 599 } catch (e, s) { |
537 if (hasError && identical(source._error.error, e)) { | 600 if (hasError && identical(source._error.error, e)) { |
538 listenerValueOrError = source._error; | 601 listenerValueOrError = source._error; |
539 } else { | 602 } else { |
540 listenerValueOrError = new AsyncError(e, s); | 603 listenerValueOrError = new AsyncError(e, s); |
541 } | 604 } |
542 listenerHasValue = false; | 605 listenerHasError = true; |
543 return; | 606 return; |
544 } | 607 } |
545 if (completeResult is Future) { | 608 if (completeResult is Future) { |
546 _Future result = listener.result; | 609 if (completeResult is _Future && completeResult._isComplete) { |
547 result._isChained = true; | 610 if (completeResult._hasError) { |
548 isPropagationAborted = true; | 611 listenerValueOrError = completeResult._error; |
549 completeResult.then((ignored) { | 612 listenerHasError = true; |
550 _propagateToListeners(source, new _FutureListener.chain(result)); | |
551 }, onError: (error, [stackTrace]) { | |
552 // When there is an error, we have to make the error the new | |
553 // result of the current listener. | |
554 if (completeResult is! _Future) { | |
555 // This should be a rare case. | |
556 completeResult = new _Future(); | |
557 completeResult._setError(error, stackTrace); | |
558 } | 613 } |
559 _propagateToListeners(completeResult, | 614 // Otherwise use the existing result of source. |
560 new _FutureListener.chain(result)); | 615 return; |
561 }); | 616 } |
| 617 // We have to wait for the completeResult future to complete |
| 618 // before knowing if it's an error or we should use the result |
| 619 // of source. |
| 620 var originalSource = source; |
| 621 listenerValueOrError = completeResult.then((_) => originalSource); |
| 622 listenerHasError = false; |
562 } | 623 } |
563 } | 624 } |
564 | 625 |
565 if (!hasError) { | 626 void handleValueCallback() { |
| 627 try { |
| 628 listenerValueOrError = listener.handleValue(sourceResult); |
| 629 } catch (e, s) { |
| 630 listenerValueOrError = new AsyncError(e, s); |
| 631 listenerHasError = true; |
| 632 } |
| 633 } |
| 634 |
| 635 void handleError() { |
| 636 try { |
| 637 AsyncError asyncError = source._error; |
| 638 if (listener.matchesErrorTest(asyncError) && |
| 639 listener.hasErrorCallback) { |
| 640 listenerValueOrError = listener.handleError(asyncError); |
| 641 listenerHasError = false; |
| 642 } |
| 643 } catch (e, s) { |
| 644 if (identical(source._error.error, e)) { |
| 645 listenerValueOrError = source._error; |
| 646 } else { |
| 647 listenerValueOrError = new AsyncError(e, s); |
| 648 } |
| 649 listenerHasError = true; |
| 650 } |
| 651 } |
| 652 |
| 653 |
| 654 if (listener.handlesComplete) { |
| 655 handleWhenCompleteCallback(); |
| 656 } else if (!hasError) { |
566 if (listener.handlesValue) { | 657 if (listener.handlesValue) { |
567 listenerHasValue = handleValueCallback(); | 658 handleValueCallback(); |
568 } | 659 } |
569 } else { | 660 } else { |
570 handleError(); | 661 if (listener.handlesError) { |
| 662 handleError(); |
| 663 } |
571 } | 664 } |
572 if (listener.handlesComplete) { | 665 |
573 handleWhenCompleteCallback(); | |
574 } | |
575 // If we changed zone, oldZone will not be null. | 666 // If we changed zone, oldZone will not be null. |
576 if (oldZone != null) Zone._leave(oldZone); | 667 if (oldZone != null) Zone._leave(oldZone); |
577 | 668 |
578 if (isPropagationAborted) return; | |
579 // If the listener's value is a future we need to chain it. Note that | 669 // If the listener's value is a future we need to chain it. Note that |
580 // this can only happen if there is a callback. Since 'is' checks | 670 // this can only happen if there is a callback. |
581 // can be expensive, we're trying to avoid it. | 671 if (listenerValueOrError is Future) { |
582 if (listenerHasValue && | |
583 !identical(sourceValue, listenerValueOrError) && | |
584 listenerValueOrError is Future) { | |
585 Future chainSource = listenerValueOrError; | 672 Future chainSource = listenerValueOrError; |
586 // Shortcut if the chain-source is already completed. Just continue | 673 // Shortcut if the chain-source is already completed. Just continue |
587 // the loop. | 674 // the loop. |
588 _Future result = listener.result; | 675 _Future result = listener.result; |
589 if (chainSource is _Future) { | 676 if (chainSource is _Future) { |
590 if (chainSource._isComplete) { | 677 if (chainSource._isComplete) { |
591 // propagate the value (simulating a tail call). | 678 listeners = result._removeListeners(); |
592 result._isChained = true; | 679 result._cloneResult(chainSource); |
593 source = chainSource; | 680 source = chainSource; |
594 listeners = new _FutureListener.chain(result); | |
595 continue; | 681 continue; |
596 } else { | 682 } else { |
597 _chainCoreFuture(chainSource, result); | 683 _chainCoreFuture(chainSource, result); |
598 } | 684 } |
599 } else { | 685 } else { |
600 _chainForeignFuture(chainSource, result); | 686 _chainForeignFuture(chainSource, result); |
601 } | 687 } |
602 return; | 688 return; |
603 } | 689 } |
604 } | 690 } |
605 _Future result = listener.result; | 691 _Future result = listener.result; |
606 listeners = result._removeListeners(); | 692 listeners = result._removeListeners(); |
607 if (listenerHasValue) { | 693 if (!listenerHasError) { |
608 result._setValue(listenerValueOrError); | 694 result._setValue(listenerValueOrError); |
609 } else { | 695 } else { |
610 AsyncError asyncError = listenerValueOrError; | 696 AsyncError asyncError = listenerValueOrError; |
611 result._setErrorObject(asyncError); | 697 result._setErrorObject(asyncError); |
612 } | 698 } |
613 // Prepare for next round. | 699 // Prepare for next round. |
614 source = result; | 700 source = result; |
615 } | 701 } |
616 } | 702 } |
617 | 703 |
618 Future timeout(Duration timeLimit, {onTimeout()}) { | 704 Future<T> timeout(Duration timeLimit, {onTimeout()}) { |
619 if (_isComplete) return new _Future.immediate(this); | 705 if (_isComplete) return new _Future.immediate(this); |
620 _Future result = new _Future(); | 706 _Future<T> result = new _Future<T>(); |
621 Timer timer; | 707 Timer timer; |
622 if (onTimeout == null) { | 708 if (onTimeout == null) { |
623 timer = new Timer(timeLimit, () { | 709 timer = new Timer(timeLimit, () { |
624 result._completeError(new TimeoutException("Future not completed", | 710 result._completeError(new TimeoutException("Future not completed", |
625 timeLimit)); | 711 timeLimit)); |
626 }); | 712 }); |
627 } else { | 713 } else { |
628 Zone zone = Zone.current; | 714 Zone zone = Zone.current; |
629 onTimeout = zone.registerCallback(onTimeout); | 715 onTimeout = zone.registerCallback(onTimeout); |
630 timer = new Timer(timeLimit, () { | 716 timer = new Timer(timeLimit, () { |
(...skipping 11 matching lines...) Expand all Loading... |
642 } | 728 } |
643 }, onError: (e, s) { | 729 }, onError: (e, s) { |
644 if (timer.isActive) { | 730 if (timer.isActive) { |
645 timer.cancel(); | 731 timer.cancel(); |
646 result._completeError(e, s); | 732 result._completeError(e, s); |
647 } | 733 } |
648 }); | 734 }); |
649 return result; | 735 return result; |
650 } | 736 } |
651 } | 737 } |
OLD | NEW |