Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(415)

Side by Side Diff: sdk/lib/async/future_impl.dart

Issue 11783009: Big merge from experimental to bleeding edge. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/future.dart ('k') | sdk/lib/async/merge_stream.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 // part of dart.async;
6
7 deprecatedFutureValue(_FutureImpl future) =>
8 future._isComplete ? future._resultOrListeners : null;
9
10
11 class _CompleterImpl<T> implements Completer<T> {
12 final Future<T> future;
13 bool _isComplete = false;
14
15 _CompleterImpl() : future = new _FutureImpl<T>();
16
17 void complete(T value) {
18 if (_isComplete) throw new StateError("Future already completed");
19 _isComplete = true;
20 _FutureImpl future = this.future;
21 future._setValue(value);
22 }
23
24 void completeError(Object error, [Object stackTrace = null]) {
25 if (_isComplete) throw new StateError("Future already completed");
26 _isComplete = true;
27 new Timer(0, (_) {
28 // Never complete an error in the same cycle. Otherwise users might
29 // not have a chance to register their error-handlers.
30 _FutureImpl future = this.future;
31 future._setError(new AsyncError(error, stackTrace));
32 });
33 }
34 }
35
36 /**
37 * A listener on a future.
38 *
39 * When the future completes, the [_sendValue] or [_sendError] method
40 * is invoked with the result.
41 *
42 * Listeners are kept in a linked list.
43 */
44 abstract class _FutureListener<T> {
45 _FutureListener _nextListener;
46 factory _FutureListener.wrap(_FutureImpl future) {
47 return new _FutureListenerWrapper(future);
48 }
49 void _sendValue(T value);
50 void _sendError(AsyncError error);
51 }
52
53 /** Adapter for a [_FutureImpl] to be a future result listener. */
54 class _FutureListenerWrapper<T> implements _FutureListener<T> {
55 _FutureImpl future;
56 _FutureListener _nextListener;
57 _FutureListenerWrapper(this.future);
58 _sendValue(T value) { future._setValue(value); }
59 _sendError(AsyncError error) { future._setError(error); }
60 }
61
62 class _FutureImpl<T> implements Future<T> {
63 static const int _INCOMPLETE = 0;
64 static const int _VALUE = 1;
65 static const int _ERROR = 2;
66
67 /** Whether the future is complete, and as what. */
68 int _state = _INCOMPLETE;
69
70 bool get _isComplete => _state != _INCOMPLETE;
71 bool get _hasValue => _state == _VALUE;
72 bool get _hasError => _state == _ERROR;
73
74 /**
75 * Either the result, or a list of listeners until the future completes.
76 *
77 * The result of the future is either a value or an [AsyncError].
78 * A result is only stored when the future has completed.
79 *
80 * The listeners is an internally linked list of [_FutureListener]s.
81 * Listeners are only remembered while the future is not yet complete.
82 *
83 * Since the result and the listeners cannot occur at the same time,
84 * we can use the same field for both.
85 */
86 var _resultOrListeners;
87
88 _FutureImpl();
89
90 _FutureImpl.immediate(T value) {
91 _state = _VALUE;
92 _resultOrListeners = value;
93 }
94
95 _FutureImpl.immediateError(var error, [Object stackTrace]) {
96 new Timer(0, (_) { _setError(new AsyncError(error, stackTrace)); });
97 }
98
99 factory _FutureImpl.wait(Iterable<Future> futures) {
100 // TODO(ajohnsen): can we do better wrt the generic type T?
101 if (futures.isEmpty) {
102 return new Future<List>.immediate(const []);
103 }
104
105 Completer completer = new Completer<List>();
106 int remaining = futures.length;
107 List values = new List.fixedLength(futures.length);
108
109 // As each future completes, put its value into the corresponding
110 // position in the list of values.
111 int i = 0;
112 for (Future future in futures) {
113 int pos = i++;
114 future.then((Object value) {
115 values[pos] = value;
116 if (--remaining == 0) {
117 completer.complete(values);
118 }
119 });
120 future.catchError((error) {
121 completer.completeError(error.error, error.stackTrace);
122 });
123 }
124
125 return completer.future;
126 }
127
128 Future then(f(T value), { onError(AsyncError error) }) {
129 if (!_isComplete) {
130 if (onError == null) {
131 return new _ThenFuture(f).._subscribeTo(this);
132 }
133 return new _SubscribeFuture(f, onError).._subscribeTo(this);
134 }
135 if (_hasError) {
136 if (onError != null) {
137 return _handleError(onError, null);
138 }
139 // The "f" funtion will never be called, so just return
140 // a future that delegates to this. We don't want to return
141 // this itself to give a signal that the future is complete.
142 return new _FutureWrapper(this);
143 } else {
144 assert(_hasValue);
145 return _handleValue(f);
146 }
147 }
148
149 Future catchError(f(AsyncError asyncError), { bool test(error) }) {
150 if (_hasValue) {
151 return new _FutureWrapper(this);
152 }
153 if (!_isComplete) {
154 return new _CatchErrorFuture(f, test).._subscribeTo(this);
155 } else {
156 return _handleError(f, test);
157 }
158 }
159
160 Future<T> whenComplete(void action()) {
161 _WhenFuture<T> whenFuture = new _WhenFuture<T>(action);
162 if (!_isComplete) {
163 _addListener(whenFuture);
164 } else if (_hasValue) {
165 new Timer(0, (_) {
166 T value = _resultOrListeners;
167 whenFuture._sendValue(value);
168 });
169 } else {
170 assert(_hasError);
171 new Timer(0, (_) {
172 AsyncError error = _resultOrListeners;
173 whenFuture._sendError(error);
174 });
175 }
176 return whenFuture;
177 }
178
179 Future _handleValue(onValue(var value)) {
180 assert(_hasValue);
181 _ThenFuture thenFuture = new _ThenFuture(onValue);
182 T value = _resultOrListeners;
183 new Timer(0, (_) { thenFuture._sendValue(value); });
184 return thenFuture;
185 }
186
187 Future _handleError(onError(AsyncError error), bool test(error)) {
188 assert(_hasError);
189 AsyncError error = _resultOrListeners;
190 _CatchErrorFuture errorFuture = new _CatchErrorFuture(onError, test);
191 new Timer(0, (_) { errorFuture._sendError(error); });
192 return errorFuture;
193 }
194
195 Stream<T> asStream() => new Stream.fromFuture(this);
196
197 void _setValue(T value) {
198 if (_state != _INCOMPLETE) throw new StateError("Future already completed");
199 _FutureListener listeners = _removeListeners();
200 _state = _VALUE;
201 _resultOrListeners = value;
202 while (listeners != null) {
203 _FutureListener listener = listeners;
204 listeners = listener._nextListener;
205 listener._nextListener = null;
206 listener._sendValue(value);
207 }
208 }
209
210 void _setError(AsyncError error) {
211 if (_isComplete) throw new StateError("Future already completed");
212 _FutureListener listeners = _removeListeners();
213 _state = _ERROR;
214 _resultOrListeners = error;
215 if (listeners == null) {
216 error.throwDelayed();
217 return;
218 }
219 while (listeners != null) {
220 _FutureListener listener = listeners;
221 listeners = listener._nextListener;
222 listener._nextListener = null;
223 listener._sendError(error);
224 }
225 }
226
227 void _addListener(_FutureListener listener) {
228 assert(!_isComplete);
229 assert(listener._nextListener == null);
230 listener._nextListener = _resultOrListeners;
231 _resultOrListeners = listener;
232 }
233
234 _FutureListener _removeListeners() {
235 // Reverse listeners before returning them, so the resulting list is in
236 // subscription order.
237 assert(!_isComplete);
238 _FutureListener current = _resultOrListeners;
239 _resultOrListeners = null;
240 _FutureListener prev = null;
241 while (current != null) {
242 _FutureListener next = current._nextListener;
243 current._nextListener = prev;
244 prev = current;
245 current = next;
246 }
247 return prev;
248 }
249
250 /**
251 * Make another [_FutureImpl] receive the result of this one.
252 *
253 * If this future is already complete, the [future] is notified
254 * immediately. This function is only called during event resolution
255 * where it's acceptable to send an event.
256 */
257 void _chain(_FutureImpl future) {
258 if (!_isComplete) {
259 _addListener(future._asListener());
260 } else if (_hasValue) {
261 future._setValue(_resultOrListeners);
262 } else {
263 assert(_hasError);
264 future._setError(_resultOrListeners);
265 }
266 }
267
268 _FutureListener _asListener() => new _FutureListener.wrap(this);
269 }
270
271 /**
272 * Transforming future base class.
273 *
274 * A transforming future is itself a future and a future listener.
275 * Subclasses override [_sendValue]/[_sendError] to intercept
276 * the results of a previous future.
277 */
278 abstract class _TransformFuture<S, T> extends _FutureImpl<T>
279 implements _FutureListener<S> {
280 // _FutureListener implementation.
281 _FutureListener _nextListener;
282
283 void _sendValue(S value);
284
285 void _sendError(AsyncError error);
286
287 void _subscribeTo(_FutureImpl future) {
288 future._addListener(this);
289 }
290
291 /**
292 * Helper function to hand the result of transforming an incoming event.
293 *
294 * If the result is itself a [Future], this future is linked to that
295 * future's output. If not, this future is completed with the result.
296 */
297 void _setOrChainValue(var result) {
298 if (result is Future) {
299 // Result should be a Future<T>.
300 if (result is _FutureImpl) {
301 _FutureImpl chainFuture = result;
302 chainFuture._chain(this);
303 return;
304 } else {
305 Future future = result;
306 future.then(_setValue,
307 onError: _setError);
308 return;
309 }
310 } else {
311 // Result must be of type T.
312 _setValue(result);
313 }
314 }
315 }
316
317 /** The onValue and onError handlers return either a value or a future */
318 typedef dynamic _FutureOnValue<T>(T value);
319 typedef dynamic _FutureOnError(AsyncError error);
320 /** Test used by [Future.catchError] to handle skip some errors. */
321 typedef bool _FutureErrorTest(var error);
322 /** Used by [WhenFuture]. */
323 typedef void _FutureAction();
324
325 /** Future returned by [Future.then] with no [:onError:] parameter. */
326 class _ThenFuture<S, T> extends _TransformFuture<S, T> {
327 final _FutureOnValue<S> _onValue;
328
329 _ThenFuture(this._onValue);
330
331 _sendValue(S value) {
332 assert(_onValue != null);
333 var result;
334 try {
335 result = _onValue(value);
336 } catch (e, s) {
337 _setError(new AsyncError(e, s));
338 return;
339 }
340 _setOrChainValue(result);
341 }
342
343 void _sendError(AsyncError error) {
344 _setError(error);
345 }
346 }
347
348 /** Future returned by [Future.catchError]. */
349 class _CatchErrorFuture<T> extends _TransformFuture<T,T> {
350 final _FutureErrorTest _test;
351 final _FutureOnError _onError;
352
353 _CatchErrorFuture(this._onError, this._test);
354
355 _sendValue(T value) {
356 _setValue(value);
357 }
358
359 _sendError(AsyncError error) {
360 assert(_onError != null);
361 // if _test is supplied, check if it returns true, otherwise just
362 // forward the error unmodified.
363 if (_test != null) {
364 bool matchesTest;
365 try {
366 matchesTest = _test(error.error);
367 } catch (e, s) {
368 _setError(new AsyncError.withCause(e, s, error));
369 return;
370 }
371 if (!matchesTest) {
372 _setError(error);
373 return;
374 }
375 }
376 // Act on the error, and use the result as this future's result.
377 var result;
378 try {
379 result = _onError(error);
380 } catch (e, s) {
381 _setError(new AsyncError.withCause(e, s, error));
382 return;
383 }
384 _setOrChainValue(result);
385 }
386 }
387
388 /** Future returned by [Future.then] with an [:onError:] parameter. */
389 class _SubscribeFuture<S, T> extends _ThenFuture<S, T> {
390 final _FutureOnError _onError;
391
392 _SubscribeFuture(onValue(S value), this._onError) : super(onValue);
393
394 // The _sendValue method is inherited from ThenFuture.
395
396 void _sendError(AsyncError error) {
397 assert(_onError != null);
398 var result;
399 try {
400 result = _onError(error);
401 } catch (e, s) {
402 _setError(new AsyncError.withCause(e, s, error));
403 return;
404 }
405 _setOrChainValue(result);
406 }
407 }
408
409 /** Future returned by [Future.whenComplete]. */
410 class _WhenFuture<T> extends _TransformFuture<T, T> {
411 final _FutureAction _action;
412
413 _WhenFuture(this._action);
414
415 void _sendValue(T value) {
416 try {
417 _action();
418 } catch (e, s) {
419 _setError(new AsyncError(e, s));
420 return;
421 }
422 _setValue(value);
423 }
424
425 void _sendError(AsyncError error) {
426 try {
427 _action();
428 } catch (e, s) {
429 error = new AsyncError.withCause(e, s, error);
430 }
431 _setError(error);
432 }
433 }
434
435 /**
436 * Thin wrapper around a [Future].
437 *
438 * This is used to return a "new" [Future] that effectively work just
439 * as an existing [Future], without making this discoverable by comparing
440 * identities.
441 */
442 class _FutureWrapper<T> implements Future<T> {
443 final Future<T> _future;
444
445 _FutureWrapper(this._future);
446
447 Future then(function(T value), { onError(AsyncError error) }) {
448 return _future.then(function, onError: onError);
449 }
450
451 Future catchError(function(AsyncError error), {bool test(var error)}) {
452 return _future.catchError(function, test: test);
453 }
454
455 Future whenComplete(void action()) {
456 return _future.whenComplete(action);
457 }
458
459 Stream<T> asStream() => new Stream.fromFuture(this);
460 }
OLDNEW
« no previous file with comments | « sdk/lib/async/future.dart ('k') | sdk/lib/async/merge_stream.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698