| Index: sdk/lib/async/stream_pipe.dart
|
| diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart
|
| index e5aca172415406ea65dbe4da256ade70f2649958..e089f6ce88dce9180a110615d26ec6df7d2b2ab0 100644
|
| --- a/sdk/lib/async/stream_pipe.dart
|
| +++ b/sdk/lib/async/stream_pipe.dart
|
| @@ -69,26 +69,84 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
|
|
|
| // Override the following methods in subclasses to change the behavior.
|
|
|
| - void _handleData(S data, _EventSink<T> sink) {
|
| + void _handleData(S data, _EventOutputSink<T> sink) {
|
| var outputData = data;
|
| - sink._add(outputData);
|
| + sink._sendData(outputData);
|
| }
|
|
|
| - void _handleError(error, _EventSink<T> sink) {
|
| - sink._addError(error);
|
| + void _handleError(error, _EventOutputSink<T> sink) {
|
| + sink._sendError(error);
|
| }
|
|
|
| - void _handleDone(_EventSink<T> sink) {
|
| - sink._close();
|
| + void _handleDone(_EventOutputSink<T> sink) {
|
| + sink._sendDone();
|
| }
|
| }
|
|
|
| /**
|
| + * Common behavior of [StreamSubscription] classes.
|
| + *
|
| + * Stores and allows updating of the event handlers of a [StreamSubscription].
|
| + */
|
| +abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> {
|
| + // TODO(ahe): Restore type when feature is implemented in dart2js
|
| + // checked mode. http://dartbug.com/7733
|
| + var /* _DataHandler<T> */ _onData;
|
| + _ErrorHandler _onError;
|
| + _DoneHandler _onDone;
|
| +
|
| + _BaseStreamSubscription(this._onData,
|
| + this._onError,
|
| + this._onDone) {
|
| + if (_onData == null) _onData = _nullDataHandler;
|
| + if (_onError == null) _onError = _nullErrorHandler;
|
| + if (_onDone == null) _onDone = _nullDoneHandler;
|
| + }
|
| +
|
| + // StreamSubscription interface.
|
| + void onData(void handleData(T event)) {
|
| + if (handleData == null) handleData = _nullDataHandler;
|
| + _onData = handleData;
|
| + }
|
| +
|
| + void onError(void handleError(error)) {
|
| + if (handleError == null) handleError = _nullErrorHandler;
|
| + _onError = handleError;
|
| + }
|
| +
|
| + void onDone(void handleDone()) {
|
| + if (handleDone == null) handleDone = _nullDoneHandler;
|
| + _onDone = handleDone;
|
| + }
|
| +
|
| + void pause([Future resumeSignal]);
|
| +
|
| + void resume();
|
| +
|
| + void cancel();
|
| +
|
| + Future asFuture([var futureValue]) {
|
| + _FutureImpl<T> result = new _FutureImpl<T>();
|
| +
|
| + // Overwrite the onDone and onError handlers.
|
| + onDone(() { result._setValue(futureValue); });
|
| + onError((error) {
|
| + cancel();
|
| + result._setError(error);
|
| + });
|
| +
|
| + return result;
|
| + }
|
| +}
|
| +
|
| +
|
| +/**
|
| * Abstract superclass for subscriptions that forward to other subscriptions.
|
| */
|
| class _ForwardingStreamSubscription<S, T>
|
| - extends _BufferingStreamSubscription<T> {
|
| + extends _BaseStreamSubscription<T> implements _EventOutputSink<T> {
|
| final _ForwardingStream<S, T> _stream;
|
| + final bool _cancelOnError;
|
|
|
| StreamSubscription<S> _subscription;
|
|
|
| @@ -96,46 +154,60 @@ class _ForwardingStreamSubscription<S, T>
|
| void onData(T data),
|
| void onError(error),
|
| void onDone(),
|
| - bool cancelOnError)
|
| - : super(onData, onError, onDone, cancelOnError) {
|
| + this._cancelOnError)
|
| + : super(onData, onError, onDone) {
|
| + // Don't unsubscribe on incoming error, only if we send an error forwards.
|
| _subscription =
|
| _stream._source.listen(_handleData,
|
| onError: _handleError,
|
| onDone: _handleDone);
|
| }
|
|
|
| - // _StreamSink interface.
|
| - // Transformers sending more than one event have no way to know if the stream
|
| - // is canceled or closed after the first, so we just ignore remaining events.
|
| + // StreamSubscription interface.
|
|
|
| - void _add(T data) {
|
| - if (_isClosed) return;
|
| - super._add(data);
|
| + void pause([Future resumeSignal]) {
|
| + if (_subscription == null) return;
|
| + _subscription.pause(resumeSignal);
|
| }
|
|
|
| - void _addError(Object error) {
|
| - if (_isClosed) return;
|
| - super._addError(error);
|
| + void resume() {
|
| + if (_subscription == null) return;
|
| + _subscription.resume();
|
| }
|
|
|
| - // StreamSubscription callbacks.
|
| + bool get isPaused {
|
| + if (_subscription == null) return false;
|
| + return _subscription.isPaused;
|
| + }
|
|
|
| - void _onPause() {
|
| - if (_subscription == null) return;
|
| - _subscription.pause();
|
| + void cancel() {
|
| + if (_subscription != null) {
|
| + _subscription.cancel();
|
| + _subscription = null;
|
| + }
|
| }
|
|
|
| - void _onResume() {
|
| - if (_subscription == null) return;
|
| - _subscription.resume();
|
| + // _EventOutputSink interface. Sends data to this subscription.
|
| +
|
| + void _sendData(T data) {
|
| + _onData(data);
|
| + }
|
| +
|
| + void _sendError(error) {
|
| + _onError(error);
|
| + if (_cancelOnError) {
|
| + _subscription.cancel();
|
| + _subscription = null;
|
| + }
|
| }
|
|
|
| - void _onCancel() {
|
| + void _sendDone() {
|
| + // If the transformation sends a done signal, we stop the subscription.
|
| if (_subscription != null) {
|
| - StreamSubscription subscription = _subscription;
|
| + _subscription.cancel();
|
| _subscription = null;
|
| - subscription.cancel();
|
| }
|
| + _onDone();
|
| }
|
|
|
| // Methods used as listener on source subscription.
|
| @@ -169,16 +241,16 @@ class _WhereStream<T> extends _ForwardingStream<T, T> {
|
| _WhereStream(Stream<T> source, bool test(T value))
|
| : _test = test, super(source);
|
|
|
| - void _handleData(T inputEvent, _EventSink<T> sink) {
|
| + void _handleData(T inputEvent, _EventOutputSink<T> sink) {
|
| bool satisfies;
|
| try {
|
| satisfies = _test(inputEvent);
|
| } catch (e, s) {
|
| - sink._addError(_asyncError(e, s));
|
| + sink._sendError(_asyncError(e, s));
|
| return;
|
| }
|
| if (satisfies) {
|
| - sink._add(inputEvent);
|
| + sink._sendData(inputEvent);
|
| }
|
| }
|
| }
|
| @@ -195,15 +267,15 @@ class _MapStream<S, T> extends _ForwardingStream<S, T> {
|
| _MapStream(Stream<S> source, T transform(S event))
|
| : this._transform = transform, super(source);
|
|
|
| - void _handleData(S inputEvent, _EventSink<T> sink) {
|
| + void _handleData(S inputEvent, _EventOutputSink<T> sink) {
|
| T outputEvent;
|
| try {
|
| outputEvent = _transform(inputEvent);
|
| } catch (e, s) {
|
| - sink._addError(_asyncError(e, s));
|
| + sink._sendError(_asyncError(e, s));
|
| return;
|
| }
|
| - sink._add(outputEvent);
|
| + sink._sendData(outputEvent);
|
| }
|
| }
|
|
|
| @@ -216,15 +288,15 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
|
| _ExpandStream(Stream<S> source, Iterable<T> expand(S event))
|
| : this._expand = expand, super(source);
|
|
|
| - void _handleData(S inputEvent, _EventSink<T> sink) {
|
| + void _handleData(S inputEvent, _EventOutputSink<T> sink) {
|
| try {
|
| for (T value in _expand(inputEvent)) {
|
| - sink._add(value);
|
| + sink._sendData(value);
|
| }
|
| } catch (e, s) {
|
| // If either _expand or iterating the generated iterator throws,
|
| // we abort the iteration.
|
| - sink._addError(_asyncError(e, s));
|
| + sink._sendError(_asyncError(e, s));
|
| }
|
| }
|
| }
|
| @@ -246,13 +318,13 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
|
| bool test(error))
|
| : this._transform = transform, this._test = test, super(source);
|
|
|
| - void _handleError(Object error, _EventSink<T> sink) {
|
| + void _handleError(Object error, _EventOutputSink<T> sink) {
|
| bool matches = true;
|
| if (_test != null) {
|
| try {
|
| matches = _test(error);
|
| } catch (e, s) {
|
| - sink._addError(_asyncError(e, s));
|
| + sink._sendError(_asyncError(e, s));
|
| return;
|
| }
|
| }
|
| @@ -260,11 +332,11 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
|
| try {
|
| _transform(error);
|
| } catch (e, s) {
|
| - sink._addError(_asyncError(e, s));
|
| + sink._sendError(_asyncError(e, s));
|
| return;
|
| }
|
| } else {
|
| - sink._addError(error);
|
| + sink._sendError(error);
|
| }
|
| }
|
| }
|
| @@ -280,14 +352,14 @@ class _TakeStream<T> extends _ForwardingStream<T, T> {
|
| if (count is! int) throw new ArgumentError(count);
|
| }
|
|
|
| - void _handleData(T inputEvent, _EventSink<T> sink) {
|
| + void _handleData(T inputEvent, _EventOutputSink<T> sink) {
|
| if (_remaining > 0) {
|
| - sink._add(inputEvent);
|
| + sink._sendData(inputEvent);
|
| _remaining -= 1;
|
| if (_remaining == 0) {
|
| // Closing also unsubscribes all subscribers, which unsubscribes
|
| // this from source.
|
| - sink._close();
|
| + sink._sendDone();
|
| }
|
| }
|
| }
|
| @@ -300,20 +372,20 @@ class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
|
| _TakeWhileStream(Stream<T> source, bool test(T value))
|
| : this._test = test, super(source);
|
|
|
| - void _handleData(T inputEvent, _EventSink<T> sink) {
|
| + void _handleData(T inputEvent, _EventOutputSink<T> sink) {
|
| bool satisfies;
|
| try {
|
| satisfies = _test(inputEvent);
|
| } catch (e, s) {
|
| - sink._addError(_asyncError(e, s));
|
| + sink._sendError(_asyncError(e, s));
|
| // The test didn't say true. Didn't say false either, but we stop anyway.
|
| - sink._close();
|
| + sink._sendDone();
|
| return;
|
| }
|
| if (satisfies) {
|
| - sink._add(inputEvent);
|
| + sink._sendData(inputEvent);
|
| } else {
|
| - sink._close();
|
| + sink._sendDone();
|
| }
|
| }
|
| }
|
| @@ -328,12 +400,12 @@ class _SkipStream<T> extends _ForwardingStream<T, T> {
|
| if (count is! int || count < 0) throw new ArgumentError(count);
|
| }
|
|
|
| - void _handleData(T inputEvent, _EventSink<T> sink) {
|
| + void _handleData(T inputEvent, _EventOutputSink<T> sink) {
|
| if (_remaining > 0) {
|
| _remaining--;
|
| return;
|
| }
|
| - return sink._add(inputEvent);
|
| + return sink._sendData(inputEvent);
|
| }
|
| }
|
|
|
| @@ -344,23 +416,23 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
|
| _SkipWhileStream(Stream<T> source, bool test(T value))
|
| : this._test = test, super(source);
|
|
|
| - void _handleData(T inputEvent, _EventSink<T> sink) {
|
| + void _handleData(T inputEvent, _EventOutputSink<T> sink) {
|
| if (_hasFailed) {
|
| - sink._add(inputEvent);
|
| + sink._sendData(inputEvent);
|
| return;
|
| }
|
| bool satisfies;
|
| try {
|
| satisfies = _test(inputEvent);
|
| } catch (e, s) {
|
| - sink._addError(_asyncError(e, s));
|
| + sink._sendError(_asyncError(e, s));
|
| // A failure to return a boolean is considered "not matching".
|
| _hasFailed = true;
|
| return;
|
| }
|
| if (!satisfies) {
|
| _hasFailed = true;
|
| - sink._add(inputEvent);
|
| + sink._sendData(inputEvent);
|
| }
|
| }
|
| }
|
| @@ -376,10 +448,10 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
|
| _DistinctStream(Stream<T> source, bool equals(T a, T b))
|
| : _equals = equals, super(source);
|
|
|
| - void _handleData(T inputEvent, _EventSink<T> sink) {
|
| + void _handleData(T inputEvent, _EventOutputSink<T> sink) {
|
| if (identical(_previous, _SENTINEL)) {
|
| _previous = inputEvent;
|
| - return sink._add(inputEvent);
|
| + return sink._sendData(inputEvent);
|
| } else {
|
| bool isEqual;
|
| try {
|
| @@ -389,11 +461,11 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
|
| isEqual = _equals(_previous, inputEvent);
|
| }
|
| } catch (e, s) {
|
| - sink._addError(_asyncError(e, s));
|
| + sink._sendError(_asyncError(e, s));
|
| return null;
|
| }
|
| if (!isEqual) {
|
| - sink._add(inputEvent);
|
| + sink._sendData(inputEvent);
|
| _previous = inputEvent;
|
| }
|
| }
|
|
|