Chromium Code Reviews| Index: sdk/lib/async/stream_pipe.dart |
| diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart |
| index e089f6ce88dce9180a110615d26ec6df7d2b2ab0..93ed5866d908770b5acfcb32156d2ddab99272a5 100644 |
| --- a/sdk/lib/async/stream_pipe.dart |
| +++ b/sdk/lib/async/stream_pipe.dart |
| @@ -69,84 +69,26 @@ abstract class _ForwardingStream<S, T> extends Stream<T> { |
| // Override the following methods in subclasses to change the behavior. |
| - void _handleData(S data, _EventOutputSink<T> sink) { |
| + void _handleData(S data, _EventSink<T> sink) { |
| var outputData = data; |
| - sink._sendData(outputData); |
| + sink._add(outputData); |
| } |
| - void _handleError(error, _EventOutputSink<T> sink) { |
| - sink._sendError(error); |
| + void _handleError(error, _EventSink<T> sink) { |
| + sink._addError(error); |
| } |
| - void _handleDone(_EventOutputSink<T> sink) { |
| - sink._sendDone(); |
| + void _handleDone(_EventSink<T> sink) { |
| + sink._close(); |
| } |
| } |
| /** |
| - * Common behavior of [StreamSubscription] classes. |
| - * |
| - * Stores and allows updating of the event handlers of a [StreamSubscription]. |
| - */ |
| -abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> { |
| - // TODO(ahe): Restore type when feature is implemented in dart2js |
| - // checked mode. http://dartbug.com/7733 |
| - var /* _DataHandler<T> */ _onData; |
| - _ErrorHandler _onError; |
| - _DoneHandler _onDone; |
| - |
| - _BaseStreamSubscription(this._onData, |
| - this._onError, |
| - this._onDone) { |
| - if (_onData == null) _onData = _nullDataHandler; |
| - if (_onError == null) _onError = _nullErrorHandler; |
| - if (_onDone == null) _onDone = _nullDoneHandler; |
| - } |
| - |
| - // StreamSubscription interface. |
| - void onData(void handleData(T event)) { |
| - if (handleData == null) handleData = _nullDataHandler; |
| - _onData = handleData; |
| - } |
| - |
| - void onError(void handleError(error)) { |
| - if (handleError == null) handleError = _nullErrorHandler; |
| - _onError = handleError; |
| - } |
| - |
| - void onDone(void handleDone()) { |
| - if (handleDone == null) handleDone = _nullDoneHandler; |
| - _onDone = handleDone; |
| - } |
| - |
| - void pause([Future resumeSignal]); |
| - |
| - void resume(); |
| - |
| - void cancel(); |
| - |
| - Future asFuture([var futureValue]) { |
| - _FutureImpl<T> result = new _FutureImpl<T>(); |
| - |
| - // Overwrite the onDone and onError handlers. |
| - onDone(() { result._setValue(futureValue); }); |
| - onError((error) { |
| - cancel(); |
| - result._setError(error); |
| - }); |
| - |
| - return result; |
| - } |
| -} |
| - |
| - |
| -/** |
| * Abstract superclass for subscriptions that forward to other subscriptions. |
| */ |
| class _ForwardingStreamSubscription<S, T> |
| - extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { |
| + extends _BufferingStreamSubscription<T> { |
| final _ForwardingStream<S, T> _stream; |
| - final bool _cancelOnError; |
| StreamSubscription<S> _subscription; |
| @@ -154,60 +96,46 @@ class _ForwardingStreamSubscription<S, T> |
| void onData(T data), |
| void onError(error), |
| void onDone(), |
| - this._cancelOnError) |
| - : super(onData, onError, onDone) { |
| - // Don't unsubscribe on incoming error, only if we send an error forwards. |
| + bool cancelOnError) |
| + : super(onData, onError, onDone, cancelOnError, null) { |
| _subscription = |
| _stream._source.listen(_handleData, |
| onError: _handleError, |
| onDone: _handleDone); |
| } |
| - // StreamSubscription interface. |
| - |
| - void pause([Future resumeSignal]) { |
| - if (_subscription == null) return; |
| - _subscription.pause(resumeSignal); |
| - } |
| - |
| - void resume() { |
| - if (_subscription == null) return; |
| - _subscription.resume(); |
| - } |
| + // _StreamSink interface. |
| + // Transformers sending more than one event have no way to know if the stream |
| + // is cancelled or closed after the first, so we just ignore remaining events. |
|
floitsch
2013/05/22 16:26:29
canceled.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
|
| - bool get isPaused { |
| - if (_subscription == null) return false; |
| - return _subscription.isPaused; |
| + void _add(T data) { |
| + if (_isClosed) return; |
| + super._add(data); |
| } |
| - void cancel() { |
| - if (_subscription != null) { |
| - _subscription.cancel(); |
| - _subscription = null; |
| - } |
| + void _addError(Object error) { |
| + if (_isClosed) return; |
| + super._addError(error); |
| } |
| - // _EventOutputSink interface. Sends data to this subscription. |
| + // StreamSubscription callbacks. |
| - void _sendData(T data) { |
| - _onData(data); |
| + void _onPause() { |
| + if (_subscription == null) return; |
| + _subscription.pause(); |
| } |
| - void _sendError(error) { |
| - _onError(error); |
| - if (_cancelOnError) { |
| - _subscription.cancel(); |
| - _subscription = null; |
| - } |
| + void _onResume() { |
| + if (_subscription == null) return; |
| + _subscription.resume(); |
| } |
| - void _sendDone() { |
| - // If the transformation sends a done signal, we stop the subscription. |
| + void _onCancel() { |
| if (_subscription != null) { |
| - _subscription.cancel(); |
| + StreamSubscription subscription = _subscription; |
| _subscription = null; |
| + subscription.cancel(); |
| } |
| - _onDone(); |
| } |
| // Methods used as listener on source subscription. |
| @@ -241,16 +169,16 @@ class _WhereStream<T> extends _ForwardingStream<T, T> { |
| _WhereStream(Stream<T> source, bool test(T value)) |
| : _test = test, super(source); |
| - void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
| + void _handleData(T inputEvent, _EventSink<T> sink) { |
| bool satisfies; |
| try { |
| satisfies = _test(inputEvent); |
| } catch (e, s) { |
| - sink._sendError(_asyncError(e, s)); |
| + sink._addError(_asyncError(e, s)); |
| return; |
| } |
| if (satisfies) { |
| - sink._sendData(inputEvent); |
| + sink._add(inputEvent); |
| } |
| } |
| } |
| @@ -267,15 +195,15 @@ class _MapStream<S, T> extends _ForwardingStream<S, T> { |
| _MapStream(Stream<S> source, T transform(S event)) |
| : this._transform = transform, super(source); |
| - void _handleData(S inputEvent, _EventOutputSink<T> sink) { |
| + void _handleData(S inputEvent, _EventSink<T> sink) { |
| T outputEvent; |
| try { |
| outputEvent = _transform(inputEvent); |
| } catch (e, s) { |
| - sink._sendError(_asyncError(e, s)); |
| + sink._addError(_asyncError(e, s)); |
| return; |
| } |
| - sink._sendData(outputEvent); |
| + sink._add(outputEvent); |
| } |
| } |
| @@ -288,15 +216,15 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> { |
| _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) |
| : this._expand = expand, super(source); |
| - void _handleData(S inputEvent, _EventOutputSink<T> sink) { |
| + void _handleData(S inputEvent, _EventSink<T> sink) { |
| try { |
| for (T value in _expand(inputEvent)) { |
| - sink._sendData(value); |
| + sink._add(value); |
| } |
| } catch (e, s) { |
| // If either _expand or iterating the generated iterator throws, |
| // we abort the iteration. |
| - sink._sendError(_asyncError(e, s)); |
| + sink._addError(_asyncError(e, s)); |
| } |
| } |
| } |
| @@ -318,13 +246,13 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
| bool test(error)) |
| : this._transform = transform, this._test = test, super(source); |
| - void _handleError(Object error, _EventOutputSink<T> sink) { |
| + void _handleError(Object error, _EventSink<T> sink) { |
| bool matches = true; |
| if (_test != null) { |
| try { |
| matches = _test(error); |
| } catch (e, s) { |
| - sink._sendError(_asyncError(e, s)); |
| + sink._addError(_asyncError(e, s)); |
| return; |
| } |
| } |
| @@ -332,11 +260,11 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
| try { |
| _transform(error); |
| } catch (e, s) { |
| - sink._sendError(_asyncError(e, s)); |
| + sink._addError(_asyncError(e, s)); |
| return; |
| } |
| } else { |
| - sink._sendError(error); |
| + sink._addError(error); |
| } |
| } |
| } |
| @@ -352,14 +280,14 @@ class _TakeStream<T> extends _ForwardingStream<T, T> { |
| if (count is! int) throw new ArgumentError(count); |
| } |
| - void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
| + void _handleData(T inputEvent, _EventSink<T> sink) { |
| if (_remaining > 0) { |
| - sink._sendData(inputEvent); |
| + sink._add(inputEvent); |
| _remaining -= 1; |
| if (_remaining == 0) { |
| // Closing also unsubscribes all subscribers, which unsubscribes |
| // this from source. |
| - sink._sendDone(); |
| + sink._close(); |
| } |
| } |
| } |
| @@ -372,20 +300,20 @@ class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
| _TakeWhileStream(Stream<T> source, bool test(T value)) |
| : this._test = test, super(source); |
| - void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
| + void _handleData(T inputEvent, _EventSink<T> sink) { |
| bool satisfies; |
| try { |
| satisfies = _test(inputEvent); |
| } catch (e, s) { |
| - sink._sendError(_asyncError(e, s)); |
| + sink._addError(_asyncError(e, s)); |
| // The test didn't say true. Didn't say false either, but we stop anyway. |
| - sink._sendDone(); |
| + sink._close(); |
| return; |
| } |
| if (satisfies) { |
| - sink._sendData(inputEvent); |
| + sink._add(inputEvent); |
| } else { |
| - sink._sendDone(); |
| + sink._close(); |
| } |
| } |
| } |
| @@ -400,12 +328,12 @@ class _SkipStream<T> extends _ForwardingStream<T, T> { |
| if (count is! int || count < 0) throw new ArgumentError(count); |
| } |
| - void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
| + void _handleData(T inputEvent, _EventSink<T> sink) { |
| if (_remaining > 0) { |
| _remaining--; |
| return; |
| } |
| - return sink._sendData(inputEvent); |
| + return sink._add(inputEvent); |
| } |
| } |
| @@ -416,23 +344,23 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> { |
| _SkipWhileStream(Stream<T> source, bool test(T value)) |
| : this._test = test, super(source); |
| - void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
| + void _handleData(T inputEvent, _EventSink<T> sink) { |
| if (_hasFailed) { |
| - sink._sendData(inputEvent); |
| + sink._add(inputEvent); |
| return; |
| } |
| bool satisfies; |
| try { |
| satisfies = _test(inputEvent); |
| } catch (e, s) { |
| - sink._sendError(_asyncError(e, s)); |
| + sink._addError(_asyncError(e, s)); |
| // A failure to return a boolean is considered "not matching". |
| _hasFailed = true; |
| return; |
| } |
| if (!satisfies) { |
| _hasFailed = true; |
| - sink._sendData(inputEvent); |
| + sink._add(inputEvent); |
| } |
| } |
| } |
| @@ -448,10 +376,10 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> { |
| _DistinctStream(Stream<T> source, bool equals(T a, T b)) |
| : _equals = equals, super(source); |
| - void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
| + void _handleData(T inputEvent, _EventSink<T> sink) { |
| if (identical(_previous, _SENTINEL)) { |
| _previous = inputEvent; |
| - return sink._sendData(inputEvent); |
| + return sink._add(inputEvent); |
| } else { |
| bool isEqual; |
| try { |
| @@ -461,11 +389,11 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> { |
| isEqual = _equals(_previous, inputEvent); |
| } |
| } catch (e, s) { |
| - sink._sendError(_asyncError(e, s)); |
| + sink._addError(_asyncError(e, s)); |
| return null; |
| } |
| if (!isEqual) { |
| - sink._sendData(inputEvent); |
| + sink._add(inputEvent); |
| _previous = inputEvent; |
| } |
| } |