Index: tool/input_sdk/lib/async/stream_pipe.dart |
diff --git a/tool/input_sdk/lib/async/stream_pipe.dart b/tool/input_sdk/lib/async/stream_pipe.dart |
index 3683719062d332df6676ca6cca5a0f7cf5ed5792..1125620aacb3b7b445c3745e8a079c725a980f37 100644 |
--- a/tool/input_sdk/lib/async/stream_pipe.dart |
+++ b/tool/input_sdk/lib/async/stream_pipe.dart |
@@ -47,10 +47,15 @@ void _cancelAndErrorWithReplacement(StreamSubscription subscription, |
_cancelAndError(subscription, future, error, stackTrace); |
} |
+typedef void _ErrorCallback(error, StackTrace stackTrace); |
+ |
/** Helper function to make an onError argument to [_runUserCode]. */ |
-_cancelAndErrorClosure(StreamSubscription subscription, _Future future) => |
- ((error, StackTrace stackTrace) => _cancelAndError( |
- subscription, future, error, stackTrace)); |
+_ErrorCallback _cancelAndErrorClosure( |
+ StreamSubscription subscription, _Future future) { |
+ return (error, StackTrace stackTrace) { |
+ _cancelAndError(subscription, future, error, stackTrace); |
+ }; |
+} |
/** Helper function to cancel a subscription and wait for the potential future, |
before completing with a value. */ |
@@ -100,8 +105,7 @@ abstract class _ForwardingStream<S, T> extends Stream<T> { |
// Override the following methods in subclasses to change the behavior. |
void _handleData(S data, _EventSink<T> sink) { |
- dynamic outputData = data; |
- sink._add(outputData); |
+ sink._add(data as Object /*=T*/); |
} |
void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) { |
@@ -161,7 +165,7 @@ class _ForwardingStreamSubscription<S, T> |
if (_subscription != null) { |
StreamSubscription subscription = _subscription; |
_subscription = null; |
- subscription.cancel(); |
+ return subscription.cancel(); |
} |
return null; |
} |
@@ -224,7 +228,7 @@ typedef T _Transformation<S, T>(S value); |
* A stream pipe that converts data events before passing them on. |
*/ |
class _MapStream<S, T> extends _ForwardingStream<S, T> { |
- final _Transformation _transform; |
+ final _Transformation<S, T> _transform; |
_MapStream(Stream<S> source, T transform(S event)) |
: this._transform = transform, super(source); |
@@ -308,20 +312,32 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
class _TakeStream<T> extends _ForwardingStream<T, T> { |
- int _remaining; |
+ final int _count; |
_TakeStream(Stream<T> source, int count) |
- : this._remaining = count, super(source) { |
+ : this._count = count, super(source) { |
// This test is done early to avoid handling an async error |
// in the _handleData method. |
if (count is! int) throw new ArgumentError(count); |
} |
+ StreamSubscription<T> _createSubscription( |
+ void onData(T data), |
+ Function onError, |
+ void onDone(), |
+ bool cancelOnError) { |
+ return new _StateStreamSubscription<T>( |
+ this, onData, onError, onDone, cancelOnError, _count); |
+ } |
+ |
void _handleData(T inputEvent, _EventSink<T> sink) { |
- if (_remaining > 0) { |
+ _StateStreamSubscription<T> subscription = sink; |
+ int count = subscription._count; |
+ if (count > 0) { |
sink._add(inputEvent); |
- _remaining -= 1; |
- if (_remaining == 0) { |
+ count -= 1; |
+ subscription._count = count; |
+ if (count == 0) { |
// Closing also unsubscribes all subscribers, which unsubscribes |
// this from source. |
sink._close(); |
@@ -330,6 +346,26 @@ class _TakeStream<T> extends _ForwardingStream<T, T> { |
} |
} |
+/** |
+ * A [_ForwardingStreamSubscription] with one extra state field. |
+ * |
+ * Use by several different classes, some storing an integer, others a bool. |
+ */ |
+class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> { |
+ // Raw state field. Typed access provided by getters and setters below. |
+ var _sharedState; |
+ |
+ _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data), |
+ Function onError, void onDone(), |
+ bool cancelOnError, this._sharedState) |
+ : super(stream, onData, onError, onDone, cancelOnError); |
+ |
+ bool get _flag => _sharedState; |
+ void set _flag(bool flag) { _sharedState = flag; } |
+ int get _count => _sharedState; |
+ void set _count(int count) { _sharedState = count; } |
+} |
+ |
class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
final _Predicate<T> _test; |
@@ -356,18 +392,29 @@ class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
} |
class _SkipStream<T> extends _ForwardingStream<T, T> { |
- int _remaining; |
+ final int _count; |
_SkipStream(Stream<T> source, int count) |
- : this._remaining = count, super(source) { |
+ : this._count = count, super(source) { |
// This test is done early to avoid handling an async error |
// in the _handleData method. |
if (count is! int || count < 0) throw new ArgumentError(count); |
} |
+ StreamSubscription<T> _createSubscription( |
+ void onData(T data), |
+ Function onError, |
+ void onDone(), |
+ bool cancelOnError) { |
+ return new _StateStreamSubscription<T>( |
+ this, onData, onError, onDone, cancelOnError, _count); |
+ } |
+ |
void _handleData(T inputEvent, _EventSink<T> sink) { |
- if (_remaining > 0) { |
- _remaining--; |
+ _StateStreamSubscription<T> subscription = sink; |
+ int count = subscription._count; |
+ if (count > 0) { |
+ subscription._count = count - 1; |
return; |
} |
sink._add(inputEvent); |
@@ -376,13 +423,23 @@ class _SkipStream<T> extends _ForwardingStream<T, T> { |
class _SkipWhileStream<T> extends _ForwardingStream<T, T> { |
final _Predicate<T> _test; |
- bool _hasFailed = false; |
_SkipWhileStream(Stream<T> source, bool test(T value)) |
: this._test = test, super(source); |
+ StreamSubscription<T> _createSubscription( |
+ void onData(T data), |
+ Function onError, |
+ void onDone(), |
+ bool cancelOnError) { |
+ return new _StateStreamSubscription<T>( |
+ this, onData, onError, onDone, cancelOnError, false); |
+ } |
+ |
void _handleData(T inputEvent, _EventSink<T> sink) { |
- if (_hasFailed) { |
+ _StateStreamSubscription<T> subscription = sink; |
+ bool hasFailed = subscription._flag; |
+ if (hasFailed) { |
sink._add(inputEvent); |
return; |
} |
@@ -392,11 +449,11 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> { |
} catch (e, s) { |
_addErrorWithReplacement(sink, e, s); |
// A failure to return a boolean is considered "not matching". |
- _hasFailed = true; |
+ subscription._flag = true; |
return; |
} |
if (!satisfies) { |
- _hasFailed = true; |
+ subscription._flag = true; |
sink._add(inputEvent); |
} |
} |
@@ -423,7 +480,7 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> { |
if (_equals == null) { |
isEqual = (_previous == inputEvent); |
} else { |
- isEqual = _equals(_previous, inputEvent); |
+ isEqual = _equals(_previous as Object /*=T*/, inputEvent); |
} |
} catch (e, s) { |
_addErrorWithReplacement(sink, e, s); |