Index: sdk/lib/async/stream_pipe.dart |
diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart |
index 643d14227afca6b41cda51b7ede42bbf8ae8b465..12063bd928b65cc24970c571c2ad020746740666 100644 |
--- a/sdk/lib/async/stream_pipe.dart |
+++ b/sdk/lib/async/stream_pipe.dart |
@@ -5,9 +5,8 @@ |
part of dart.async; |
/** Runs user code and takes actions depending on success or failure. */ |
-_runUserCode(userCode(), |
- onSuccess(value), |
- onError(error, StackTrace stackTrace)) { |
+_runUserCode( |
+ userCode(), onSuccess(value), onError(error, StackTrace stackTrace)) { |
try { |
onSuccess(userCode()); |
} catch (e, s) { |
@@ -24,10 +23,8 @@ _runUserCode(userCode(), |
/** Helper function to cancel a subscription and wait for the potential future, |
before completing with an error. */ |
-void _cancelAndError(StreamSubscription subscription, |
- _Future future, |
- error, |
- StackTrace stackTrace) { |
+void _cancelAndError(StreamSubscription subscription, _Future future, error, |
+ StackTrace stackTrace) { |
var cancelFuture = subscription.cancel(); |
if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) { |
cancelFuture.whenComplete(() => future._completeError(error, stackTrace)); |
@@ -37,8 +34,7 @@ void _cancelAndError(StreamSubscription subscription, |
} |
void _cancelAndErrorWithReplacement(StreamSubscription subscription, |
- _Future future, |
- error, StackTrace stackTrace) { |
+ _Future future, error, StackTrace stackTrace) { |
AsyncError replacement = Zone.current.errorCallback(error, stackTrace); |
if (replacement != null) { |
error = _nonNullError(replacement.error); |
@@ -68,7 +64,6 @@ void _cancelAndValue(StreamSubscription subscription, _Future future, value) { |
} |
} |
- |
/** |
* A [Stream] that forwards subscriptions to another stream. |
* |
@@ -86,18 +81,13 @@ abstract class _ForwardingStream<S, T> extends Stream<T> { |
bool get isBroadcast => _source.isBroadcast; |
StreamSubscription<T> listen(void onData(T value), |
- { Function onError, |
- void onDone(), |
- bool cancelOnError }) { |
+ {Function onError, void onDone(), bool cancelOnError}) { |
cancelOnError = identical(true, cancelOnError); |
return _createSubscription(onData, onError, onDone, cancelOnError); |
} |
- StreamSubscription<T> _createSubscription( |
- void onData(T data), |
- Function onError, |
- void onDone(), |
- bool cancelOnError) { |
+ StreamSubscription<T> _createSubscription(void onData(T data), |
+ Function onError, void onDone(), bool cancelOnError) { |
return new _ForwardingStreamSubscription<S, T>( |
this, onData, onError, onDone, cancelOnError); |
} |
@@ -105,7 +95,7 @@ abstract class _ForwardingStream<S, T> extends Stream<T> { |
// Override the following methods in subclasses to change the behavior. |
void _handleData(S data, _EventSink<T> sink) { |
- sink._add(data as Object /*=T*/); |
+ sink._add(data as Object/*=T*/); |
} |
void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) { |
@@ -127,12 +117,10 @@ class _ForwardingStreamSubscription<S, T> |
StreamSubscription<S> _subscription; |
_ForwardingStreamSubscription(this._stream, void onData(T data), |
- Function onError, void onDone(), |
- bool cancelOnError) |
+ Function onError, void onDone(), bool cancelOnError) |
: super(onData, onError, onDone, cancelOnError) { |
- _subscription = _stream._source.listen(_handleData, |
- onError: _handleError, |
- onDone: _handleDone); |
+ _subscription = _stream._source |
+ .listen(_handleData, onError: _handleError, onDone: _handleDone); |
} |
// _StreamSink interface. |
@@ -200,12 +188,12 @@ void _addErrorWithReplacement(_EventSink sink, error, stackTrace) { |
sink._addError(error, stackTrace); |
} |
- |
class _WhereStream<T> extends _ForwardingStream<T, T> { |
final _Predicate<T> _test; |
_WhereStream(Stream<T> source, bool test(T value)) |
- : _test = test, super(source); |
+ : _test = test, |
+ super(source); |
void _handleData(T inputEvent, _EventSink<T> sink) { |
bool satisfies; |
@@ -221,7 +209,6 @@ class _WhereStream<T> extends _ForwardingStream<T, T> { |
} |
} |
- |
typedef T _Transformation<S, T>(S value); |
/** |
@@ -231,7 +218,8 @@ class _MapStream<S, T> extends _ForwardingStream<S, T> { |
final _Transformation<S, T> _transform; |
_MapStream(Stream<S> source, T transform(S event)) |
- : this._transform = transform, super(source); |
+ : this._transform = transform, |
+ super(source); |
void _handleData(S inputEvent, _EventSink<T> sink) { |
T outputEvent; |
@@ -252,7 +240,8 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> { |
final _Transformation<S, Iterable<T>> _expand; |
_ExpandStream(Stream<S> source, Iterable<T> expand(S event)) |
- : this._expand = expand, super(source); |
+ : this._expand = expand, |
+ super(source); |
void _handleData(S inputEvent, _EventSink<T> sink) { |
try { |
@@ -267,7 +256,6 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> { |
} |
} |
- |
typedef bool _ErrorTest(error); |
/** |
@@ -278,10 +266,10 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
final Function _transform; |
final _ErrorTest _test; |
- _HandleErrorStream(Stream<T> source, |
- Function onError, |
- bool test(error)) |
- : this._transform = onError, this._test = test, super(source); |
+ _HandleErrorStream(Stream<T> source, Function onError, bool test(error)) |
+ : this._transform = onError, |
+ this._test = test, |
+ super(source); |
void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { |
bool matches = true; |
@@ -310,22 +298,19 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
} |
} |
- |
class _TakeStream<T> extends _ForwardingStream<T, T> { |
final int _count; |
_TakeStream(Stream<T> source, int count) |
- : this._count = count, super(source) { |
+ : this._count = count, |
+ super(source) { |
// This test is done early to avoid handling an async error |
// in the _handleData method. |
if (count is! int) throw new ArgumentError(count); |
} |
- StreamSubscription<T> _createSubscription( |
- void onData(T data), |
- Function onError, |
- void onDone(), |
- bool cancelOnError) { |
+ StreamSubscription<T> _createSubscription(void onData(T data), |
+ Function onError, void onDone(), bool cancelOnError) { |
if (_count == 0) { |
_source.listen(null).cancel(); |
return new _DoneStreamSubscription<T>(onDone); |
@@ -360,22 +345,26 @@ class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> { |
var _sharedState; |
_StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data), |
- Function onError, void onDone(), |
- bool cancelOnError, this._sharedState) |
+ Function onError, void onDone(), bool cancelOnError, this._sharedState) |
: super(stream, onData, onError, onDone, cancelOnError); |
bool get _flag => _sharedState; |
- void set _flag(bool flag) { _sharedState = flag; } |
+ void set _flag(bool flag) { |
+ _sharedState = flag; |
+ } |
+ |
int get _count => _sharedState; |
- void set _count(int count) { _sharedState = count; } |
+ void set _count(int count) { |
+ _sharedState = count; |
+ } |
} |
- |
class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
final _Predicate<T> _test; |
_TakeWhileStream(Stream<T> source, bool test(T value)) |
- : this._test = test, super(source); |
+ : this._test = test, |
+ super(source); |
void _handleData(T inputEvent, _EventSink<T> sink) { |
bool satisfies; |
@@ -399,17 +388,15 @@ class _SkipStream<T> extends _ForwardingStream<T, T> { |
final int _count; |
_SkipStream(Stream<T> source, int count) |
- : this._count = count, super(source) { |
+ : this._count = count, |
+ super(source) { |
// This test is done early to avoid handling an async error |
// in the _handleData method. |
if (count is! int || count < 0) throw new ArgumentError(count); |
} |
- StreamSubscription<T> _createSubscription( |
- void onData(T data), |
- Function onError, |
- void onDone(), |
- bool cancelOnError) { |
+ StreamSubscription<T> _createSubscription(void onData(T data), |
+ Function onError, void onDone(), bool cancelOnError) { |
return new _StateStreamSubscription<T>( |
this, onData, onError, onDone, cancelOnError, _count); |
} |
@@ -429,13 +416,11 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> { |
final _Predicate<T> _test; |
_SkipWhileStream(Stream<T> source, bool test(T value)) |
- : this._test = test, super(source); |
+ : this._test = test, |
+ super(source); |
- StreamSubscription<T> _createSubscription( |
- void onData(T data), |
- Function onError, |
- void onDone(), |
- bool cancelOnError) { |
+ StreamSubscription<T> _createSubscription(void onData(T data), |
+ Function onError, void onDone(), bool cancelOnError) { |
return new _StateStreamSubscription<T>( |
this, onData, onError, onDone, cancelOnError, false); |
} |
@@ -472,7 +457,8 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> { |
var _previous = _SENTINEL; |
_DistinctStream(Stream<T> source, bool equals(T a, T b)) |
- : _equals = equals, super(source); |
+ : _equals = equals, |
+ super(source); |
void _handleData(T inputEvent, _EventSink<T> sink) { |
if (identical(_previous, _SENTINEL)) { |
@@ -484,7 +470,7 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> { |
if (_equals == null) { |
isEqual = (_previous == inputEvent); |
} else { |
- isEqual = _equals(_previous as Object /*=T*/, inputEvent); |
+ isEqual = _equals(_previous as Object/*=T*/, inputEvent); |
} |
} catch (e, s) { |
_addErrorWithReplacement(sink, e, s); |