Index: sdk/lib/async/stream_impl.dart |
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
index 29cfbd1c3ce5cc1055f0f5240734352e7be9b7d3..164d3b242802874e9d1d4e2cbffaeaf6438a3d68 100644 |
--- a/sdk/lib/async/stream_impl.dart |
+++ b/sdk/lib/async/stream_impl.dart |
@@ -49,9 +49,8 @@ abstract class _EventDispatch<T> { |
* but if it happens anyway, the subscription will enqueue the events just as |
* when new events arrive while still firing an old event. |
*/ |
-class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
- _EventSink<T>, |
- _EventDispatch<T> { |
+class _BufferingStreamSubscription<T> |
+ implements StreamSubscription<T>, _EventSink<T>, _EventDispatch<T> { |
/** The `cancelOnError` flag from the `listen` call. */ |
static const int _STATE_CANCEL_ON_ERROR = 1; |
/** |
@@ -107,10 +106,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
*/ |
_PendingEvents<T> _pending; |
- _BufferingStreamSubscription(void onData(T data), |
- Function onError, |
- void onDone(), |
- bool cancelOnError) |
+ _BufferingStreamSubscription( |
+ void onData(T data), Function onError, void onDone(), bool cancelOnError) |
: _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { |
this.onData(onData); |
this.onError(onError); |
@@ -197,7 +194,9 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
_Future<E> result = new _Future<E>(); |
// Overwrite the onDone and onError handlers. |
- _onDone = () { result._complete(futureValue); }; |
+ _onDone = () { |
+ result._complete(futureValue); |
+ }; |
_onError = (error, stackTrace) { |
Future cancelFuture = cancel(); |
if (!identical(cancelFuture, Future._nullFuture)) { |
@@ -264,7 +263,7 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
void _addError(Object error, StackTrace stackTrace) { |
if (_isCanceled) return; |
if (_canFire) { |
- _sendError(error, stackTrace); // Reports cancel after sending. |
+ _sendError(error, stackTrace); // Reports cancel after sending. |
} else { |
_addPending(new _DelayedError(error, stackTrace)); |
} |
@@ -346,11 +345,11 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
_state |= _STATE_IN_CALLBACK; |
if (_onError is ZoneBinaryCallback<dynamic, Object, StackTrace>) { |
ZoneBinaryCallback<dynamic, Object, StackTrace> errorCallback = _onError |
- as Object /*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/; |
+ as Object/*=ZoneBinaryCallback<dynamic, Object, StackTrace>*/; |
_zone.runBinaryGuarded(errorCallback, error, stackTrace); |
} else { |
_zone.runUnaryGuarded<dynamic, dynamic>( |
- _onError as Object /*=ZoneUnaryCallback<dynamic, dynamic>*/, error); |
+ _onError as Object/*=ZoneUnaryCallback<dynamic, dynamic>*/, error); |
} |
_state &= ~_STATE_IN_CALLBACK; |
} |
@@ -461,9 +460,7 @@ abstract class _StreamImpl<T> extends Stream<T> { |
// Stream interface. |
StreamSubscription<T> listen(void onData(T data), |
- { Function onError, |
- void onDone(), |
- bool cancelOnError }) { |
+ {Function onError, void onDone(), bool cancelOnError}) { |
cancelOnError = identical(true, cancelOnError); |
StreamSubscription<T> subscription = |
_createSubscription(onData, onError, onDone, cancelOnError); |
@@ -473,13 +470,10 @@ abstract class _StreamImpl<T> extends Stream<T> { |
// ------------------------------------------------------------------- |
/** Create a subscription object. Called by [subcribe]. */ |
- StreamSubscription<T> _createSubscription( |
- void onData(T data), |
- Function onError, |
- void onDone(), |
- bool cancelOnError) { |
- return new _BufferingStreamSubscription<T>(onData, onError, onDone, |
- cancelOnError); |
+ StreamSubscription<T> _createSubscription(void onData(T data), |
+ Function onError, void onDone(), bool cancelOnError) { |
+ return new _BufferingStreamSubscription<T>( |
+ onData, onError, onDone, cancelOnError); |
} |
/** Hook called when the subscription has been created. */ |
@@ -500,11 +494,8 @@ class _GeneratedStreamImpl<T> extends _StreamImpl<T> { |
*/ |
_GeneratedStreamImpl(this._pending); |
- StreamSubscription<T> _createSubscription( |
- void onData(T data), |
- Function onError, |
- void onDone(), |
- bool cancelOnError) { |
+ StreamSubscription<T> _createSubscription(void onData(T data), |
+ Function onError, void onDone(), bool cancelOnError) { |
if (_isUsed) throw new StateError("Stream has already been listened to."); |
_isUsed = true; |
return new _BufferingStreamSubscription<T>( |
@@ -512,7 +503,6 @@ class _GeneratedStreamImpl<T> extends _StreamImpl<T> { |
} |
} |
- |
/** Pending events object that gets its events from an [Iterable]. */ |
class _IterablePendingEvents<T> extends _PendingEvents<T> { |
// The iterator providing data for data events. |
@@ -554,14 +544,12 @@ class _IterablePendingEvents<T> extends _PendingEvents<T> { |
} |
} |
- |
// Internal helpers. |
// Types of the different handlers on a stream. Types used to type fields. |
typedef void _DataHandler<T>(T value); |
typedef void _DoneHandler(); |
- |
/** Default data handler, does nothing. */ |
void _nullDataHandler(var value) {} |
@@ -573,7 +561,6 @@ void _nullErrorHandler(error, [StackTrace stackTrace]) { |
/** Default done handler, does nothing. */ |
void _nullDoneHandler() {} |
- |
/** A delayed event on a buffering stream subscription. */ |
abstract class _DelayedEvent<T> { |
/** Added as a linked list on the [StreamController]. */ |
@@ -678,11 +665,11 @@ abstract class _PendingEvents<T> { |
void clear(); |
} |
- |
/** Class holding pending events for a [_StreamImpl]. */ |
class _StreamImplEvents<T> extends _PendingEvents<T> { |
/// Single linked list of [_DelayedEvent] objects. |
_DelayedEvent firstPendingEvent = null; |
+ |
/// Last element in the list of pending events. New events are added after it. |
_DelayedEvent lastPendingEvent = null; |
@@ -742,7 +729,9 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> { |
void onData(void handleData(T data)) {} |
void onError(Function handleError) {} |
- void onDone(void handleDone()) { _onDone = handleDone; } |
+ void onDone(void handleDone()) { |
+ _onDone = handleDone; |
+ } |
void pause([Future resumeSignal]) { |
_state += _PAUSED; |
@@ -762,7 +751,9 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> { |
Future<E> asFuture<E>([E futureValue]) { |
_Future<E> result = new _Future<E>(); |
- _onDone = () { result._completeWithValue(null); }; |
+ _onDone = () { |
+ result._completeWithValue(null); |
+ }; |
return result; |
} |
@@ -783,15 +774,18 @@ class _AsBroadcastStream<T> extends Stream<T> { |
_AsBroadcastStreamController<T> _controller; |
StreamSubscription<T> _subscription; |
- _AsBroadcastStream(this._source, |
- void onListenHandler(StreamSubscription<T> subscription), |
- void onCancelHandler(StreamSubscription<T> subscription)) |
+ _AsBroadcastStream( |
+ this._source, |
+ void onListenHandler(StreamSubscription<T> subscription), |
+ void onCancelHandler(StreamSubscription<T> subscription)) |
// TODO(floitsch): the return type should be void and should be |
// inferred. |
- : _onListenHandler = Zone.current.registerUnaryCallback |
- <dynamic, StreamSubscription<T>>(onListenHandler), |
- _onCancelHandler = Zone.current.registerUnaryCallback |
- <dynamic, StreamSubscription<T>>(onCancelHandler), |
+ : _onListenHandler = Zone.current |
+ .registerUnaryCallback<dynamic, StreamSubscription<T>>( |
+ onListenHandler), |
+ _onCancelHandler = Zone.current |
+ .registerUnaryCallback<dynamic, StreamSubscription<T>>( |
+ onCancelHandler), |
_zone = Zone.current { |
_controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); |
} |
@@ -799,9 +793,7 @@ class _AsBroadcastStream<T> extends Stream<T> { |
bool get isBroadcast => true; |
StreamSubscription<T> listen(void onData(T data), |
- { Function onError, |
- void onDone(), |
- bool cancelOnError}) { |
+ {Function onError, void onDone(), bool cancelOnError}) { |
if (_controller == null || _controller.isClosed) { |
// Return a dummy subscription backed by nothing, since |
// it will only ever send one done event. |
@@ -809,8 +801,7 @@ class _AsBroadcastStream<T> extends Stream<T> { |
} |
if (_subscription == null) { |
_subscription = _source.listen(_controller.add, |
- onError: _controller.addError, |
- onDone: _controller.close); |
+ onError: _controller.addError, onDone: _controller.close); |
} |
cancelOnError = identical(true, cancelOnError); |
return _controller._subscribe(onData, onError, onDone, cancelOnError); |
@@ -843,7 +834,7 @@ class _AsBroadcastStream<T> extends Stream<T> { |
// Called by [_controller] when it has no subscribers left. |
StreamSubscription subscription = _subscription; |
_subscription = null; |
- _controller = null; // Marks the stream as no longer listenable. |
+ _controller = null; // Marks the stream as no longer listenable. |
subscription.cancel(); |
} |
@@ -909,7 +900,6 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> { |
} |
} |
- |
/** |
* Simple implementation of [StreamIterator]. |
* |
@@ -969,7 +959,7 @@ class _StreamIterator<T> implements StreamIterator<T> { |
T get current { |
if (_subscription != null && _isPaused) { |
- return _stateData as Object /*=T*/; |
+ return _stateData as Object/*=T*/; |
} |
return null; |
} |
@@ -997,9 +987,9 @@ class _StreamIterator<T> implements StreamIterator<T> { |
assert(_subscription == null); |
var stateData = _stateData; |
if (stateData != null) { |
- Stream<T> stream = stateData as Object /*=Stream<T>*/; |
- _subscription = stream.listen( |
- _onData, onError: _onError, onDone: _onDone, cancelOnError: true); |
+ Stream<T> stream = stateData as Object/*=Stream<T>*/; |
+ _subscription = stream.listen(_onData, |
+ onError: _onError, onDone: _onDone, cancelOnError: true); |
var future = new _Future<bool>(); |
_stateData = future; |
return future; |
@@ -1014,7 +1004,7 @@ class _StreamIterator<T> implements StreamIterator<T> { |
if (subscription != null) { |
_subscription = null; |
if (!_isPaused) { |
- _Future<bool> future = stateData as Object /*=_Future<bool>*/; |
+ _Future<bool> future = stateData as Object/*=_Future<bool>*/; |
future._asyncComplete(false); |
} |
return subscription.cancel(); |
@@ -1024,7 +1014,7 @@ class _StreamIterator<T> implements StreamIterator<T> { |
void _onData(T data) { |
assert(_subscription != null && !_isPaused); |
- _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; |
+ _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/; |
_stateData = data; |
_isPaused = true; |
moveNextFuture._complete(true); |
@@ -1033,7 +1023,7 @@ class _StreamIterator<T> implements StreamIterator<T> { |
void _onError(Object error, [StackTrace stackTrace]) { |
assert(_subscription != null && !_isPaused); |
- _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; |
+ _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/; |
_subscription = null; |
_stateData = null; |
moveNextFuture._completeError(error, stackTrace); |
@@ -1041,7 +1031,7 @@ class _StreamIterator<T> implements StreamIterator<T> { |
void _onDone() { |
assert(_subscription != null && !_isPaused); |
- _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; |
+ _Future<bool> moveNextFuture = _stateData as Object/*=_Future<bool>*/; |
_subscription = null; |
_stateData = null; |
moveNextFuture._complete(false); |
@@ -1053,9 +1043,7 @@ class _EmptyStream<T> extends Stream<T> { |
const _EmptyStream() : super._internal(); |
bool get isBroadcast => true; |
StreamSubscription<T> listen(void onData(T data), |
- {Function onError, |
- void onDone(), |
- bool cancelOnError}) { |
+ {Function onError, void onDone(), bool cancelOnError}) { |
return new _DoneStreamSubscription<T>(onDone); |
} |
} |