Index: sdk/lib/async/stream_pipe.dart |
diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart |
index aaa34b059362638f8f9b04fad2f50f6eaccc9eea..69210ecabd1f2259571bfe6db85f32fc8942bd19 100644 |
--- a/sdk/lib/async/stream_pipe.dart |
+++ b/sdk/lib/async/stream_pipe.dart |
@@ -74,16 +74,16 @@ abstract class _ForwardingStream<S, T> extends Stream<T> { |
// Override the following methods in subclasses to change the behavior. |
- void _handleData(S data, _StreamOutputSink<T> sink) { |
+ void _handleData(S data, _EventOutputSink<T> sink) { |
var outputData = data; |
sink._sendData(outputData); |
} |
- void _handleError(AsyncError error, _StreamOutputSink<T> sink) { |
+ void _handleError(AsyncError error, _EventOutputSink<T> sink) { |
sink._sendError(error); |
} |
- void _handleDone(_StreamOutputSink<T> sink) { |
+ void _handleDone(_EventOutputSink<T> sink) { |
sink._sendDone(); |
} |
} |
@@ -136,7 +136,7 @@ abstract class _BaseStreamSubscription<T> implements StreamSubscription<T> { |
* Abstract superclass for subscriptions that forward to other subscriptions. |
*/ |
class _ForwardingStreamSubscription<S, T> |
- extends _BaseStreamSubscription<T> implements _StreamOutputSink<T> { |
+ extends _BaseStreamSubscription<T> implements _EventOutputSink<T> { |
final _ForwardingStream<S, T> _stream; |
final bool _unsubscribeOnError; |
@@ -179,7 +179,7 @@ class _ForwardingStreamSubscription<S, T> |
_subscription = null; |
} |
- // _StreamOutputSink interface. Sends data to this subscription. |
+ // _EventOutputSink interface. Sends data to this subscription. |
void _sendData(T data) { |
_onData(data); |
@@ -233,7 +233,7 @@ class _WhereStream<T> extends _ForwardingStream<T, T> { |
_WhereStream(Stream<T> source, bool test(T value)) |
: _test = test, super(source); |
- void _handleData(T inputEvent, _StreamOutputSink<T> sink) { |
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
bool satisfies; |
try { |
satisfies = _test(inputEvent); |
@@ -259,7 +259,7 @@ class _MapStream<S, T> extends _ForwardingStream<S, T> { |
_MapStream(Stream<S> source, T transform(S event)) |
: this._transform = transform, super(source); |
- void _handleData(S inputEvent, _StreamOutputSink<T> sink) { |
+ void _handleData(S inputEvent, _EventOutputSink<T> sink) { |
T outputEvent; |
try { |
outputEvent = _transform(inputEvent); |
@@ -280,7 +280,7 @@ class _ExpandStream<S, T> extends _ForwardingStream<S, T> { |
_ExpandStream(Stream<S> source, Iterable<T> expand(S event)) |
: this._expand = expand, super(source); |
- void _handleData(S inputEvent, _StreamOutputSink<T> sink) { |
+ void _handleData(S inputEvent, _EventOutputSink<T> sink) { |
try { |
for (T value in _expand(inputEvent)) { |
sink._sendData(value); |
@@ -310,7 +310,7 @@ class _HandleErrorStream<T> extends _ForwardingStream<T, T> { |
bool test(error)) |
: this._transform = transform, this._test = test, super(source); |
- void _handleError(AsyncError error, _StreamOutputSink<T> sink) { |
+ void _handleError(AsyncError error, _EventOutputSink<T> sink) { |
bool matches = true; |
if (_test != null) { |
try { |
@@ -344,7 +344,7 @@ class _TakeStream<T> extends _ForwardingStream<T, T> { |
if (count is! int) throw new ArgumentError(count); |
} |
- void _handleData(T inputEvent, _StreamOutputSink<T> sink) { |
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
if (_remaining > 0) { |
sink._sendData(inputEvent); |
_remaining -= 1; |
@@ -364,7 +364,7 @@ class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
_TakeWhileStream(Stream<T> source, bool test(T value)) |
: this._test = test, super(source); |
- void _handleData(T inputEvent, _StreamOutputSink<T> sink) { |
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
bool satisfies; |
try { |
satisfies = _test(inputEvent); |
@@ -392,7 +392,7 @@ class _SkipStream<T> extends _ForwardingStream<T, T> { |
if (count is! int || count < 0) throw new ArgumentError(count); |
} |
- void _handleData(T inputEvent, _StreamOutputSink<T> sink) { |
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
if (_remaining > 0) { |
_remaining--; |
return; |
@@ -408,7 +408,7 @@ class _SkipWhileStream<T> extends _ForwardingStream<T, T> { |
_SkipWhileStream(Stream<T> source, bool test(T value)) |
: this._test = test, super(source); |
- void _handleData(T inputEvent, _StreamOutputSink<T> sink) { |
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
if (_hasFailed) { |
sink._sendData(inputEvent); |
} |
@@ -439,7 +439,7 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> { |
_DistinctStream(Stream<T> source, bool equals(T a, T b)) |
: _equals = equals, super(source); |
- void _handleData(T inputEvent, _StreamOutputSink<T> sink) { |
+ void _handleData(T inputEvent, _EventOutputSink<T> sink) { |
if (identical(_previous, _SENTINEL)) { |
_previous = inputEvent; |
return sink._sendData(inputEvent); |
@@ -465,35 +465,26 @@ class _DistinctStream<T> extends _ForwardingStream<T, T> { |
// Stream transformations and event transformations. |
-typedef void _TransformDataHandler<S, T>(S data, StreamSink<T> sink); |
-typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink); |
-typedef void _TransformDoneHandler<T>(StreamSink<T> sink); |
+typedef void _TransformDataHandler<S, T>(S data, EventSink<T> sink); |
+typedef void _TransformErrorHandler<T>(AsyncError data, EventSink<T> sink); |
+typedef void _TransformDoneHandler<T>(EventSink<T> sink); |
/** Default data handler forwards all data. */ |
-void _defaultHandleData(var data, StreamSink sink) { |
+void _defaultHandleData(var data, EventSink sink) { |
sink.add(data); |
} |
/** Default error handler forwards all errors. */ |
-void _defaultHandleError(AsyncError error, StreamSink sink) { |
- sink.signalError(error); |
+void _defaultHandleError(AsyncError error, EventSink sink) { |
+ sink.addError(error); |
} |
/** Default done handler forwards done. */ |
-void _defaultHandleDone(StreamSink sink) { |
+void _defaultHandleDone(EventSink sink) { |
sink.close(); |
} |
-/** Creates a [StreamSink] from a [_StreamImpl]'s input methods. */ |
-class _StreamImplSink<T> implements StreamSink<T> { |
- _StreamImpl<T> _target; |
- _StreamImplSink(this._target); |
- void add(T data) { _target._add(data); } |
- void signalError(AsyncError error) { _target._signalError(error); } |
- void close() { _target._close(); } |
-} |
- |
/** |
* A [StreamTransformer] that modifies stream events. |
* |
@@ -511,9 +502,9 @@ class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> { |
final _TransformErrorHandler<T> _handleError; |
final _TransformDoneHandler<T> _handleDone; |
- _StreamTransformerImpl(void handleData(S data, StreamSink<T> sink), |
- void handleError(AsyncError data, StreamSink<T> sink), |
- void handleDone(StreamSink<T> sink)) |
+ _StreamTransformerImpl(void handleData(S data, EventSink<T> sink), |
+ void handleError(AsyncError data, EventSink<T> sink), |
+ void handleDone(EventSink<T> sink)) |
: this._handleData = (handleData == null ? _defaultHandleData |
: handleData), |
this._handleError = (handleError == null ? _defaultHandleError |
@@ -521,15 +512,15 @@ class _StreamTransformerImpl<S, T> extends StreamEventTransformer<S, T> { |
this._handleDone = (handleDone == null ? _defaultHandleDone |
: handleDone); |
- void handleData(S data, StreamSink<T> sink) { |
+ void handleData(S data, EventSink<T> sink) { |
_handleData(data, sink); |
} |
- void handleError(AsyncError error, StreamSink<T> sink) { |
+ void handleError(AsyncError error, EventSink<T> sink) { |
_handleError(error, sink); |
} |
- void handleDone(StreamSink<T> sink) { |
+ void handleDone(EventSink<T> sink) { |
_handleDone(sink); |
} |
} |