Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(725)

Side by Side Diff: sdk/lib/async/future_impl.dart

Issue 16240008: Make StreamController be a StreamSink, not just an EventSink. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Complete rewrite. StreamController is now itself a StreamSink. Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 deprecatedFutureValue(_FutureImpl future) => 7 deprecatedFutureValue(_FutureImpl future) =>
8 future._isComplete ? future._resultOrListeners : null; 8 future._isComplete ? future._resultOrListeners : null;
9 9
10 abstract class _Completer<T> implements Completer<T> { 10 abstract class _Completer<T> implements Completer<T> {
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
42 _Zone.current.handleUncaughtError(error); 42 _Zone.current.handleUncaughtError(error);
43 } 43 }
44 } 44 }
45 45
46 bool get isCompleted => _isComplete; 46 bool get isCompleted => _isComplete;
47 } 47 }
48 48
49 class _AsyncCompleter<T> extends _Completer<T> { 49 class _AsyncCompleter<T> extends _Completer<T> {
50 void _setFutureValue(T value) { 50 void _setFutureValue(T value) {
51 _FutureImpl future = this.future; 51 _FutureImpl future = this.future;
52 runAsync(() { future._setValue(value); }); 52 future._asyncSetValue(value);
53 } 53 }
54 54
55 void _setFutureError(error) { 55 void _setFutureError(error) {
56 _FutureImpl future = this.future; 56 _FutureImpl future = this.future;
57 runAsync(() { future._setError(error); }); 57 future._asyncSetError(error);
58 } 58 }
59 } 59 }
60 60
61 class _SyncCompleter<T> extends _Completer<T> { 61 class _SyncCompleter<T> extends _Completer<T> {
62 void _setFutureValue(T value) { 62 void _setFutureValue(T value) {
63 _FutureImpl future = this.future; 63 _FutureImpl future = this.future;
64 future._setValue(value); 64 future._setValue(value);
65 } 65 }
66 66
67 void _setFutureError(error) { 67 void _setFutureError(error) {
(...skipping 19 matching lines...) Expand all
87 void _sendError(error); 87 void _sendError(error);
88 88
89 bool _inSameErrorZone(_Zone otherZone); 89 bool _inSameErrorZone(_Zone otherZone);
90 } 90 }
91 91
92 /** Adapter for a [_FutureImpl] to be a future result listener. */ 92 /** Adapter for a [_FutureImpl] to be a future result listener. */
93 class _FutureListenerWrapper<T> implements _FutureListener<T> { 93 class _FutureListenerWrapper<T> implements _FutureListener<T> {
94 _FutureImpl future; 94 _FutureImpl future;
95 _FutureListener _nextListener; 95 _FutureListener _nextListener;
96 _FutureListenerWrapper(this.future); 96 _FutureListenerWrapper(this.future);
97 _sendValue(T value) { future._setValue(value); } 97 _sendValue(T value) { future._setValueUnchecked(value); }
98 _sendError(error) { future._setError(error); } 98 _sendError(error) { future._setErrorUnchecked(error); }
99 bool _inSameErrorZone(_Zone otherZone) => future._inSameErrorZone(otherZone); 99 bool _inSameErrorZone(_Zone otherZone) => future._inSameErrorZone(otherZone);
100 } 100 }
101 101
102 /** 102 /**
103 * This listener is installed at error-zone boundaries. It signals an 103 * This listener is installed at error-zone boundaries. It signals an
104 * uncaught error in the zone of origin when an error is sent from one error 104 * uncaught error in the zone of origin when an error is sent from one error
105 * zone to another. 105 * zone to another.
106 * 106 *
107 * When a Future is listening to another Future and they have not been 107 * When a Future is listening to another Future and they have not been
108 * instantiated in the same error-zone then Futures put an instance of this 108 * instantiated in the same error-zone then Futures put an instance of this
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
155 155
156 class _FutureImpl<T> implements Future<T> { 156 class _FutureImpl<T> implements Future<T> {
157 // State of the future. The state determines the interpretation of the 157 // State of the future. The state determines the interpretation of the
158 // [resultOrListeners] field. 158 // [resultOrListeners] field.
159 // TODO(lrn): rename field since it can also contain a chained future. 159 // TODO(lrn): rename field since it can also contain a chained future.
160 160
161 /// Initial state, waiting for a result. In this state, the 161 /// Initial state, waiting for a result. In this state, the
162 /// [resultOrListeners] field holds a single-linked list of 162 /// [resultOrListeners] field holds a single-linked list of
163 /// [FutureListener] listeners. 163 /// [FutureListener] listeners.
164 static const int _INCOMPLETE = 0; 164 static const int _INCOMPLETE = 0;
165 /// Pending completion. Set when completed using [_asyncSetValue] or
166 /// [_asyncSetError]. It is an error to try to complete it again.
167 static const int _PENDING_COMPLETE = 1;
165 /// The future has been chained to another future. The result of that 168 /// The future has been chained to another future. The result of that
166 /// other future becomes the result of this future as well. 169 /// other future becomes the result of this future as well.
167 /// In this state, the [resultOrListeners] field holds the future that 170 /// In this state, the [resultOrListeners] field holds the future that
168 /// will give the result to this future. Both existing and new listeners are 171 /// will give the result to this future. Both existing and new listeners are
169 /// forwarded directly to the other future. 172 /// forwarded directly to the other future.
170 static const int _CHAINED = 1; 173 static const int _CHAINED = 2;
171 /// The future has been chained to another future, but there hasn't been 174 /// The future has been chained to another future, but there hasn't been
172 /// any listeners added to this future yet. If it is completed with an 175 /// any listeners added to this future yet. If it is completed with an
173 /// error, the error will be considered unhandled. 176 /// error, the error will be considered unhandled.
174 static const int _CHAINED_UNLISTENED = 3; 177 static const int _CHAINED_UNLISTENED = 6;
175 /// The future has been completed with a value result. 178 /// The future has been completed with a value result.
176 static const int _VALUE = 4; 179 static const int _VALUE = 8;
177 /// The future has been completed with an error result. 180 /// The future has been completed with an error result.
178 static const int _ERROR = 6; 181 static const int _ERROR = 12;
179 /// Extra bit set when the future has been completed with an error result. 182 /// Extra bit set when the future has been completed with an error result.
180 /// but no listener has been scheduled to receive the error. 183 /// but no listener has been scheduled to receive the error.
181 /// If the bit is still set when a [runAsync] call triggers, the error will 184 /// If the bit is still set when a [runAsync] call triggers, the error will
182 /// be reported to the top-level handler. 185 /// be reported to the top-level handler.
183 /// Assigning a listener before that time will clear the bit. 186 /// Assigning a listener before that time will clear the bit.
184 static const int _UNHANDLED_ERROR = 8; 187 static const int _UNHANDLED_ERROR = 16;
185 188
186 /** Whether the future is complete, and as what. */ 189 /** Whether the future is complete, and as what. */
187 int _state = _INCOMPLETE; 190 int _state = _INCOMPLETE;
188 191
189 final _Zone _zone = _Zone.current.fork(); 192 final _Zone _zone = _Zone.current.fork();
190 193
191 bool get _isChained => (_state & _CHAINED) != 0; 194 bool get _isChained => (_state & _CHAINED) != 0;
192 bool get _hasChainedListener => _state == _CHAINED; 195 bool get _hasChainedListener => _state == _CHAINED;
193 bool get _isComplete => _state >= _VALUE; 196 bool get _isComplete => _state >= _VALUE;
197 bool get _mayComplete => _state == _INCOMPLETE;
194 bool get _hasValue => _state == _VALUE; 198 bool get _hasValue => _state == _VALUE;
195 bool get _hasError => _state >= _ERROR; 199 bool get _hasError => _state >= _ERROR;
196 bool get _hasUnhandledError => _state >= _UNHANDLED_ERROR; 200 bool get _hasUnhandledError => _state >= _UNHANDLED_ERROR;
197 201
198 void _clearUnhandledError() { 202 void _clearUnhandledError() {
199 _state &= ~_UNHANDLED_ERROR; 203 _state &= ~_UNHANDLED_ERROR;
200 } 204 }
201 205
202 /** 206 /**
203 * Either the result, a list of listeners or another future. 207 * Either the result, a list of listeners or another future.
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after
283 return new _WhenFuture<T>(action).._subscribeTo(this); 287 return new _WhenFuture<T>(action).._subscribeTo(this);
284 } 288 }
285 289
286 Stream<T> asStream() => new Stream.fromFuture(this); 290 Stream<T> asStream() => new Stream.fromFuture(this);
287 291
288 bool _inSameErrorZone(_Zone otherZone) { 292 bool _inSameErrorZone(_Zone otherZone) {
289 return _zone.inSameErrorZone(otherZone); 293 return _zone.inSameErrorZone(otherZone);
290 } 294 }
291 295
292 void _setValue(T value) { 296 void _setValue(T value) {
293 if (_isComplete) throw new StateError("Future already completed"); 297 if (!_mayComplete) throw new StateError("Future already completed");
298 _setValueUnchecked(value);
299 }
300
301 void _setValueUnchecked(T value) {
294 _FutureListener listeners = _isChained ? null : _removeListeners(); 302 _FutureListener listeners = _isChained ? null : _removeListeners();
295 _state = _VALUE; 303 _state = _VALUE;
296 _resultOrListeners = value; 304 _resultOrListeners = value;
297 while (listeners != null) { 305 while (listeners != null) {
298 _FutureListener listener = listeners; 306 _FutureListener listener = listeners;
299 listeners = listener._nextListener; 307 listeners = listener._nextListener;
300 listener._nextListener = null; 308 listener._nextListener = null;
301 listener._sendValue(value); 309 listener._sendValue(value);
302 } 310 }
303 } 311 }
304 312
305 void _setError(error) { 313 void _setError(Object error) {
306 if (_isComplete) throw new StateError("Future already completed"); 314 if (!_mayComplete) throw new StateError("Future already completed");
315 _setErrorUnchecked(error);
316 }
307 317
318 void _setErrorUnchecked(error) {
floitsch 2013/06/27 15:15:19 If you add the "Object" above, also add it here.
Lasse Reichstein Nielsen 2013/06/28 12:57:38 Done.
308 _FutureListener listeners; 319 _FutureListener listeners;
309 bool hasListeners; 320 bool hasListeners;
310 if (_isChained) { 321 if (_isChained) {
311 listeners = null; 322 listeners = null;
312 hasListeners = (_state == _CHAINED); // and not _CHAINED_UNLISTENED. 323 hasListeners = (_state == _CHAINED); // and not _CHAINED_UNLISTENED.
313 } else { 324 } else {
314 listeners = _removeListeners(); 325 listeners = _removeListeners();
315 hasListeners = (listeners != null); 326 hasListeners = (listeners != null);
316 } 327 }
317 328
318 _state = _ERROR; 329 _state = _ERROR;
319 _resultOrListeners = error; 330 _resultOrListeners = error;
320 331
321 if (!hasListeners) { 332 if (!hasListeners) {
322 _scheduleUnhandledError(); 333 _scheduleUnhandledError();
323 return; 334 return;
324 } 335 }
325 while (listeners != null) { 336 while (listeners != null) {
326 _FutureListener listener = listeners; 337 _FutureListener listener = listeners;
327 listeners = listener._nextListener; 338 listeners = listener._nextListener;
328 listener._nextListener = null; 339 listener._nextListener = null;
329 listener._sendError(error); 340 listener._sendError(error);
330 } 341 }
331 } 342 }
332 343
344 void _asyncSetValue(T value) {
345 if (!_mayComplete) throw new StateError("Future already completed");
346 _state = _PENDING_COMPLETE;
347 runAsync(() { _setValueUnchecked(value); });
348 }
349
350 void _asyncSetError(Object error) {
351 if (!_mayComplete) throw new StateError("Future already completed");
352 _state = _PENDING_COMPLETE;
353 runAsync(() { _setErrorUnchecked(error); });
354 }
355
333 void _scheduleUnhandledError() { 356 void _scheduleUnhandledError() {
334 assert(_state == _ERROR); 357 assert(_state == _ERROR);
335 _state = _ERROR | _UNHANDLED_ERROR; 358 _state = _ERROR | _UNHANDLED_ERROR;
336 // Wait for the rest of the current event's duration to see 359 // Wait for the rest of the current event's duration to see
337 // if a subscriber is added to handle the error. 360 // if a subscriber is added to handle the error.
338 runAsync(() { 361 runAsync(() {
339 if (_hasUnhandledError) { 362 if (_hasUnhandledError) {
340 // No error handler has been added since the error was set. 363 // No error handler has been added since the error was set.
341 _clearUnhandledError(); 364 _clearUnhandledError();
342 // TODO(floitsch): Hook this into unhandled error handling. 365 // TODO(floitsch): Hook this into unhandled error handling.
(...skipping 310 matching lines...) Expand 10 before | Expand all | Expand 10 after
653 _setError(error); 676 _setError(error);
654 }, onError: _setError); 677 }, onError: _setError);
655 return; 678 return;
656 } 679 }
657 } catch (e, s) { 680 } catch (e, s) {
658 error = _asyncError(e, s); 681 error = _asyncError(e, s);
659 } 682 }
660 _setError(error); 683 _setError(error);
661 } 684 }
662 } 685 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698