| Index: tool/input_sdk/lib/async/stream_pipe.dart
|
| diff --git a/tool/input_sdk/lib/async/stream_pipe.dart b/tool/input_sdk/lib/async/stream_pipe.dart
|
| index 3683719062d332df6676ca6cca5a0f7cf5ed5792..1125620aacb3b7b445c3745e8a079c725a980f37 100644
|
| --- a/tool/input_sdk/lib/async/stream_pipe.dart
|
| +++ b/tool/input_sdk/lib/async/stream_pipe.dart
|
| @@ -47,10 +47,15 @@ void _cancelAndErrorWithReplacement(StreamSubscription subscription,
|
| _cancelAndError(subscription, future, error, stackTrace);
|
| }
|
|
|
| +typedef void _ErrorCallback(error, StackTrace stackTrace);
|
| +
|
| /** Helper function to make an onError argument to [_runUserCode]. */
|
| -_cancelAndErrorClosure(StreamSubscription subscription, _Future future) =>
|
| - ((error, StackTrace stackTrace) => _cancelAndError(
|
| - subscription, future, error, stackTrace));
|
| +_ErrorCallback _cancelAndErrorClosure(
|
| + StreamSubscription subscription, _Future future) {
|
| + return (error, StackTrace stackTrace) {
|
| + _cancelAndError(subscription, future, error, stackTrace);
|
| + };
|
| +}
|
|
|
| /** Helper function to cancel a subscription and wait for the potential future,
|
| before completing with a value. */
|
| @@ -100,8 +105,7 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
|
| // Override the following methods in subclasses to change the behavior.
|
|
|
| void _handleData(S data, _EventSink<T> sink) {
|
| - dynamic outputData = data;
|
| - sink._add(outputData);
|
| + sink._add(data as Object /*=T*/);
|
| }
|
|
|
| void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) {
|
| @@ -161,7 +165,7 @@ class _ForwardingStreamSubscription<S, T>
|
| if (_subscription != null) {
|
| StreamSubscription subscription = _subscription;
|
| _subscription = null;
|
| - subscription.cancel();
|
| + return subscription.cancel();
|
| }
|
| return null;
|
| }
|
| @@ -224,7 +228,7 @@ typedef T _Transformation<S, T>(S value);
|
| * A stream pipe that converts data events before passing them on.
|
| */
|
| class _MapStream<S, T> extends _ForwardingStream<S, T> {
|
| - final _Transformation _transform;
|
| + final _Transformation<S, T> _transform;
|
|
|
| _MapStream(Stream<S> source, T transform(S event))
|
| : this._transform = transform, super(source);
|
| @@ -308,20 +312,32 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
|
|
|
|
|
| class _TakeStream<T> extends _ForwardingStream<T, T> {
|
| - int _remaining;
|
| + final int _count;
|
|
|
| _TakeStream(Stream<T> source, int count)
|
| - : this._remaining = count, super(source) {
|
| + : this._count = count, super(source) {
|
| // This test is done early to avoid handling an async error
|
| // in the _handleData method.
|
| if (count is! int) throw new ArgumentError(count);
|
| }
|
|
|
| + StreamSubscription<T> _createSubscription(
|
| + void onData(T data),
|
| + Function onError,
|
| + void onDone(),
|
| + bool cancelOnError) {
|
| + return new _StateStreamSubscription<T>(
|
| + this, onData, onError, onDone, cancelOnError, _count);
|
| + }
|
| +
|
| void _handleData(T inputEvent, _EventSink<T> sink) {
|
| - if (_remaining > 0) {
|
| + _StateStreamSubscription<T> subscription = sink;
|
| + int count = subscription._count;
|
| + if (count > 0) {
|
| sink._add(inputEvent);
|
| - _remaining -= 1;
|
| - if (_remaining == 0) {
|
| + count -= 1;
|
| + subscription._count = count;
|
| + if (count == 0) {
|
| // Closing also unsubscribes all subscribers, which unsubscribes
|
| // this from source.
|
| sink._close();
|
| @@ -330,6 +346,26 @@ class _TakeStream<T> extends _ForwardingStream<T, T> {
|
| }
|
| }
|
|
|
| +/**
|
| + * A [_ForwardingStreamSubscription] with one extra state field.
|
| + *
|
| + * Use by several different classes, some storing an integer, others a bool.
|
| + */
|
| +class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> {
|
| + // Raw state field. Typed access provided by getters and setters below.
|
| + var _sharedState;
|
| +
|
| + _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data),
|
| + Function onError, void onDone(),
|
| + bool cancelOnError, this._sharedState)
|
| + : super(stream, onData, onError, onDone, cancelOnError);
|
| +
|
| + bool get _flag => _sharedState;
|
| + void set _flag(bool flag) { _sharedState = flag; }
|
| + int get _count => _sharedState;
|
| + void set _count(int count) { _sharedState = count; }
|
| +}
|
| +
|
|
|
| class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
|
| final _Predicate<T> _test;
|
| @@ -356,18 +392,29 @@ class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
|
| }
|
|
|
| class _SkipStream<T> extends _ForwardingStream<T, T> {
|
| - int _remaining;
|
| + final int _count;
|
|
|
| _SkipStream(Stream<T> source, int count)
|
| - : this._remaining = count, super(source) {
|
| + : this._count = count, super(source) {
|
| // This test is done early to avoid handling an async error
|
| // in the _handleData method.
|
| if (count is! int || count < 0) throw new ArgumentError(count);
|
| }
|
|
|
| + StreamSubscription<T> _createSubscription(
|
| + void onData(T data),
|
| + Function onError,
|
| + void onDone(),
|
| + bool cancelOnError) {
|
| + return new _StateStreamSubscription<T>(
|
| + this, onData, onError, onDone, cancelOnError, _count);
|
| + }
|
| +
|
| void _handleData(T inputEvent, _EventSink<T> sink) {
|
| - if (_remaining > 0) {
|
| - _remaining--;
|
| + _StateStreamSubscription<T> subscription = sink;
|
| + int count = subscription._count;
|
| + if (count > 0) {
|
| + subscription._count = count - 1;
|
| return;
|
| }
|
| sink._add(inputEvent);
|
| @@ -376,13 +423,23 @@ class _SkipStream<T> extends _ForwardingStream<T, T> {
|
|
|
| class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
|
| final _Predicate<T> _test;
|
| - bool _hasFailed = false;
|
|
|
| _SkipWhileStream(Stream<T> source, bool test(T value))
|
| : this._test = test, super(source);
|
|
|
| + StreamSubscription<T> _createSubscription(
|
| + void onData(T data),
|
| + Function onError,
|
| + void onDone(),
|
| + bool cancelOnError) {
|
| + return new _StateStreamSubscription<T>(
|
| + this, onData, onError, onDone, cancelOnError, false);
|
| + }
|
| +
|
| void _handleData(T inputEvent, _EventSink<T> sink) {
|
| - if (_hasFailed) {
|
| + _StateStreamSubscription<T> subscription = sink;
|
| + bool hasFailed = subscription._flag;
|
| + if (hasFailed) {
|
| sink._add(inputEvent);
|
| return;
|
| }
|
| @@ -392,11 +449,11 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
|
| } catch (e, s) {
|
| _addErrorWithReplacement(sink, e, s);
|
| // A failure to return a boolean is considered "not matching".
|
| - _hasFailed = true;
|
| + subscription._flag = true;
|
| return;
|
| }
|
| if (!satisfies) {
|
| - _hasFailed = true;
|
| + subscription._flag = true;
|
| sink._add(inputEvent);
|
| }
|
| }
|
| @@ -423,7 +480,7 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
|
| if (_equals == null) {
|
| isEqual = (_previous == inputEvent);
|
| } else {
|
| - isEqual = _equals(_previous, inputEvent);
|
| + isEqual = _equals(_previous as Object /*=T*/, inputEvent);
|
| }
|
| } catch (e, s) {
|
| _addErrorWithReplacement(sink, e, s);
|
|
|