| Index: sdk/lib/async/stream_pipe.dart
|
| diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart
|
| index 07b0c19cb7379db1c6b19d39a9a8418b01d5f9fc..0b4b6ab2cf4d7b1a3ebc90a04bb7258788ad917a 100644
|
| --- a/sdk/lib/async/stream_pipe.dart
|
| +++ b/sdk/lib/async/stream_pipe.dart
|
| @@ -38,11 +38,11 @@ _cancelAndError(StreamSubscription subscription, _FutureImpl future) =>
|
|
|
|
|
| /**
|
| - * A wrapper around a stream that allows independent subscribers.
|
| + * A [StreamTransformer] that forwards events and subscriptions.
|
| *
|
| - * By default [this] subscribes to [_source] and forwards all events to its own
|
| - * subscribers. It does not subscribe until there is a subscriber, and
|
| - * unsubscribes again when there are no subscribers left.
|
| + * By default this transformer subscribes to [_source] and forwards all events
|
| + * to [_stream]. It does not subscribe to [_source] until there is a subscriber,
|
| + * on [_stream] and unsubscribes again when there are no subscribers left.
|
| *
|
| * The events are passed through the [_handleData], [_handleError] and
|
| * [_handleDone] methods. Subclasses are supposed to add handling of some of
|
| @@ -50,83 +50,112 @@ _cancelAndError(StreamSubscription subscription, _FutureImpl future) =>
|
| *
|
| * This class is intended for internal use only.
|
| */
|
| -class _ForwardingMultiStream<S, T> extends _MultiStreamImpl<T> {
|
| - Stream<S> _source = null;
|
| - StreamSubscription _subscription = null;
|
| -
|
| - void _subscribeToSource() {
|
| - _subscription = _source.listen(this._handleData,
|
| - onError: this._handleError,
|
| - onDone: this._handleDone);
|
| - if (_isPaused) {
|
| - _subscription.pause();
|
| +/**
|
| + *
|
| + * Handles backwards propagation of subscription and pause.
|
| + */
|
| +class _ForwardingStreamTransformer<S, T> implements StreamTransformer<S, T> {
|
| + Stream<T> _stream;
|
| + Stream<S> _source;
|
| + StreamSubscription<S> _subscription;
|
| +
|
| + Stream<T> _createOutputStream() {
|
| + if (_source.isSingleSubscription) {
|
| + return new _ForwardingSingleStream<T>(this);
|
| }
|
| + return new _ForwardingMultiStream<T>(this);
|
| }
|
|
|
| - /**
|
| - * Subscribe or unsubscribe on [source] depending on whether
|
| - * [stream] has subscribers.
|
| - */
|
| - void _onSubscriptionStateChange() {
|
| - if (_hasSubscribers) {
|
| - assert(_subscription == null);
|
| - if (_source != null) {
|
| - _subscribeToSource();
|
| + Stream<T> bind(Stream<S> source) {
|
| + if (_source != null) {
|
| + throw new StateError("Transformer source already bound");
|
| + }
|
| + _source = source;
|
| + _stream = _createOutputStream();
|
| + return _stream;
|
| + }
|
| +
|
| + void _onPauseStateChange(bool isPaused) {
|
| + if (isPaused) {
|
| + if (_subscription != null) {
|
| + _subscription.pause();
|
| }
|
| } else {
|
| if (_subscription != null) {
|
| - _subscription.cancel();
|
| - _subscription = null;
|
| + _subscription.resume();
|
| }
|
| }
|
| }
|
|
|
| - void _onPauseStateChange() {
|
| - if (_subscription == null) return;
|
| - if (isPaused) {
|
| - _subscription.pause();
|
| + /**
|
| + * Subscribe or unsubscribe on [_source] depending on whether
|
| + * [_stream] has subscribers.
|
| + */
|
| + void _onSubscriptionStateChange(bool hasSubscribers) {
|
| + if (hasSubscribers) {
|
| + assert(_subscription == null);
|
| + _subscription = _source.listen(this._handleData,
|
| + onError: this._handleError,
|
| + onDone: this._handleDone);
|
| } else {
|
| - _subscription.resume();
|
| + // TODO(lrn): Check why this can happen.
|
| + if (_subscription == null) return;
|
| + _subscription.cancel();
|
| + _subscription = null;
|
| }
|
| }
|
|
|
| void _handleData(S inputEvent) {
|
| var outputEvent = inputEvent;
|
| - _add(outputEvent);
|
| + _stream._add(outputEvent);
|
| }
|
|
|
| void _handleError(AsyncError error) {
|
| - _signalError(error);
|
| + _stream._signalError(error);
|
| }
|
|
|
| void _handleDone() {
|
| - _close();
|
| + _stream._close();
|
| }
|
| }
|
|
|
| +class _ForwardingMultiStream<T> extends _MultiStreamImpl<T> {
|
| + _ForwardingStreamTransformer _transformer;
|
| + _ForwardingMultiStream(this._transformer);
|
|
|
| -abstract class _ForwardingTransformer<S, T> extends _ForwardingMultiStream<S, T>
|
| - implements StreamTransformer<S, T> {
|
| - Stream<T> bind(Stream<S> source) {
|
| - if (_source != null) throw new StateError("Already bound to source.");
|
| - _source = source;
|
| - if (_hasSubscribers) {
|
| - _subscribeToSource();
|
| - }
|
| - return this;
|
| + _onSubscriptionStateChange() {
|
| + _transformer._onSubscriptionStateChange(_hasSubscribers);
|
| + }
|
| +
|
| + _onPauseStateChange() {
|
| + _transformer._onPauseStateChange(_isPaused);
|
| }
|
| }
|
|
|
| +class _ForwardingSingleStream<T> extends _SingleStreamImpl<T> {
|
| + _ForwardingStreamTransformer _transformer;
|
| + _ForwardingSingleStream(this._transformer);
|
| +
|
| + _onSubscriptionStateChange() {
|
| + _transformer._onSubscriptionStateChange(_hasSubscribers);
|
| + }
|
| +
|
| + _onPauseStateChange() {
|
| + _transformer._onPauseStateChange(_isPaused);
|
| + }
|
| +}
|
| +
|
| +
|
| // -------------------------------------------------------------------
|
| // Stream transformers used by the default Stream implementation.
|
| // -------------------------------------------------------------------
|
|
|
| typedef bool _Predicate<T>(T value);
|
|
|
| -class WhereStream<T> extends _ForwardingTransformer<T, T> {
|
| +class WhereTransformer<T> extends _ForwardingStreamTransformer<T, T> {
|
| final _Predicate<T> _test;
|
|
|
| - WhereStream(bool test(T value))
|
| + WhereTransformer(bool test(T value))
|
| : this._test = test;
|
|
|
| void _handleData(T inputEvent) {
|
| @@ -134,11 +163,11 @@ class WhereStream<T> extends _ForwardingTransformer<T, T> {
|
| try {
|
| satisfies = _test(inputEvent);
|
| } catch (e, s) {
|
| - _signalError(_asyncError(e, s));
|
| + _stream._signalError(_asyncError(e, s));
|
| return;
|
| }
|
| if (satisfies) {
|
| - _add(inputEvent);
|
| + _stream._add(inputEvent);
|
| }
|
| }
|
| }
|
| @@ -149,10 +178,10 @@ typedef T _Transformation<S, T>(S value);
|
| /**
|
| * A stream pipe that converts data events before passing them on.
|
| */
|
| -class MapStream<S, T> extends _ForwardingTransformer<S, T> {
|
| +class MapTransformer<S, T> extends _ForwardingStreamTransformer<S, T> {
|
| final _Transformation _transform;
|
|
|
| - MapStream(T transform(S event))
|
| + MapTransformer(T transform(S event))
|
| : this._transform = transform;
|
|
|
| void _handleData(S inputEvent) {
|
| @@ -160,31 +189,31 @@ class MapStream<S, T> extends _ForwardingTransformer<S, T> {
|
| try {
|
| outputEvent = _transform(inputEvent);
|
| } catch (e, s) {
|
| - _signalError(_asyncError(e, s));
|
| + _stream._signalError(_asyncError(e, s));
|
| return;
|
| }
|
| - _add(outputEvent);
|
| + _stream._add(outputEvent);
|
| }
|
| }
|
|
|
| /**
|
| * A stream pipe that converts data events before passing them on.
|
| */
|
| -class ExpandStream<S, T> extends _ForwardingTransformer<S, T> {
|
| +class ExpandTransformer<S, T> extends _ForwardingStreamTransformer<S, T> {
|
| final _Transformation<S, Iterable<T>> _expand;
|
|
|
| - ExpandStream(Iterable<T> expand(S event))
|
| + ExpandTransformer(Iterable<T> expand(S event))
|
| : this._expand = expand;
|
|
|
| void _handleData(S inputEvent) {
|
| try {
|
| for (T value in _expand(inputEvent)) {
|
| - _add(value);
|
| + _stream._add(value);
|
| }
|
| } catch (e, s) {
|
| // If either _expand or iterating the generated iterator throws,
|
| // we abort the iteration.
|
| - _signalError(_asyncError(e, s));
|
| + _stream._signalError(_asyncError(e, s));
|
| }
|
| }
|
| }
|
| @@ -197,11 +226,11 @@ typedef bool _ErrorTest(error);
|
| * A stream pipe that converts or disposes error events
|
| * before passing them on.
|
| */
|
| -class HandleErrorStream<T> extends _ForwardingTransformer<T, T> {
|
| +class HandleErrorTransformer<T> extends _ForwardingStreamTransformer<T, T> {
|
| final _ErrorTransformation _transform;
|
| final _ErrorTest _test;
|
|
|
| - HandleErrorStream(void transform(AsyncError event), bool test(error))
|
| + HandleErrorTransformer(void transform(AsyncError event), bool test(error))
|
| : this._transform = transform, this._test = test;
|
|
|
| void _handleError(AsyncError error) {
|
| @@ -210,7 +239,7 @@ class HandleErrorStream<T> extends _ForwardingTransformer<T, T> {
|
| try {
|
| matches = _test(error.error);
|
| } catch (e, s) {
|
| - _signalError(_asyncError(e, s, error));
|
| + _stream._signalError(_asyncError(e, s, error));
|
| return;
|
| }
|
| }
|
| @@ -218,11 +247,11 @@ class HandleErrorStream<T> extends _ForwardingTransformer<T, T> {
|
| try {
|
| _transform(error);
|
| } catch (e, s) {
|
| - _signalError(_asyncError(e, s, error));
|
| + _stream._signalError(_asyncError(e, s, error));
|
| return;
|
| }
|
| } else {
|
| - _signalError(error);
|
| + _stream._signalError(error);
|
| }
|
| }
|
| }
|
| @@ -233,35 +262,39 @@ typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink);
|
| typedef void _TransformDoneHandler<T>(StreamSink<T> sink);
|
|
|
| /**
|
| - * A stream pipe that intercepts all events and can generate any event as
|
| + * A stream transfomer that intercepts all events and can generate any event as
|
| * output.
|
| *
|
| - * Each incoming event on this [StreamSink] is passed to the corresponding
|
| - * provided event handler, along with a [StreamSink] linked to the [output] of
|
| - * this pipe.
|
| - * The handler can then decide which events to send to the output
|
| + * Each incoming event on the source stream is passed to the corresponding
|
| + * provided event handler, along with a [StreamSink] linked to the output
|
| + * Stream.
|
| + * The handler can then decide exactly which events to send to the output.
|
| */
|
| -class PipeStream<S, T> extends _ForwardingTransformer<S, T> {
|
| +class _StreamTransformerImpl<S, T> extends _ForwardingStreamTransformer<S, T> {
|
| final _TransformDataHandler<S, T> _onData;
|
| final _TransformErrorHandler<T> _onError;
|
| final _TransformDoneHandler<T> _onDone;
|
| StreamSink<T> _sink;
|
|
|
| - PipeStream({void onData(S data, StreamSink<T> sink),
|
| - void onError(AsyncError data, StreamSink<T> sink),
|
| - void onDone(StreamSink<T> sink)})
|
| + _StreamTransformerImpl(void onData(S data, StreamSink<T> sink),
|
| + void onError(AsyncError data, StreamSink<T> sink),
|
| + void onDone(StreamSink<T> sink))
|
| : this._onData = (onData == null ? _defaultHandleData : onData),
|
| this._onError = (onError == null ? _defaultHandleError : onError),
|
| - this._onDone = (onDone == null ? _defaultHandleDone : onDone) {
|
| - // Cache the sink wrapper to avoid creating a new one for each event.
|
| - this._sink = new _StreamImplSink(this);
|
| + this._onDone = (onDone == null ? _defaultHandleDone : onDone);
|
| +
|
| + Stream<T> bind(Stream<S> source) {
|
| + Stream<T> stream = super.bind(source);
|
| + // Cache a Sink object to avoid creating a new one for each event.
|
| + _sink = new _StreamImplSink(stream);
|
| + return stream;
|
| }
|
|
|
| void _handleData(S data) {
|
| try {
|
| - return _onData(data, _sink);
|
| + _onData(data, _sink);
|
| } catch (e, s) {
|
| - _signalError(_asyncError(e, s));
|
| + _stream._signalError(_asyncError(e, s));
|
| }
|
| }
|
|
|
| @@ -269,7 +302,7 @@ class PipeStream<S, T> extends _ForwardingTransformer<S, T> {
|
| try {
|
| _onError(error, _sink);
|
| } catch (e, s) {
|
| - _signalError(_asyncError(e, s, error));
|
| + _stream._signalError(_asyncError(e, s, error));
|
| }
|
| }
|
|
|
| @@ -277,12 +310,12 @@ class PipeStream<S, T> extends _ForwardingTransformer<S, T> {
|
| try {
|
| _onDone(_sink);
|
| } catch (e, s) {
|
| - _signalError(_asyncError(e, s));
|
| + _stream._signalError(_asyncError(e, s));
|
| }
|
| }
|
|
|
| /** Default data handler forwards all data. */
|
| - static void _defaultHandleData(dynamic data, StreamSink sink) {
|
| + static void _defaultHandleData(var data, StreamSink sink) {
|
| sink.add(data);
|
| }
|
| /** Default error handler forwards all errors. */
|
| @@ -304,85 +337,11 @@ class _StreamImplSink<T> implements StreamSink<T> {
|
| void close() { _target._close(); }
|
| }
|
|
|
| -/**
|
| - * A stream pipe that intercepts all events and can generate any event as
|
| - * output.
|
| - *
|
| - * Each incoming event on this [StreamSink] is passed to the corresponding
|
| - * method on [transform], along with a [StreamSink] linked to the [output] of
|
| - * this pipe.
|
| - * The handler can then decide which events to send to the output
|
| - */
|
| -class TransformStream<S, T> extends _ForwardingTransformer<S, T> {
|
| - final StreamTransformer<S, T> _transform;
|
| - StreamSink<T> _sink;
|
|
|
| - TransformStream(StreamTransformer<S, T> transform)
|
| - : this._transform = transform {
|
| - // Cache the sink wrapper to avoid creating a new one for each event.
|
| - this._sink = new _StreamImplSink(this);
|
| - }
|
| -
|
| - void _handleData(S data) {
|
| - try {
|
| - return _transform.handleData(data, _sink);
|
| - } catch (e, s) {
|
| - _controller.signalError(_asyncError(e, s));
|
| - }
|
| - }
|
| -
|
| - void _handleError(AsyncError error) {
|
| - try {
|
| - _transform.handleError(error, _sink);
|
| - } catch (e, s) {
|
| - _controller.signalError(_asyncError(e, s, error));
|
| - }
|
| - }
|
| -
|
| - void _handleDone() {
|
| - try {
|
| - _transform.handleDone(_sink);
|
| - } catch (e, s) {
|
| - _controller.signalError(_asyncError(e, s));
|
| - }
|
| - }
|
| -}
|
| -
|
| -
|
| -/** Helper class for transforming three functions into a StreamTransformer. */
|
| -class _StreamTransformerFunctionWrapper<S, T>
|
| - extends _StreamTransformer<S, T> {
|
| - final _TransformDataHandler<S, T> _handleData;
|
| - final _TransformErrorHandler<T> _handleError;
|
| - final _TransformDoneHandler<T> _handleDone;
|
| -
|
| - _StreamTransformerFunctionWrapper({
|
| - void onData(S data, StreamSink<T> sink),
|
| - void onError(AsyncError data, StreamSink<T> sink),
|
| - void onDone(StreamSink<T> sink)})
|
| - : _handleData = onData != null ? onData : PipeStream._defaultHandleData,
|
| - _handleError = onError != null ? onError
|
| - : PipeStream._defaultHandleError,
|
| - _handleDone = onDone != null ? onDone : PipeStream._defaultHandleDone;
|
| -
|
| - void handleData(S data, StreamSink<T> sink) {
|
| - return _handleData(data, sink);
|
| - }
|
| -
|
| - void handleError(AsyncError error, StreamSink<T> sink) {
|
| - _handleError(error, sink);
|
| - }
|
| -
|
| - void handleDone(StreamSink<T> sink) {
|
| - _handleDone(sink);
|
| - }
|
| -}
|
| -
|
| -
|
| -class TakeStream<T> extends _ForwardingTransformer<T, T> {
|
| +class TakeTransformer<T> extends _ForwardingStreamTransformer<T, T> {
|
| int _remaining;
|
|
|
| - TakeStream(int count)
|
| + TakeTransformer(int count)
|
| : this._remaining = count {
|
| // This test is done early to avoid handling an async error
|
| // in the _handleData method.
|
| @@ -391,22 +350,22 @@ class TakeStream<T> extends _ForwardingTransformer<T, T> {
|
|
|
| void _handleData(T inputEvent) {
|
| if (_remaining > 0) {
|
| - _add(inputEvent);
|
| + _stream._add(inputEvent);
|
| _remaining -= 1;
|
| if (_remaining == 0) {
|
| // Closing also unsubscribes all subscribers, which unsubscribes
|
| // this from source.
|
| - _close();
|
| + _stream._close();
|
| }
|
| }
|
| }
|
| }
|
|
|
|
|
| -class TakeWhileStream<T> extends _ForwardingTransformer<T, T> {
|
| +class TakeWhileTransformer<T> extends _ForwardingStreamTransformer<T, T> {
|
| final _Predicate<T> _test;
|
|
|
| - TakeWhileStream(bool test(T value))
|
| + TakeWhileTransformer(bool test(T value))
|
| : this._test = test;
|
|
|
| void _handleData(T inputEvent) {
|
| @@ -414,27 +373,27 @@ class TakeWhileStream<T> extends _ForwardingTransformer<T, T> {
|
| try {
|
| satisfies = _test(inputEvent);
|
| } catch (e, s) {
|
| - _signalError(_asyncError(e, s));
|
| + _stream._signalError(_asyncError(e, s));
|
| // The test didn't say true. Didn't say false either, but we stop anyway.
|
| - _close();
|
| + _stream._close();
|
| return;
|
| }
|
| if (satisfies) {
|
| - _add(inputEvent);
|
| + _stream._add(inputEvent);
|
| } else {
|
| - _close();
|
| + _stream._close();
|
| }
|
| }
|
| }
|
|
|
| -class SkipStream<T> extends _ForwardingTransformer<T, T> {
|
| +class SkipTransformer<T> extends _ForwardingStreamTransformer<T, T> {
|
| int _remaining;
|
|
|
| - SkipStream(int count)
|
| + SkipTransformer(int count)
|
| : this._remaining = count{
|
| // This test is done early to avoid handling an async error
|
| // in the _handleData method.
|
| - if (count is! int) throw new ArgumentError(count);
|
| + if (count is! int || count < 0) throw new ArgumentError(count);
|
| }
|
|
|
| void _handleData(T inputEvent) {
|
| @@ -442,52 +401,52 @@ class SkipStream<T> extends _ForwardingTransformer<T, T> {
|
| _remaining--;
|
| return;
|
| }
|
| - return _add(inputEvent);
|
| + return _stream._add(inputEvent);
|
| }
|
| }
|
|
|
| -class SkipWhileStream<T> extends _ForwardingTransformer<T, T> {
|
| +class SkipWhileTransformer<T> extends _ForwardingStreamTransformer<T, T> {
|
| final _Predicate<T> _test;
|
| bool _hasFailed = false;
|
|
|
| - SkipWhileStream(bool test(T value))
|
| + SkipWhileTransformer(bool test(T value))
|
| : this._test = test;
|
|
|
| void _handleData(T inputEvent) {
|
| if (_hasFailed) {
|
| - _add(inputEvent);
|
| + _stream._add(inputEvent);
|
| }
|
| bool satisfies;
|
| try {
|
| satisfies = _test(inputEvent);
|
| } catch (e, s) {
|
| - _signalError(_asyncError(e, s));
|
| + _stream._signalError(_asyncError(e, s));
|
| // A failure to return a boolean is considered "not matching".
|
| _hasFailed = true;
|
| return;
|
| }
|
| if (!satisfies) {
|
| _hasFailed = true;
|
| - _add(inputEvent);
|
| + _stream._add(inputEvent);
|
| }
|
| }
|
| }
|
|
|
| typedef bool _Equality<T>(T a, T b);
|
|
|
| -class DistinctStream<T> extends _ForwardingTransformer<T, T> {
|
| +class DistinctTransformer<T> extends _ForwardingStreamTransformer<T, T> {
|
| static var _SENTINEL = new Object();
|
|
|
| _Equality<T> _equals;
|
| var _previous = _SENTINEL;
|
|
|
| - DistinctStream(bool equals(T a, T b))
|
| + DistinctTransformer(bool equals(T a, T b))
|
| : _equals = equals;
|
|
|
| void _handleData(T inputEvent) {
|
| if (identical(_previous, _SENTINEL)) {
|
| _previous = inputEvent;
|
| - return _add(inputEvent);
|
| + return _stream._add(inputEvent);
|
| } else {
|
| bool isEqual;
|
| try {
|
| @@ -497,11 +456,11 @@ class DistinctStream<T> extends _ForwardingTransformer<T, T> {
|
| isEqual = _equals(_previous, inputEvent);
|
| }
|
| } catch (e, s) {
|
| - _signalError(_asyncError(e, s));
|
| + _stream._signalError(_asyncError(e, s));
|
| return null;
|
| }
|
| if (!isEqual) {
|
| - _add(inputEvent);
|
| + _stream._add(inputEvent);
|
| _previous = inputEvent;
|
| }
|
| }
|
|
|