Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(439)

Unified Diff: sdk/lib/async/future_impl.dart

Issue 16240008: Make StreamController be a StreamSink, not just an EventSink. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Complete rewrite. StreamController is now itself a StreamSink. Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sdk/lib/async/future_impl.dart
diff --git a/sdk/lib/async/future_impl.dart b/sdk/lib/async/future_impl.dart
index be0b7a23229fff9d0acee3ff592698f11332e542..245c261d8479444c5f0b040fe2eb0a186f02632a 100644
--- a/sdk/lib/async/future_impl.dart
+++ b/sdk/lib/async/future_impl.dart
@@ -49,12 +49,12 @@ abstract class _Completer<T> implements Completer<T> {
class _AsyncCompleter<T> extends _Completer<T> {
void _setFutureValue(T value) {
_FutureImpl future = this.future;
- runAsync(() { future._setValue(value); });
+ future._asyncSetValue(value);
}
void _setFutureError(error) {
_FutureImpl future = this.future;
- runAsync(() { future._setError(error); });
+ future._asyncSetError(error);
}
}
@@ -94,8 +94,8 @@ class _FutureListenerWrapper<T> implements _FutureListener<T> {
_FutureImpl future;
_FutureListener _nextListener;
_FutureListenerWrapper(this.future);
- _sendValue(T value) { future._setValue(value); }
- _sendError(error) { future._setError(error); }
+ _sendValue(T value) { future._setValueUnchecked(value); }
+ _sendError(error) { future._setErrorUnchecked(error); }
bool _inSameErrorZone(_Zone otherZone) => future._inSameErrorZone(otherZone);
}
@@ -162,26 +162,29 @@ class _FutureImpl<T> implements Future<T> {
/// [resultOrListeners] field holds a single-linked list of
/// [FutureListener] listeners.
static const int _INCOMPLETE = 0;
+ /// Pending completion. Set when completed using [_asyncSetValue] or
+ /// [_asyncSetError]. It is an error to try to complete it again.
+ static const int _PENDING_COMPLETE = 1;
/// The future has been chained to another future. The result of that
/// other future becomes the result of this future as well.
/// In this state, the [resultOrListeners] field holds the future that
/// will give the result to this future. Both existing and new listeners are
/// forwarded directly to the other future.
- static const int _CHAINED = 1;
+ static const int _CHAINED = 2;
/// The future has been chained to another future, but there hasn't been
/// any listeners added to this future yet. If it is completed with an
/// error, the error will be considered unhandled.
- static const int _CHAINED_UNLISTENED = 3;
+ static const int _CHAINED_UNLISTENED = 6;
/// The future has been completed with a value result.
- static const int _VALUE = 4;
+ static const int _VALUE = 8;
/// The future has been completed with an error result.
- static const int _ERROR = 6;
+ static const int _ERROR = 12;
/// Extra bit set when the future has been completed with an error result.
/// but no listener has been scheduled to receive the error.
/// If the bit is still set when a [runAsync] call triggers, the error will
/// be reported to the top-level handler.
/// Assigning a listener before that time will clear the bit.
- static const int _UNHANDLED_ERROR = 8;
+ static const int _UNHANDLED_ERROR = 16;
/** Whether the future is complete, and as what. */
int _state = _INCOMPLETE;
@@ -191,6 +194,7 @@ class _FutureImpl<T> implements Future<T> {
bool get _isChained => (_state & _CHAINED) != 0;
bool get _hasChainedListener => _state == _CHAINED;
bool get _isComplete => _state >= _VALUE;
+ bool get _mayComplete => _state == _INCOMPLETE;
bool get _hasValue => _state == _VALUE;
bool get _hasError => _state >= _ERROR;
bool get _hasUnhandledError => _state >= _UNHANDLED_ERROR;
@@ -290,7 +294,11 @@ class _FutureImpl<T> implements Future<T> {
}
void _setValue(T value) {
- if (_isComplete) throw new StateError("Future already completed");
+ if (!_mayComplete) throw new StateError("Future already completed");
+ _setValueUnchecked(value);
+ }
+
+ void _setValueUnchecked(T value) {
_FutureListener listeners = _isChained ? null : _removeListeners();
_state = _VALUE;
_resultOrListeners = value;
@@ -302,9 +310,12 @@ class _FutureImpl<T> implements Future<T> {
}
}
- void _setError(error) {
- if (_isComplete) throw new StateError("Future already completed");
+ void _setError(Object error) {
+ if (!_mayComplete) throw new StateError("Future already completed");
+ _setErrorUnchecked(error);
+ }
+ void _setErrorUnchecked(error) {
floitsch 2013/06/27 15:15:19 If you add the "Object" above, also add it here.
Lasse Reichstein Nielsen 2013/06/28 12:57:38 Done.
_FutureListener listeners;
bool hasListeners;
if (_isChained) {
@@ -330,6 +341,18 @@ class _FutureImpl<T> implements Future<T> {
}
}
+ void _asyncSetValue(T value) {
+ if (!_mayComplete) throw new StateError("Future already completed");
+ _state = _PENDING_COMPLETE;
+ runAsync(() { _setValueUnchecked(value); });
+ }
+
+ void _asyncSetError(Object error) {
+ if (!_mayComplete) throw new StateError("Future already completed");
+ _state = _PENDING_COMPLETE;
+ runAsync(() { _setErrorUnchecked(error); });
+ }
+
void _scheduleUnhandledError() {
assert(_state == _ERROR);
_state = _ERROR | _UNHANDLED_ERROR;

Powered by Google App Engine
This is Rietveld 408576698