Chromium Code Reviews| Index: sdk/lib/async/stream.dart |
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
| index 185ce2a97c983403a01337265b834ec73a01699d..3be17f0ee18cbf8f8b466b861ae7bc3f8b6d9bca 100644 |
| --- a/sdk/lib/async/stream.dart |
| +++ b/sdk/lib/async/stream.dart |
| @@ -68,7 +68,7 @@ abstract class Stream<T> { |
| stream._close(); |
| }, |
| onError: (error) { |
| - stream._signalError(error); |
| + stream._addError(error); |
| stream._close(); |
| }); |
| return stream; |
| @@ -214,13 +214,13 @@ abstract class Stream<T> { |
| } |
| // Deprecated method, previously called 'pipe', retained for compatibility. |
| - Future pipeInto(StreamSink<T> sink, |
| + Future pipeInto(EventSink<T> sink, |
| {void onError(AsyncError error), |
| bool unsubscribeOnError}) { |
| _FutureImpl<T> result = new _FutureImpl<T>(); |
| this.listen( |
| sink.add, |
| - onError: sink.signalError, |
| + onError: sink.addError, |
| onDone: () { |
| sink.close(); |
| result._setValue(null); |
| @@ -846,13 +846,22 @@ abstract class StreamSubscription<T> { |
| /** |
| * An interface that abstracts sending events into a [Stream]. |
| */ |
| -abstract class StreamSink<T> { |
| +abstract class EventSink<T> { |
| void add(T event); |
| /** Signal an async error to the receivers of this sink's values. */ |
| - void signalError(AsyncError errorEvent); |
| + void addError(AsyncError errorEvent); |
| + /** *Deprecated*. Use [addError] instead. */ |
| + void signalError(AsyncError errorEvent) { |
|
floitsch
2013/03/07 14:19:34
Move this method to StreamSink below. Users that u
Lasse Reichstein Nielsen
2013/03/08 10:18:25
That's problematic. This is very much a consumed i
|
| + addError(errorEvent); |
| + } |
| void close(); |
| } |
| +/** |
| + * *Deprecated*. Use [EventSink] instead. |
| + */ |
| +abstract class StreamSink<T> extends EventSink<T> {} |
|
floitsch
2013/03/07 14:19:34
ditto for @deprecated.
Lasse Reichstein Nielsen
2013/03/08 10:18:25
Done.
|
| + |
| /** [Stream] wrapper that only exposes the [Stream] interface. */ |
| class StreamView<T> extends Stream<T> { |
| Stream<T> _stream; |
| @@ -873,15 +882,16 @@ class StreamView<T> extends Stream<T> { |
| } |
| /** |
| - * [StreamSink] wrapper that only exposes the [StreamSink] interface. |
| + * [EventSink] wrapper that only exposes the [EventSink] interface. |
| */ |
| -class StreamSinkView<T> implements StreamSink<T> { |
| - final StreamSink<T> _sink; |
| +class EventSinkView<T> implements EventSink<T> { |
| + final EventSink<T> _sink; |
| - StreamSinkView(this._sink); |
| + EventSinkView(this._sink); |
| void add(T value) { _sink.add(value); } |
| - void signalError(AsyncError error) { _sink.signalError(error); } |
| + void addError(AsyncError error) { _sink.addError(error); } |
| + void signalError(AsyncError error) { addError(error); } |
|
floitsch
2013/03/07 14:19:34
Add TODO?
Alternatively add comment to EventSink.s
Lasse Reichstein Nielsen
2013/03/08 10:18:25
Added TODO(8997) where relevant (and created bug f
|
| void close() { _sink.close(); } |
| } |
| @@ -916,16 +926,16 @@ abstract class StreamTransformer<S, T> { |
| * Example use: |
| * |
| * stringStream.transform(new StreamTransformer<String, String>( |
| - * handleData: (Strung value, StreamSink<String> sink) { |
| + * handleData: (Strung value, EventSink<String> sink) { |
| * sink.add(value); |
| * sink.add(value); // Duplicate the incoming events. |
| * })); |
| * |
| */ |
| factory StreamTransformer({ |
| - void handleData(S data, StreamSink<T> sink), |
| - void handleError(AsyncError error, StreamSink<T> sink), |
| - void handleDone(StreamSink<T> sink)}) { |
| + void handleData(S data, EventSink<T> sink), |
| + void handleError(AsyncError error, EventSink<T> sink), |
| + void handleDone(EventSink<T> sink)}) { |
| return new _StreamTransformerImpl<S, T>(handleData, |
| handleError, |
| handleDone); |
| @@ -950,7 +960,7 @@ abstract class StreamTransformer<S, T> { |
| * An example that duplicates all data events: |
| * |
| * class DoubleTransformer<T> extends StreamEventTransformerBase<T, T> { |
| - * void handleData(T data, StreamSink<T> sink) { |
| + * void handleData(T data, EventSink<T> sink) { |
| * sink.add(value); |
| * sink.add(value); |
| * } |
| @@ -974,7 +984,7 @@ abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
| * The method may generate any number of events on the sink, but should |
| * not throw. |
| */ |
| - void handleData(S event, StreamSink<T> sink) { |
| + void handleData(S event, EventSink<T> sink) { |
| var data = event; |
| sink.add(data); |
| } |
| @@ -985,8 +995,8 @@ abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
| * The method may generate any number of events on the sink, but should |
| * not throw. |
| */ |
| - void handleError(AsyncError error, StreamSink<T> sink) { |
| - sink.signalError(error); |
| + void handleError(AsyncError error, EventSink<T> sink) { |
| + sink.addError(error); |
| } |
| /** |
| @@ -995,7 +1005,7 @@ abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
| * The method may generate any number of events on the sink, but should |
| * not throw. |
| */ |
| - void handleDone(StreamSink<T> sink){ |
| + void handleDone(EventSink<T> sink){ |
| sink.close(); |
| } |
| } |
| @@ -1029,15 +1039,15 @@ class EventTransformStream<S, T> extends Stream<T> { |
| class _EventTransformStreamSubscription<S, T> |
| extends _BaseStreamSubscription<T> |
| - implements _StreamOutputSink<T> { |
| + implements _EventOutputSink<T> { |
| /** The transformer used to transform events. */ |
| final StreamEventTransformer<S, T> _transformer; |
| /** Whether to unsubscribe when emitting an error. */ |
| final bool _unsubscribeOnError; |
| /** Source of incoming events. */ |
| StreamSubscription<S> _subscription; |
| - /** Cached StreamSink wrapper for this class. */ |
| - StreamSink<T> _sink; |
| + /** Cached EventSink wrapper for this class. */ |
| + EventSink<T> _sink; |
| _EventTransformStreamSubscription(Stream<S> source, |
| this._transformer, |
| @@ -1046,7 +1056,7 @@ class _EventTransformStreamSubscription<S, T> |
| void onDone(), |
| this._unsubscribeOnError) |
| : super(onData, onError, onDone) { |
| - _sink = new _StreamOutputSinkWrapper<T>(this); |
| + _sink = new _EventOutputSinkWrapper<T>(this); |
| _subscription = source.listen(_handleData, |
| onError: _handleError, |
| onDone: _handleDone); |
| @@ -1092,7 +1102,7 @@ class _EventTransformStreamSubscription<S, T> |
| } |
| } |
| - // StreamOutputSink interface. |
| + // EventOutputSink interface. |
| void _sendData(T data) { |
| _onData(data); |
| } |
| @@ -1111,11 +1121,12 @@ class _EventTransformStreamSubscription<S, T> |
| } |
| } |
| -class _StreamOutputSinkWrapper<T> implements StreamSink<T> { |
| - _StreamOutputSink _sink; |
| - _StreamOutputSinkWrapper(this._sink); |
| +class _EventOutputSinkWrapper<T> implements EventSink<T> { |
| + _EventOutputSink _sink; |
| + _EventOutputSinkWrapper(this._sink); |
| - void add(T data) => _sink._sendData(data); |
| - void signalError(AsyncError error) => _sink._sendError(error); |
| - void close() => _sink._sendDone(); |
| + void add(T data) { _sink._sendData(data); } |
| + void addError(AsyncError error) { _sink._sendError(error); } |
| + void signalError(AsyncError error) { addError(error); } |
| + void close() { _sink._sendDone(); } |
| } |