| Index: sdk/lib/async/stream_pipe.dart
|
| diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart
|
| index 643d14227afca6b41cda51b7ede42bbf8ae8b465..12063bd928b65cc24970c571c2ad020746740666 100644
|
| --- a/sdk/lib/async/stream_pipe.dart
|
| +++ b/sdk/lib/async/stream_pipe.dart
|
| @@ -5,9 +5,8 @@
|
| part of dart.async;
|
|
|
| /** Runs user code and takes actions depending on success or failure. */
|
| -_runUserCode(userCode(),
|
| - onSuccess(value),
|
| - onError(error, StackTrace stackTrace)) {
|
| +_runUserCode(
|
| + userCode(), onSuccess(value), onError(error, StackTrace stackTrace)) {
|
| try {
|
| onSuccess(userCode());
|
| } catch (e, s) {
|
| @@ -24,10 +23,8 @@ _runUserCode(userCode(),
|
|
|
| /** Helper function to cancel a subscription and wait for the potential future,
|
| before completing with an error. */
|
| -void _cancelAndError(StreamSubscription subscription,
|
| - _Future future,
|
| - error,
|
| - StackTrace stackTrace) {
|
| +void _cancelAndError(StreamSubscription subscription, _Future future, error,
|
| + StackTrace stackTrace) {
|
| var cancelFuture = subscription.cancel();
|
| if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) {
|
| cancelFuture.whenComplete(() => future._completeError(error, stackTrace));
|
| @@ -37,8 +34,7 @@ void _cancelAndError(StreamSubscription subscription,
|
| }
|
|
|
| void _cancelAndErrorWithReplacement(StreamSubscription subscription,
|
| - _Future future,
|
| - error, StackTrace stackTrace) {
|
| + _Future future, error, StackTrace stackTrace) {
|
| AsyncError replacement = Zone.current.errorCallback(error, stackTrace);
|
| if (replacement != null) {
|
| error = _nonNullError(replacement.error);
|
| @@ -68,7 +64,6 @@ void _cancelAndValue(StreamSubscription subscription, _Future future, value) {
|
| }
|
| }
|
|
|
| -
|
| /**
|
| * A [Stream] that forwards subscriptions to another stream.
|
| *
|
| @@ -86,18 +81,13 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
|
| bool get isBroadcast => _source.isBroadcast;
|
|
|
| StreamSubscription<T> listen(void onData(T value),
|
| - { Function onError,
|
| - void onDone(),
|
| - bool cancelOnError }) {
|
| + {Function onError, void onDone(), bool cancelOnError}) {
|
| cancelOnError = identical(true, cancelOnError);
|
| return _createSubscription(onData, onError, onDone, cancelOnError);
|
| }
|
|
|
| - StreamSubscription<T> _createSubscription(
|
| - void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError) {
|
| + StreamSubscription<T> _createSubscription(void onData(T data),
|
| + Function onError, void onDone(), bool cancelOnError) {
|
| return new _ForwardingStreamSubscription<S, T>(
|
| this, onData, onError, onDone, cancelOnError);
|
| }
|
| @@ -105,7 +95,7 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
|
| // Override the following methods in subclasses to change the behavior.
|
|
|
| void _handleData(S data, _EventSink<T> sink) {
|
| - sink._add(data as Object /*=T*/);
|
| + sink._add(data as Object/*=T*/);
|
| }
|
|
|
| void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) {
|
| @@ -127,12 +117,10 @@ class _ForwardingStreamSubscription<S, T>
|
| StreamSubscription<S> _subscription;
|
|
|
| _ForwardingStreamSubscription(this._stream, void onData(T data),
|
| - Function onError, void onDone(),
|
| - bool cancelOnError)
|
| + Function onError, void onDone(), bool cancelOnError)
|
| : super(onData, onError, onDone, cancelOnError) {
|
| - _subscription = _stream._source.listen(_handleData,
|
| - onError: _handleError,
|
| - onDone: _handleDone);
|
| + _subscription = _stream._source
|
| + .listen(_handleData, onError: _handleError, onDone: _handleDone);
|
| }
|
|
|
| // _StreamSink interface.
|
| @@ -200,12 +188,12 @@ void _addErrorWithReplacement(_EventSink sink, error, stackTrace) {
|
| sink._addError(error, stackTrace);
|
| }
|
|
|
| -
|
| class _WhereStream<T> extends _ForwardingStream<T, T> {
|
| final _Predicate<T> _test;
|
|
|
| _WhereStream(Stream<T> source, bool test(T value))
|
| - : _test = test, super(source);
|
| + : _test = test,
|
| + super(source);
|
|
|
| void _handleData(T inputEvent, _EventSink<T> sink) {
|
| bool satisfies;
|
| @@ -221,7 +209,6 @@ class _WhereStream<T> extends _ForwardingStream<T, T> {
|
| }
|
| }
|
|
|
| -
|
| typedef T _Transformation<S, T>(S value);
|
|
|
| /**
|
| @@ -231,7 +218,8 @@ class _MapStream<S, T> extends _ForwardingStream<S, T> {
|
| final _Transformation<S, T> _transform;
|
|
|
| _MapStream(Stream<S> source, T transform(S event))
|
| - : this._transform = transform, super(source);
|
| + : this._transform = transform,
|
| + super(source);
|
|
|
| void _handleData(S inputEvent, _EventSink<T> sink) {
|
| T outputEvent;
|
| @@ -252,7 +240,8 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
|
| final _Transformation<S, Iterable<T>> _expand;
|
|
|
| _ExpandStream(Stream<S> source, Iterable<T> expand(S event))
|
| - : this._expand = expand, super(source);
|
| + : this._expand = expand,
|
| + super(source);
|
|
|
| void _handleData(S inputEvent, _EventSink<T> sink) {
|
| try {
|
| @@ -267,7 +256,6 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
|
| }
|
| }
|
|
|
| -
|
| typedef bool _ErrorTest(error);
|
|
|
| /**
|
| @@ -278,10 +266,10 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
|
| final Function _transform;
|
| final _ErrorTest _test;
|
|
|
| - _HandleErrorStream(Stream<T> source,
|
| - Function onError,
|
| - bool test(error))
|
| - : this._transform = onError, this._test = test, super(source);
|
| + _HandleErrorStream(Stream<T> source, Function onError, bool test(error))
|
| + : this._transform = onError,
|
| + this._test = test,
|
| + super(source);
|
|
|
| void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) {
|
| bool matches = true;
|
| @@ -310,22 +298,19 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
|
| }
|
| }
|
|
|
| -
|
| class _TakeStream<T> extends _ForwardingStream<T, T> {
|
| final int _count;
|
|
|
| _TakeStream(Stream<T> source, int count)
|
| - : this._count = count, super(source) {
|
| + : this._count = count,
|
| + super(source) {
|
| // This test is done early to avoid handling an async error
|
| // in the _handleData method.
|
| if (count is! int) throw new ArgumentError(count);
|
| }
|
|
|
| - StreamSubscription<T> _createSubscription(
|
| - void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError) {
|
| + StreamSubscription<T> _createSubscription(void onData(T data),
|
| + Function onError, void onDone(), bool cancelOnError) {
|
| if (_count == 0) {
|
| _source.listen(null).cancel();
|
| return new _DoneStreamSubscription<T>(onDone);
|
| @@ -360,22 +345,26 @@ class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> {
|
| var _sharedState;
|
|
|
| _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data),
|
| - Function onError, void onDone(),
|
| - bool cancelOnError, this._sharedState)
|
| + Function onError, void onDone(), bool cancelOnError, this._sharedState)
|
| : super(stream, onData, onError, onDone, cancelOnError);
|
|
|
| bool get _flag => _sharedState;
|
| - void set _flag(bool flag) { _sharedState = flag; }
|
| + void set _flag(bool flag) {
|
| + _sharedState = flag;
|
| + }
|
| +
|
| int get _count => _sharedState;
|
| - void set _count(int count) { _sharedState = count; }
|
| + void set _count(int count) {
|
| + _sharedState = count;
|
| + }
|
| }
|
|
|
| -
|
| class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
|
| final _Predicate<T> _test;
|
|
|
| _TakeWhileStream(Stream<T> source, bool test(T value))
|
| - : this._test = test, super(source);
|
| + : this._test = test,
|
| + super(source);
|
|
|
| void _handleData(T inputEvent, _EventSink<T> sink) {
|
| bool satisfies;
|
| @@ -399,17 +388,15 @@ class _SkipStream<T> extends _ForwardingStream<T, T> {
|
| final int _count;
|
|
|
| _SkipStream(Stream<T> source, int count)
|
| - : this._count = count, super(source) {
|
| + : this._count = count,
|
| + super(source) {
|
| // This test is done early to avoid handling an async error
|
| // in the _handleData method.
|
| if (count is! int || count < 0) throw new ArgumentError(count);
|
| }
|
|
|
| - StreamSubscription<T> _createSubscription(
|
| - void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError) {
|
| + StreamSubscription<T> _createSubscription(void onData(T data),
|
| + Function onError, void onDone(), bool cancelOnError) {
|
| return new _StateStreamSubscription<T>(
|
| this, onData, onError, onDone, cancelOnError, _count);
|
| }
|
| @@ -429,13 +416,11 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
|
| final _Predicate<T> _test;
|
|
|
| _SkipWhileStream(Stream<T> source, bool test(T value))
|
| - : this._test = test, super(source);
|
| + : this._test = test,
|
| + super(source);
|
|
|
| - StreamSubscription<T> _createSubscription(
|
| - void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError) {
|
| + StreamSubscription<T> _createSubscription(void onData(T data),
|
| + Function onError, void onDone(), bool cancelOnError) {
|
| return new _StateStreamSubscription<T>(
|
| this, onData, onError, onDone, cancelOnError, false);
|
| }
|
| @@ -472,7 +457,8 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
|
| var _previous = _SENTINEL;
|
|
|
| _DistinctStream(Stream<T> source, bool equals(T a, T b))
|
| - : _equals = equals, super(source);
|
| + : _equals = equals,
|
| + super(source);
|
|
|
| void _handleData(T inputEvent, _EventSink<T> sink) {
|
| if (identical(_previous, _SENTINEL)) {
|
| @@ -484,7 +470,7 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
|
| if (_equals == null) {
|
| isEqual = (_previous == inputEvent);
|
| } else {
|
| - isEqual = _equals(_previous as Object /*=T*/, inputEvent);
|
| + isEqual = _equals(_previous as Object/*=T*/, inputEvent);
|
| }
|
| } catch (e, s) {
|
| _addErrorWithReplacement(sink, e, s);
|
|
|