Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index 185ce2a97c983403a01337265b834ec73a01699d..ac0ec79ac739adfb2995dc5cf5651c5087c43ef4 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -68,7 +68,7 @@ abstract class Stream<T> { |
stream._close(); |
}, |
onError: (error) { |
- stream._signalError(error); |
+ stream._addError(error); |
stream._close(); |
}); |
return stream; |
@@ -214,13 +214,13 @@ abstract class Stream<T> { |
} |
// Deprecated method, previously called 'pipe', retained for compatibility. |
- Future pipeInto(StreamSink<T> sink, |
+ Future pipeInto(EventSink<T> sink, |
{void onError(AsyncError error), |
bool unsubscribeOnError}) { |
_FutureImpl<T> result = new _FutureImpl<T>(); |
this.listen( |
sink.add, |
- onError: sink.signalError, |
+ onError: sink.addError, |
onDone: () { |
sink.close(); |
result._setValue(null); |
@@ -844,15 +844,28 @@ abstract class StreamSubscription<T> { |
/** |
- * An interface that abstracts sending events into a [Stream]. |
+ * *Deprecated*. Use [EventSink] instead. |
*/ |
-abstract class StreamSink<T> { |
+abstract class StreamSink<T> extends EventSink<T>{ |
+ /* TODO(8997): Remove class.*/ |
+ /** *Deprecated*. Use [EventSink.addError] instead.*/ |
+ void signalError(AsyncError errorEvent) { addError(errorEvent); } |
+} |
+ |
+ |
+/** |
+ * An interface that abstracts creation or handling of [Stream] events. |
+ */ |
+abstract class EventSink<T> { |
+ /** Create a data event */ |
void add(T event); |
- /** Signal an async error to the receivers of this sink's values. */ |
- void signalError(AsyncError errorEvent); |
+ /** Create an async error. */ |
+ void addError(AsyncError errorEvent); |
+ /** Request a stream to close. */ |
void close(); |
} |
+ |
/** [Stream] wrapper that only exposes the [Stream] interface. */ |
class StreamView<T> extends Stream<T> { |
Stream<T> _stream; |
@@ -873,15 +886,16 @@ class StreamView<T> extends Stream<T> { |
} |
/** |
- * [StreamSink] wrapper that only exposes the [StreamSink] interface. |
+ * [EventSink] wrapper that only exposes the [EventSink] interface. |
*/ |
-class StreamSinkView<T> implements StreamSink<T> { |
- final StreamSink<T> _sink; |
+class EventSinkView<T> extends StreamSink<T> { |
+ // TODO(8997): Implment EventSink instead. |
+ final EventSink<T> _sink; |
- StreamSinkView(this._sink); |
+ EventSinkView(this._sink); |
void add(T value) { _sink.add(value); } |
- void signalError(AsyncError error) { _sink.signalError(error); } |
+ void addError(AsyncError error) { _sink.addError(error); } |
void close() { _sink.close(); } |
} |
@@ -916,16 +930,16 @@ abstract class StreamTransformer<S, T> { |
* Example use: |
* |
* stringStream.transform(new StreamTransformer<String, String>( |
- * handleData: (Strung value, StreamSink<String> sink) { |
+ * handleData: (Strung value, EventSink<String> sink) { |
* sink.add(value); |
* sink.add(value); // Duplicate the incoming events. |
* })); |
* |
*/ |
factory StreamTransformer({ |
- void handleData(S data, StreamSink<T> sink), |
- void handleError(AsyncError error, StreamSink<T> sink), |
- void handleDone(StreamSink<T> sink)}) { |
+ void handleData(S data, EventSink<T> sink), |
+ void handleError(AsyncError error, EventSink<T> sink), |
+ void handleDone(EventSink<T> sink)}) { |
return new _StreamTransformerImpl<S, T>(handleData, |
handleError, |
handleDone); |
@@ -950,7 +964,7 @@ abstract class StreamTransformer<S, T> { |
* An example that duplicates all data events: |
* |
* class DoubleTransformer<T> extends StreamEventTransformerBase<T, T> { |
- * void handleData(T data, StreamSink<T> sink) { |
+ * void handleData(T data, EventSink<T> sink) { |
* sink.add(value); |
* sink.add(value); |
* } |
@@ -974,7 +988,7 @@ abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
* The method may generate any number of events on the sink, but should |
* not throw. |
*/ |
- void handleData(S event, StreamSink<T> sink) { |
+ void handleData(S event, EventSink<T> sink) { |
var data = event; |
sink.add(data); |
} |
@@ -985,8 +999,8 @@ abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
* The method may generate any number of events on the sink, but should |
* not throw. |
*/ |
- void handleError(AsyncError error, StreamSink<T> sink) { |
- sink.signalError(error); |
+ void handleError(AsyncError error, EventSink<T> sink) { |
+ sink.addError(error); |
} |
/** |
@@ -995,7 +1009,7 @@ abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
* The method may generate any number of events on the sink, but should |
* not throw. |
*/ |
- void handleDone(StreamSink<T> sink){ |
+ void handleDone(EventSink<T> sink){ |
sink.close(); |
} |
} |
@@ -1029,15 +1043,15 @@ class EventTransformStream<S, T> extends Stream<T> { |
class _EventTransformStreamSubscription<S, T> |
extends _BaseStreamSubscription<T> |
- implements _StreamOutputSink<T> { |
+ implements _EventOutputSink<T> { |
/** The transformer used to transform events. */ |
final StreamEventTransformer<S, T> _transformer; |
/** Whether to unsubscribe when emitting an error. */ |
final bool _unsubscribeOnError; |
/** Source of incoming events. */ |
StreamSubscription<S> _subscription; |
- /** Cached StreamSink wrapper for this class. */ |
- StreamSink<T> _sink; |
+ /** Cached EventSink wrapper for this class. */ |
+ EventSink<T> _sink; |
_EventTransformStreamSubscription(Stream<S> source, |
this._transformer, |
@@ -1046,7 +1060,7 @@ class _EventTransformStreamSubscription<S, T> |
void onDone(), |
this._unsubscribeOnError) |
: super(onData, onError, onDone) { |
- _sink = new _StreamOutputSinkWrapper<T>(this); |
+ _sink = new _EventOutputSinkWrapper<T>(this); |
_subscription = source.listen(_handleData, |
onError: _handleError, |
onDone: _handleDone); |
@@ -1092,7 +1106,7 @@ class _EventTransformStreamSubscription<S, T> |
} |
} |
- // StreamOutputSink interface. |
+ // EventOutputSink interface. |
void _sendData(T data) { |
_onData(data); |
} |
@@ -1111,11 +1125,12 @@ class _EventTransformStreamSubscription<S, T> |
} |
} |
-class _StreamOutputSinkWrapper<T> implements StreamSink<T> { |
- _StreamOutputSink _sink; |
- _StreamOutputSinkWrapper(this._sink); |
+/* TODO(8997): Implement EventSink instead, */ |
+class _EventOutputSinkWrapper<T> extends StreamSink<T> { |
+ _EventOutputSink _sink; |
+ _EventOutputSinkWrapper(this._sink); |
- void add(T data) => _sink._sendData(data); |
- void signalError(AsyncError error) => _sink._sendError(error); |
- void close() => _sink._sendDone(); |
+ void add(T data) { _sink._sendData(data); } |
+ void addError(AsyncError error) { _sink._sendError(error); } |
+ void close() { _sink._sendDone(); } |
} |