| Index: sdk/lib/async/stream.dart
|
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
|
| index 185ce2a97c983403a01337265b834ec73a01699d..ac0ec79ac739adfb2995dc5cf5651c5087c43ef4 100644
|
| --- a/sdk/lib/async/stream.dart
|
| +++ b/sdk/lib/async/stream.dart
|
| @@ -68,7 +68,7 @@ abstract class Stream<T> {
|
| stream._close();
|
| },
|
| onError: (error) {
|
| - stream._signalError(error);
|
| + stream._addError(error);
|
| stream._close();
|
| });
|
| return stream;
|
| @@ -214,13 +214,13 @@ abstract class Stream<T> {
|
| }
|
|
|
| // Deprecated method, previously called 'pipe', retained for compatibility.
|
| - Future pipeInto(StreamSink<T> sink,
|
| + Future pipeInto(EventSink<T> sink,
|
| {void onError(AsyncError error),
|
| bool unsubscribeOnError}) {
|
| _FutureImpl<T> result = new _FutureImpl<T>();
|
| this.listen(
|
| sink.add,
|
| - onError: sink.signalError,
|
| + onError: sink.addError,
|
| onDone: () {
|
| sink.close();
|
| result._setValue(null);
|
| @@ -844,15 +844,28 @@ abstract class StreamSubscription<T> {
|
|
|
|
|
| /**
|
| - * An interface that abstracts sending events into a [Stream].
|
| + * *Deprecated*. Use [EventSink] instead.
|
| */
|
| -abstract class StreamSink<T> {
|
| +abstract class StreamSink<T> extends EventSink<T>{
|
| + /* TODO(8997): Remove class.*/
|
| + /** *Deprecated*. Use [EventSink.addError] instead.*/
|
| + void signalError(AsyncError errorEvent) { addError(errorEvent); }
|
| +}
|
| +
|
| +
|
| +/**
|
| + * An interface that abstracts creation or handling of [Stream] events.
|
| + */
|
| +abstract class EventSink<T> {
|
| + /** Create a data event */
|
| void add(T event);
|
| - /** Signal an async error to the receivers of this sink's values. */
|
| - void signalError(AsyncError errorEvent);
|
| + /** Create an async error. */
|
| + void addError(AsyncError errorEvent);
|
| + /** Request a stream to close. */
|
| void close();
|
| }
|
|
|
| +
|
| /** [Stream] wrapper that only exposes the [Stream] interface. */
|
| class StreamView<T> extends Stream<T> {
|
| Stream<T> _stream;
|
| @@ -873,15 +886,16 @@ class StreamView<T> extends Stream<T> {
|
| }
|
|
|
| /**
|
| - * [StreamSink] wrapper that only exposes the [StreamSink] interface.
|
| + * [EventSink] wrapper that only exposes the [EventSink] interface.
|
| */
|
| -class StreamSinkView<T> implements StreamSink<T> {
|
| - final StreamSink<T> _sink;
|
| +class EventSinkView<T> extends StreamSink<T> {
|
| + // TODO(8997): Implment EventSink instead.
|
| + final EventSink<T> _sink;
|
|
|
| - StreamSinkView(this._sink);
|
| + EventSinkView(this._sink);
|
|
|
| void add(T value) { _sink.add(value); }
|
| - void signalError(AsyncError error) { _sink.signalError(error); }
|
| + void addError(AsyncError error) { _sink.addError(error); }
|
| void close() { _sink.close(); }
|
| }
|
|
|
| @@ -916,16 +930,16 @@ abstract class StreamTransformer<S, T> {
|
| * Example use:
|
| *
|
| * stringStream.transform(new StreamTransformer<String, String>(
|
| - * handleData: (Strung value, StreamSink<String> sink) {
|
| + * handleData: (Strung value, EventSink<String> sink) {
|
| * sink.add(value);
|
| * sink.add(value); // Duplicate the incoming events.
|
| * }));
|
| *
|
| */
|
| factory StreamTransformer({
|
| - void handleData(S data, StreamSink<T> sink),
|
| - void handleError(AsyncError error, StreamSink<T> sink),
|
| - void handleDone(StreamSink<T> sink)}) {
|
| + void handleData(S data, EventSink<T> sink),
|
| + void handleError(AsyncError error, EventSink<T> sink),
|
| + void handleDone(EventSink<T> sink)}) {
|
| return new _StreamTransformerImpl<S, T>(handleData,
|
| handleError,
|
| handleDone);
|
| @@ -950,7 +964,7 @@ abstract class StreamTransformer<S, T> {
|
| * An example that duplicates all data events:
|
| *
|
| * class DoubleTransformer<T> extends StreamEventTransformerBase<T, T> {
|
| - * void handleData(T data, StreamSink<T> sink) {
|
| + * void handleData(T data, EventSink<T> sink) {
|
| * sink.add(value);
|
| * sink.add(value);
|
| * }
|
| @@ -974,7 +988,7 @@ abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> {
|
| * The method may generate any number of events on the sink, but should
|
| * not throw.
|
| */
|
| - void handleData(S event, StreamSink<T> sink) {
|
| + void handleData(S event, EventSink<T> sink) {
|
| var data = event;
|
| sink.add(data);
|
| }
|
| @@ -985,8 +999,8 @@ abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> {
|
| * The method may generate any number of events on the sink, but should
|
| * not throw.
|
| */
|
| - void handleError(AsyncError error, StreamSink<T> sink) {
|
| - sink.signalError(error);
|
| + void handleError(AsyncError error, EventSink<T> sink) {
|
| + sink.addError(error);
|
| }
|
|
|
| /**
|
| @@ -995,7 +1009,7 @@ abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> {
|
| * The method may generate any number of events on the sink, but should
|
| * not throw.
|
| */
|
| - void handleDone(StreamSink<T> sink){
|
| + void handleDone(EventSink<T> sink){
|
| sink.close();
|
| }
|
| }
|
| @@ -1029,15 +1043,15 @@ class EventTransformStream<S, T> extends Stream<T> {
|
|
|
| class _EventTransformStreamSubscription<S, T>
|
| extends _BaseStreamSubscription<T>
|
| - implements _StreamOutputSink<T> {
|
| + implements _EventOutputSink<T> {
|
| /** The transformer used to transform events. */
|
| final StreamEventTransformer<S, T> _transformer;
|
| /** Whether to unsubscribe when emitting an error. */
|
| final bool _unsubscribeOnError;
|
| /** Source of incoming events. */
|
| StreamSubscription<S> _subscription;
|
| - /** Cached StreamSink wrapper for this class. */
|
| - StreamSink<T> _sink;
|
| + /** Cached EventSink wrapper for this class. */
|
| + EventSink<T> _sink;
|
|
|
| _EventTransformStreamSubscription(Stream<S> source,
|
| this._transformer,
|
| @@ -1046,7 +1060,7 @@ class _EventTransformStreamSubscription<S, T>
|
| void onDone(),
|
| this._unsubscribeOnError)
|
| : super(onData, onError, onDone) {
|
| - _sink = new _StreamOutputSinkWrapper<T>(this);
|
| + _sink = new _EventOutputSinkWrapper<T>(this);
|
| _subscription = source.listen(_handleData,
|
| onError: _handleError,
|
| onDone: _handleDone);
|
| @@ -1092,7 +1106,7 @@ class _EventTransformStreamSubscription<S, T>
|
| }
|
| }
|
|
|
| - // StreamOutputSink interface.
|
| + // EventOutputSink interface.
|
| void _sendData(T data) {
|
| _onData(data);
|
| }
|
| @@ -1111,11 +1125,12 @@ class _EventTransformStreamSubscription<S, T>
|
| }
|
| }
|
|
|
| -class _StreamOutputSinkWrapper<T> implements StreamSink<T> {
|
| - _StreamOutputSink _sink;
|
| - _StreamOutputSinkWrapper(this._sink);
|
| +/* TODO(8997): Implement EventSink instead, */
|
| +class _EventOutputSinkWrapper<T> extends StreamSink<T> {
|
| + _EventOutputSink _sink;
|
| + _EventOutputSinkWrapper(this._sink);
|
|
|
| - void add(T data) => _sink._sendData(data);
|
| - void signalError(AsyncError error) => _sink._sendError(error);
|
| - void close() => _sink._sendDone();
|
| + void add(T data) { _sink._sendData(data); }
|
| + void addError(AsyncError error) { _sink._sendError(error); }
|
| + void close() { _sink._sendDone(); }
|
| }
|
|
|