| Index: sdk/lib/async/stream_pipe.dart
|
| diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart
|
| index e089f6ce88dce9180a110615d26ec6df7d2b2ab0..e5aca172415406ea65dbe4da256ade70f2649958 100644
|
| --- a/sdk/lib/async/stream_pipe.dart
|
| +++ b/sdk/lib/async/stream_pipe.dart
|
| @@ -69,84 +69,26 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
|
|
|
| // Override the following methods in subclasses to change the behavior.
|
|
|
| - void _handleData(S data, _EventOutputSink<T> sink) {
|
| + void _handleData(S data, _EventSink<T> sink) {
|
| var outputData = data;
|
| - sink._sendData(outputData);
|
| + sink._add(outputData);
|
| }
|
|
|
| - void _handleError(error, _EventOutputSink<T> sink) {
|
| - sink._sendError(error);
|
| + void _handleError(error, _EventSink<T> sink) {
|
| + sink._addError(error);
|
| }
|
|
|
| - void _handleDone(_EventOutputSink<T> sink) {
|
| - sink._sendDone();
|
| + void _handleDone(_EventSink<T> sink) {
|
| + sink._close();
|
| }
|
| }
|
|
|
| /**
|
| - * Common behavior of [StreamSubscription] classes.
|
| - *
|
| - * Stores and allows updating of the event handlers of a [StreamSubscription].
|
| - */
|
| -abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> {
|
| - // TODO(ahe): Restore type when feature is implemented in dart2js
|
| - // checked mode. http://dartbug.com/7733
|
| - var /* _DataHandler<T> */ _onData;
|
| - _ErrorHandler _onError;
|
| - _DoneHandler _onDone;
|
| -
|
| - _BaseStreamSubscription(this._onData,
|
| - this._onError,
|
| - this._onDone) {
|
| - if (_onData == null) _onData = _nullDataHandler;
|
| - if (_onError == null) _onError = _nullErrorHandler;
|
| - if (_onDone == null) _onDone = _nullDoneHandler;
|
| - }
|
| -
|
| - // StreamSubscription interface.
|
| - void onData(void handleData(T event)) {
|
| - if (handleData == null) handleData = _nullDataHandler;
|
| - _onData = handleData;
|
| - }
|
| -
|
| - void onError(void handleError(error)) {
|
| - if (handleError == null) handleError = _nullErrorHandler;
|
| - _onError = handleError;
|
| - }
|
| -
|
| - void onDone(void handleDone()) {
|
| - if (handleDone == null) handleDone = _nullDoneHandler;
|
| - _onDone = handleDone;
|
| - }
|
| -
|
| - void pause([Future resumeSignal]);
|
| -
|
| - void resume();
|
| -
|
| - void cancel();
|
| -
|
| - Future asFuture([var futureValue]) {
|
| - _FutureImpl<T> result = new _FutureImpl<T>();
|
| -
|
| - // Overwrite the onDone and onError handlers.
|
| - onDone(() { result._setValue(futureValue); });
|
| - onError((error) {
|
| - cancel();
|
| - result._setError(error);
|
| - });
|
| -
|
| - return result;
|
| - }
|
| -}
|
| -
|
| -
|
| -/**
|
| * Abstract superclass for subscriptions that forward to other subscriptions.
|
| */
|
| class _ForwardingStreamSubscription<S, T>
|
| - extends _BaseStreamSubscription<T> implements _EventOutputSink<T> {
|
| + extends _BufferingStreamSubscription<T> {
|
| final _ForwardingStream<S, T> _stream;
|
| - final bool _cancelOnError;
|
|
|
| StreamSubscription<S> _subscription;
|
|
|
| @@ -154,60 +96,46 @@ class _ForwardingStreamSubscription<S, T>
|
| void onData(T data),
|
| void onError(error),
|
| void onDone(),
|
| - this._cancelOnError)
|
| - : super(onData, onError, onDone) {
|
| - // Don't unsubscribe on incoming error, only if we send an error forwards.
|
| + bool cancelOnError)
|
| + : super(onData, onError, onDone, cancelOnError) {
|
| _subscription =
|
| _stream._source.listen(_handleData,
|
| onError: _handleError,
|
| onDone: _handleDone);
|
| }
|
|
|
| - // StreamSubscription interface.
|
| -
|
| - void pause([Future resumeSignal]) {
|
| - if (_subscription == null) return;
|
| - _subscription.pause(resumeSignal);
|
| - }
|
| -
|
| - void resume() {
|
| - if (_subscription == null) return;
|
| - _subscription.resume();
|
| - }
|
| + // _StreamSink interface.
|
| + // Transformers sending more than one event have no way to know if the stream
|
| + // is canceled or closed after the first, so we just ignore remaining events.
|
|
|
| - bool get isPaused {
|
| - if (_subscription == null) return false;
|
| - return _subscription.isPaused;
|
| + void _add(T data) {
|
| + if (_isClosed) return;
|
| + super._add(data);
|
| }
|
|
|
| - void cancel() {
|
| - if (_subscription != null) {
|
| - _subscription.cancel();
|
| - _subscription = null;
|
| - }
|
| + void _addError(Object error) {
|
| + if (_isClosed) return;
|
| + super._addError(error);
|
| }
|
|
|
| - // _EventOutputSink interface. Sends data to this subscription.
|
| + // StreamSubscription callbacks.
|
|
|
| - void _sendData(T data) {
|
| - _onData(data);
|
| + void _onPause() {
|
| + if (_subscription == null) return;
|
| + _subscription.pause();
|
| }
|
|
|
| - void _sendError(error) {
|
| - _onError(error);
|
| - if (_cancelOnError) {
|
| - _subscription.cancel();
|
| - _subscription = null;
|
| - }
|
| + void _onResume() {
|
| + if (_subscription == null) return;
|
| + _subscription.resume();
|
| }
|
|
|
| - void _sendDone() {
|
| - // If the transformation sends a done signal, we stop the subscription.
|
| + void _onCancel() {
|
| if (_subscription != null) {
|
| - _subscription.cancel();
|
| + StreamSubscription subscription = _subscription;
|
| _subscription = null;
|
| + subscription.cancel();
|
| }
|
| - _onDone();
|
| }
|
|
|
| // Methods used as listener on source subscription.
|
| @@ -241,16 +169,16 @@ class _WhereStream<T> extends _ForwardingStream<T, T> {
|
| _WhereStream(Stream<T> source, bool test(T value))
|
| : _test = test, super(source);
|
|
|
| - void _handleData(T inputEvent, _EventOutputSink<T> sink) {
|
| + void _handleData(T inputEvent, _EventSink<T> sink) {
|
| bool satisfies;
|
| try {
|
| satisfies = _test(inputEvent);
|
| } catch (e, s) {
|
| - sink._sendError(_asyncError(e, s));
|
| + sink._addError(_asyncError(e, s));
|
| return;
|
| }
|
| if (satisfies) {
|
| - sink._sendData(inputEvent);
|
| + sink._add(inputEvent);
|
| }
|
| }
|
| }
|
| @@ -267,15 +195,15 @@ class _MapStream<S, T> extends _ForwardingStream<S, T> {
|
| _MapStream(Stream<S> source, T transform(S event))
|
| : this._transform = transform, super(source);
|
|
|
| - void _handleData(S inputEvent, _EventOutputSink<T> sink) {
|
| + void _handleData(S inputEvent, _EventSink<T> sink) {
|
| T outputEvent;
|
| try {
|
| outputEvent = _transform(inputEvent);
|
| } catch (e, s) {
|
| - sink._sendError(_asyncError(e, s));
|
| + sink._addError(_asyncError(e, s));
|
| return;
|
| }
|
| - sink._sendData(outputEvent);
|
| + sink._add(outputEvent);
|
| }
|
| }
|
|
|
| @@ -288,15 +216,15 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
|
| _ExpandStream(Stream<S> source, Iterable<T> expand(S event))
|
| : this._expand = expand, super(source);
|
|
|
| - void _handleData(S inputEvent, _EventOutputSink<T> sink) {
|
| + void _handleData(S inputEvent, _EventSink<T> sink) {
|
| try {
|
| for (T value in _expand(inputEvent)) {
|
| - sink._sendData(value);
|
| + sink._add(value);
|
| }
|
| } catch (e, s) {
|
| // If either _expand or iterating the generated iterator throws,
|
| // we abort the iteration.
|
| - sink._sendError(_asyncError(e, s));
|
| + sink._addError(_asyncError(e, s));
|
| }
|
| }
|
| }
|
| @@ -318,13 +246,13 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
|
| bool test(error))
|
| : this._transform = transform, this._test = test, super(source);
|
|
|
| - void _handleError(Object error, _EventOutputSink<T> sink) {
|
| + void _handleError(Object error, _EventSink<T> sink) {
|
| bool matches = true;
|
| if (_test != null) {
|
| try {
|
| matches = _test(error);
|
| } catch (e, s) {
|
| - sink._sendError(_asyncError(e, s));
|
| + sink._addError(_asyncError(e, s));
|
| return;
|
| }
|
| }
|
| @@ -332,11 +260,11 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
|
| try {
|
| _transform(error);
|
| } catch (e, s) {
|
| - sink._sendError(_asyncError(e, s));
|
| + sink._addError(_asyncError(e, s));
|
| return;
|
| }
|
| } else {
|
| - sink._sendError(error);
|
| + sink._addError(error);
|
| }
|
| }
|
| }
|
| @@ -352,14 +280,14 @@ class _TakeStream<T> extends _ForwardingStream<T, T> {
|
| if (count is! int) throw new ArgumentError(count);
|
| }
|
|
|
| - void _handleData(T inputEvent, _EventOutputSink<T> sink) {
|
| + void _handleData(T inputEvent, _EventSink<T> sink) {
|
| if (_remaining > 0) {
|
| - sink._sendData(inputEvent);
|
| + sink._add(inputEvent);
|
| _remaining -= 1;
|
| if (_remaining == 0) {
|
| // Closing also unsubscribes all subscribers, which unsubscribes
|
| // this from source.
|
| - sink._sendDone();
|
| + sink._close();
|
| }
|
| }
|
| }
|
| @@ -372,20 +300,20 @@ class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
|
| _TakeWhileStream(Stream<T> source, bool test(T value))
|
| : this._test = test, super(source);
|
|
|
| - void _handleData(T inputEvent, _EventOutputSink<T> sink) {
|
| + void _handleData(T inputEvent, _EventSink<T> sink) {
|
| bool satisfies;
|
| try {
|
| satisfies = _test(inputEvent);
|
| } catch (e, s) {
|
| - sink._sendError(_asyncError(e, s));
|
| + sink._addError(_asyncError(e, s));
|
| // The test didn't say true. Didn't say false either, but we stop anyway.
|
| - sink._sendDone();
|
| + sink._close();
|
| return;
|
| }
|
| if (satisfies) {
|
| - sink._sendData(inputEvent);
|
| + sink._add(inputEvent);
|
| } else {
|
| - sink._sendDone();
|
| + sink._close();
|
| }
|
| }
|
| }
|
| @@ -400,12 +328,12 @@ class _SkipStream<T> extends _ForwardingStream<T, T> {
|
| if (count is! int || count < 0) throw new ArgumentError(count);
|
| }
|
|
|
| - void _handleData(T inputEvent, _EventOutputSink<T> sink) {
|
| + void _handleData(T inputEvent, _EventSink<T> sink) {
|
| if (_remaining > 0) {
|
| _remaining--;
|
| return;
|
| }
|
| - return sink._sendData(inputEvent);
|
| + return sink._add(inputEvent);
|
| }
|
| }
|
|
|
| @@ -416,23 +344,23 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
|
| _SkipWhileStream(Stream<T> source, bool test(T value))
|
| : this._test = test, super(source);
|
|
|
| - void _handleData(T inputEvent, _EventOutputSink<T> sink) {
|
| + void _handleData(T inputEvent, _EventSink<T> sink) {
|
| if (_hasFailed) {
|
| - sink._sendData(inputEvent);
|
| + sink._add(inputEvent);
|
| return;
|
| }
|
| bool satisfies;
|
| try {
|
| satisfies = _test(inputEvent);
|
| } catch (e, s) {
|
| - sink._sendError(_asyncError(e, s));
|
| + sink._addError(_asyncError(e, s));
|
| // A failure to return a boolean is considered "not matching".
|
| _hasFailed = true;
|
| return;
|
| }
|
| if (!satisfies) {
|
| _hasFailed = true;
|
| - sink._sendData(inputEvent);
|
| + sink._add(inputEvent);
|
| }
|
| }
|
| }
|
| @@ -448,10 +376,10 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
|
| _DistinctStream(Stream<T> source, bool equals(T a, T b))
|
| : _equals = equals, super(source);
|
|
|
| - void _handleData(T inputEvent, _EventOutputSink<T> sink) {
|
| + void _handleData(T inputEvent, _EventSink<T> sink) {
|
| if (identical(_previous, _SENTINEL)) {
|
| _previous = inputEvent;
|
| - return sink._sendData(inputEvent);
|
| + return sink._add(inputEvent);
|
| } else {
|
| bool isEqual;
|
| try {
|
| @@ -461,11 +389,11 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
|
| isEqual = _equals(_previous, inputEvent);
|
| }
|
| } catch (e, s) {
|
| - sink._sendError(_asyncError(e, s));
|
| + sink._addError(_asyncError(e, s));
|
| return null;
|
| }
|
| if (!isEqual) {
|
| - sink._sendData(inputEvent);
|
| + sink._add(inputEvent);
|
| _previous = inputEvent;
|
| }
|
| }
|
|
|