| Index: sdk/lib/async/stream_pipe.dart
|
| diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart
|
| index aaa34b059362638f8f9b04fad2f50f6eaccc9eea..69210ecabd1f2259571bfe6db85f32fc8942bd19 100644
|
| --- a/sdk/lib/async/stream_pipe.dart
|
| +++ b/sdk/lib/async/stream_pipe.dart
|
| @@ -74,16 +74,16 @@ abstract class _ForwardingStream<S, T> extends Stream<T> {
|
|
|
| // Override the following methods in subclasses to change the behavior.
|
|
|
| - void _handleData(S data, _StreamOutputSink<T> sink) {
|
| + void _handleData(S data, _EventOutputSink<T> sink) {
|
| var outputData = data;
|
| sink._sendData(outputData);
|
| }
|
|
|
| - void _handleError(AsyncError error, _StreamOutputSink<T> sink) {
|
| + void _handleError(AsyncError error, _EventOutputSink<T> sink) {
|
| sink._sendError(error);
|
| }
|
|
|
| - void _handleDone(_StreamOutputSink<T> sink) {
|
| + void _handleDone(_EventOutputSink<T> sink) {
|
| sink._sendDone();
|
| }
|
| }
|
| @@ -136,7 +136,7 @@ abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> {
|
| * Abstract superclass for subscriptions that forward to other subscriptions.
|
| */
|
| class _ForwardingStreamSubscription<S, T>
|
| - extends _BaseStreamSubscription<T> implements _StreamOutputSink<T> {
|
| + extends _BaseStreamSubscription<T> implements _EventOutputSink<T> {
|
| final _ForwardingStream<S, T> _stream;
|
| final bool _unsubscribeOnError;
|
|
|
| @@ -179,7 +179,7 @@ class _ForwardingStreamSubscription<S, T>
|
| _subscription = null;
|
| }
|
|
|
| - // _StreamOutputSink interface. Sends data to this subscription.
|
| + // _EventOutputSink interface. Sends data to this subscription.
|
|
|
| void _sendData(T data) {
|
| _onData(data);
|
| @@ -233,7 +233,7 @@ class _WhereStream<T> extends _ForwardingStream<T, T> {
|
| _WhereStream(Stream<T> source, bool test(T value))
|
| : _test = test, super(source);
|
|
|
| - void _handleData(T inputEvent, _StreamOutputSink<T> sink) {
|
| + void _handleData(T inputEvent, _EventOutputSink<T> sink) {
|
| bool satisfies;
|
| try {
|
| satisfies = _test(inputEvent);
|
| @@ -259,7 +259,7 @@ class _MapStream<S, T> extends _ForwardingStream<S, T> {
|
| _MapStream(Stream<S> source, T transform(S event))
|
| : this._transform = transform, super(source);
|
|
|
| - void _handleData(S inputEvent, _StreamOutputSink<T> sink) {
|
| + void _handleData(S inputEvent, _EventOutputSink<T> sink) {
|
| T outputEvent;
|
| try {
|
| outputEvent = _transform(inputEvent);
|
| @@ -280,7 +280,7 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
|
| _ExpandStream(Stream<S> source, Iterable<T> expand(S event))
|
| : this._expand = expand, super(source);
|
|
|
| - void _handleData(S inputEvent, _StreamOutputSink<T> sink) {
|
| + void _handleData(S inputEvent, _EventOutputSink<T> sink) {
|
| try {
|
| for (T value in _expand(inputEvent)) {
|
| sink._sendData(value);
|
| @@ -310,7 +310,7 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
|
| bool test(error))
|
| : this._transform = transform, this._test = test, super(source);
|
|
|
| - void _handleError(AsyncError error, _StreamOutputSink<T> sink) {
|
| + void _handleError(AsyncError error, _EventOutputSink<T> sink) {
|
| bool matches = true;
|
| if (_test != null) {
|
| try {
|
| @@ -344,7 +344,7 @@ class _TakeStream<T> extends _ForwardingStream<T, T> {
|
| if (count is! int) throw new ArgumentError(count);
|
| }
|
|
|
| - void _handleData(T inputEvent, _StreamOutputSink<T> sink) {
|
| + void _handleData(T inputEvent, _EventOutputSink<T> sink) {
|
| if (_remaining > 0) {
|
| sink._sendData(inputEvent);
|
| _remaining -= 1;
|
| @@ -364,7 +364,7 @@ class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
|
| _TakeWhileStream(Stream<T> source, bool test(T value))
|
| : this._test = test, super(source);
|
|
|
| - void _handleData(T inputEvent, _StreamOutputSink<T> sink) {
|
| + void _handleData(T inputEvent, _EventOutputSink<T> sink) {
|
| bool satisfies;
|
| try {
|
| satisfies = _test(inputEvent);
|
| @@ -392,7 +392,7 @@ class _SkipStream<T> extends _ForwardingStream<T, T> {
|
| if (count is! int || count < 0) throw new ArgumentError(count);
|
| }
|
|
|
| - void _handleData(T inputEvent, _StreamOutputSink<T> sink) {
|
| + void _handleData(T inputEvent, _EventOutputSink<T> sink) {
|
| if (_remaining > 0) {
|
| _remaining--;
|
| return;
|
| @@ -408,7 +408,7 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
|
| _SkipWhileStream(Stream<T> source, bool test(T value))
|
| : this._test = test, super(source);
|
|
|
| - void _handleData(T inputEvent, _StreamOutputSink<T> sink) {
|
| + void _handleData(T inputEvent, _EventOutputSink<T> sink) {
|
| if (_hasFailed) {
|
| sink._sendData(inputEvent);
|
| }
|
| @@ -439,7 +439,7 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
|
| _DistinctStream(Stream<T> source, bool equals(T a, T b))
|
| : _equals = equals, super(source);
|
|
|
| - void _handleData(T inputEvent, _StreamOutputSink<T> sink) {
|
| + void _handleData(T inputEvent, _EventOutputSink<T> sink) {
|
| if (identical(_previous, _SENTINEL)) {
|
| _previous = inputEvent;
|
| return sink._sendData(inputEvent);
|
| @@ -465,35 +465,26 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> {
|
|
|
| // Stream transformations and event transformations.
|
|
|
| -typedef void _TransformDataHandler<S, T>(S data, StreamSink<T> sink);
|
| -typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink);
|
| -typedef void _TransformDoneHandler<T>(StreamSink<T> sink);
|
| +typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink);
|
| +typedef void _TransformErrorHandler<T>(AsyncError data, EventSink<T> sink);
|
| +typedef void _TransformDoneHandler<T>(EventSink<T> sink);
|
|
|
| /** Default data handler forwards all data. */
|
| -void _defaultHandleData(var data, StreamSink sink) {
|
| +void _defaultHandleData(var data, EventSink sink) {
|
| sink.add(data);
|
| }
|
|
|
| /** Default error handler forwards all errors. */
|
| -void _defaultHandleError(AsyncError error, StreamSink sink) {
|
| - sink.signalError(error);
|
| +void _defaultHandleError(AsyncError error, EventSink sink) {
|
| + sink.addError(error);
|
| }
|
|
|
| /** Default done handler forwards done. */
|
| -void _defaultHandleDone(StreamSink sink) {
|
| +void _defaultHandleDone(EventSink sink) {
|
| sink.close();
|
| }
|
|
|
|
|
| -/** Creates a [StreamSink] from a [_StreamImpl]'s input methods. */
|
| -class _StreamImplSink<T> implements StreamSink<T> {
|
| - _StreamImpl<T> _target;
|
| - _StreamImplSink(this._target);
|
| - void add(T data) { _target._add(data); }
|
| - void signalError(AsyncError error) { _target._signalError(error); }
|
| - void close() { _target._close(); }
|
| -}
|
| -
|
| /**
|
| * A [StreamTransformer] that modifies stream events.
|
| *
|
| @@ -511,9 +502,9 @@ class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> {
|
| final _TransformErrorHandler<T> _handleError;
|
| final _TransformDoneHandler<T> _handleDone;
|
|
|
| - _StreamTransformerImpl(void handleData(S data, StreamSink<T> sink),
|
| - void handleError(AsyncError data, StreamSink<T> sink),
|
| - void handleDone(StreamSink<T> sink))
|
| + _StreamTransformerImpl(void handleData(S data, EventSink<T> sink),
|
| + void handleError(AsyncError data, EventSink<T> sink),
|
| + void handleDone(EventSink<T> sink))
|
| : this._handleData = (handleData == null ? _defaultHandleData
|
| : handleData),
|
| this._handleError = (handleError == null ? _defaultHandleError
|
| @@ -521,15 +512,15 @@ class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> {
|
| this._handleDone = (handleDone == null ? _defaultHandleDone
|
| : handleDone);
|
|
|
| - void handleData(S data, StreamSink<T> sink) {
|
| + void handleData(S data, EventSink<T> sink) {
|
| _handleData(data, sink);
|
| }
|
|
|
| - void handleError(AsyncError error, StreamSink<T> sink) {
|
| + void handleError(AsyncError error, EventSink<T> sink) {
|
| _handleError(error, sink);
|
| }
|
|
|
| - void handleDone(StreamSink<T> sink) {
|
| + void handleDone(EventSink<T> sink) {
|
| _handleDone(sink);
|
| }
|
| }
|
|
|