Chromium Code Reviews| Index: sdk/lib/async/stream.dart |
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
| index 2de8566cdfb75378540efc1138c8f8ac4ce56e7e..7506f022d32a52d6d919ea9f391940ec5380e6de 100644 |
| --- a/sdk/lib/async/stream.dart |
| +++ b/sdk/lib/async/stream.dart |
| @@ -147,6 +147,49 @@ abstract class Stream<T> { |
| } |
| /** |
| + * Creates a stream where all events of an existing stream are piped through |
| + * a sink-transformation. |
| + * |
| + * The given [mapSink] closure is invoked when the returned stream is |
| + * listened to. All events from the [source] are added into the event sink |
| + * that is returned from the invocation. The transformation puts all |
| + * transformed events into the sink the [mapSink] closure received during |
| + * its invocation. Conceptually the [mapSink] creates a transformation pipe |
| + * with the input sink being the returned [EventSink] and the output sink |
| + * being the sink it received. |
| + * |
| + * This constructor is frequently used to build transformers. |
| + * |
| + * Example use for a duplicating transformer: |
| + * |
| + * class DuplicationSink implements EventSink<String> { |
| + * final EventSink<String> _outputSink; |
| + * DuplicationSink(this._outputSink); |
| + * |
| + * void add(String data) { |
| + * _outputSink.add(data); |
| + * _outputSink.add(data); |
| + * } |
| + * |
| + * void addError(e, [st]) => _outputSink(e, st); |
| + * void close() => _outputSink.close(); |
| + * } |
| + * |
| + * class DuplicationTransformer implements StreamTransformer<String, String> { |
|
Lasse Reichstein Nielsen
2013/10/04 14:35:53
Long line.
floitsch
2013/10/05 18:47:40
I know, but since the actual code is less than 80
|
| + * // Some generic types ommitted for brevety. |
| + * Stream bind(Stream stream) => new Stream<String>.eventTransform( |
| + * stream, |
| + * (EventSink sink) => new DuplicationSink(sink)); |
| + * } |
| + * |
| + * stringStream.transform(new DuplicationTransformer()); |
| + */ |
| + factory Stream.eventTransformed(Stream source, |
| + EventSink mapSink(EventSink<T> sink)) { |
| + return new _BoundSinkStream(source, mapSink); |
| + } |
| + |
| + /** |
| * Reports whether this stream is a broadcast stream. |
| */ |
| bool get isBroadcast => false; |
| @@ -951,7 +994,7 @@ abstract class EventSink<T> { |
| /** Create a data event */ |
| void add(T event); |
| /** Create an async error. */ |
| - void addError(errorEvent); |
| + void addError(errorEvent, [StackTrace stackTrace]); |
| /** Request a stream to close. */ |
| void close(); |
| } |
| @@ -1032,222 +1075,79 @@ abstract class StreamSink<S> implements StreamConsumer<S>, EventSink<S> { |
| * |
| * The [Stream.transform] call will pass itself to this object and then return |
| * the resulting stream. |
| + * |
| + * It is good practice to create transformers that can be used multiple times. |
| */ |
| abstract class StreamTransformer<S, T> { |
| + |
| /** |
| - * Create a [StreamTransformer] that delegates events to the given functions. |
| + * Creates a [StreamTransformer]. |
| * |
| - * This is actually a [StreamEventTransformer] where the event handling is |
| - * performed by the function arguments. |
| - * If an argument is omitted, it acts as the corresponding default method from |
| - * [StreamEventTransformer]. |
| + * The returned instance takes responsibility of binding ([bind]) and only |
| + * invokes the given closure [transformer] when a user starts listening on |
| + * the bound stream. At that point the transformer should start listening |
| + * to the bound stream (which is given as argument) and return a |
| + * [StreamSubscription]. The bound stream-transformer then sets the handlers |
| + * it received as part of the `listen` call. |
| * |
| - * Example use: |
| + * There are two common ways to create a StreamSubscription: |
| * |
| - * stringStream.transform(new StreamTransformer<String, String>( |
| - * handleData: (String value, EventSink<String> sink) { |
| - * sink.add(value); |
| - * sink.add(value); // Duplicate the incoming events. |
| - * })); |
| + * 1. by creating a new class that implements [StreamSubscription]. |
| + * Note that the subscription should run callbacks in the [Zone] the |
| + * stream was listened to. |
| + * 2. by allocating a [StreamController] and to return the result of |
| + * listening to its stream. |
| * |
| - */ |
| - factory StreamTransformer({ |
| - void handleData(S data, EventSink<T> sink), |
| - Function handleError, |
| - void handleDone(EventSink<T> sink)}) { |
| - return new _StreamTransformerImpl<S, T>(handleData, |
| - handleError, |
| - handleDone); |
| - } |
| - |
| - Stream<T> bind(Stream<S> stream); |
| -} |
| - |
| - |
| -/** |
| - * Base class for transformers that modifies stream events. |
| - * |
| - * A [StreamEventTransformer] transforms incoming Stream |
| - * events of one kind into outgoing events of (possibly) another kind. |
| - * |
| - * Subscribing on the stream returned by [bind] is the same as subscribing on |
| - * the source stream, except that events are passed through the [transformer] |
| - * before being emitted. The transformer may generate any number and |
| - * types of events for each incoming event. Pauses on the returned |
| - * subscription are forwarded to this stream. |
| - * |
| - * An example that duplicates all data events: |
| - * |
| - * class DoubleTransformer<T> extends StreamEventTransformer<T, T> { |
| - * void handleData(T data, EventSink<T> sink) { |
| - * sink.add(value); |
| - * sink.add(value); |
| - * } |
| - * } |
| - * someTypeStream.transform(new DoubleTransformer<Type>()); |
| - * |
| - * The default implementations of the "handle" methods forward |
| - * the events unmodified. If using the default [handleData] the generic type [T] |
| - * needs to be assignable to [S]. |
| - */ |
| -abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
| - const StreamEventTransformer(); |
| - |
| - Stream<T> bind(Stream<S> source) { |
| - return new EventTransformStream<S, T>(source, this); |
| - } |
| - |
| - /** |
| - * Act on incoming data event. |
| + * Example use of a duplicating transformer: |
| * |
| - * The method may generate any number of events on the sink, but should |
| - * not throw. |
| + * stringStream.transform(new StreamTransformer<String, String>( |
| + * (Stream<String> input, bool cancelOnError) { |
| + * StreamController<String> controller; |
| + * StreamSubscription<String> subscription; |
| + * controller = new StreamController<String>( |
| + * onListen: () { |
| + * subscription = input.listen((data) { |
| + * // Duplicate the data. |
| + * controller.add(data); |
| + * controller.add(data); |
| + * }, |
| + * onError: controller.addError, |
| + * onDone: controller.close, |
| + * cancelOnError: cancelOnError); |
| + * }, |
| + * onPause: subscription.pause, |
| + * onResume: subscription.resume, |
| + * onCancel: subscription.cancel, |
| + * sync: true); // One of the few places, where sync is correct. |
| + * return controller.stream.listen(null); |
| + * }); |
| */ |
| - void handleData(S event, EventSink<T> sink) { |
| - var data = event; |
| - sink.add(data); |
| - } |
| + const factory StreamTransformer( |
| + StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) |
| + = _StreamSubscriptionTransformer; |
| /** |
| - * Act on incoming error event. |
| + * Creates a [StreamTransformer] that delegates events to the given functions. |
| * |
| - * The method may generate any number of events on the sink, but should |
| - * not throw. |
| - */ |
| - void handleError(error, EventSink<T> sink) { |
| - sink.addError(error); |
| - } |
| - |
| - /** |
| - * Act on incoming done event. |
| + * This constructor returns a transformer that can only be used once. |
| + * |
| + * Example use of a duplicating transformer: |
| * |
| - * The method may generate any number of events on the sink, but should |
| - * not throw. |
| + * stringStream.transform(new StreamTransformer<String, String>.fromHandlers( |
| + * handleData: (String value, EventSink<String> sink) { |
| + * sink.add(value); |
| + * sink.add(value); // Duplicate the incoming events. |
| + * })); |
| */ |
| - void handleDone(EventSink<T> sink){ |
| - sink.close(); |
| - } |
| -} |
| - |
| - |
| -/** |
| - * Stream that transforms another stream by intercepting and replacing events. |
| - * |
| - * This [Stream] is a transformation of a source stream. Listening on this |
| - * stream is the same as listening on the source stream, except that events |
| - * are intercepted and modified by a [StreamEventTransformer] before becoming |
| - * events on this stream. |
| - */ |
| -class EventTransformStream<S, T> extends Stream<T> { |
| - final Stream<S> _source; |
| - final StreamEventTransformer _transformer; |
| - EventTransformStream(Stream<S> source, |
| - StreamEventTransformer<S, T> transformer) |
| - : _source = source, _transformer = transformer; |
| - |
| - StreamSubscription<T> listen(void onData(T data), |
| - { Function onError, |
| - void onDone(), |
| - bool cancelOnError }) { |
| - if (onData == null) onData = _nullDataHandler; |
| - if (onError == null) onError = _nullErrorHandler; |
| - if (onDone == null) onDone = _nullDoneHandler; |
| - cancelOnError = identical(true, cancelOnError); |
| - return new _EventTransformStreamSubscription(_source, _transformer, |
| - onData, onError, onDone, |
| - cancelOnError); |
| - } |
| -} |
| - |
| -class _EventTransformStreamSubscription<S, T> |
| - extends _BufferingStreamSubscription<T> { |
| - /** The transformer used to transform events. */ |
| - final StreamEventTransformer<S, T> _transformer; |
| - |
| - /** Whether this stream has sent a done event. */ |
| - bool _isClosed = false; |
| - |
| - /** Source of incoming events. */ |
| - StreamSubscription<S> _subscription; |
| - |
| - /** Cached EventSink wrapper for this class. */ |
| - EventSink<T> _sink; |
| - |
| - _EventTransformStreamSubscription(Stream<S> source, |
| - this._transformer, |
| - void onData(T data), |
| - Function onError, |
| - void onDone(), |
| - bool cancelOnError) |
| - : super(onData, onError, onDone, cancelOnError) { |
| - _sink = new _EventSinkAdapter<T>(this); |
| - _subscription = source.listen(_handleData, |
| - onError: _handleError, |
| - onDone: _handleDone); |
| - } |
| - |
| - /** Whether this subscription is still subscribed to its source. */ |
| - bool get _isSubscribed => _subscription != null; |
| - |
| - void _onPause() { |
| - if (_isSubscribed) _subscription.pause(); |
| - } |
| - |
| - void _onResume() { |
| - if (_isSubscribed) _subscription.resume(); |
| - } |
| - |
| - void _onCancel() { |
| - if (_isSubscribed) { |
| - StreamSubscription subscription = _subscription; |
| - _subscription = null; |
| - subscription.cancel(); |
| - } |
| - _isClosed = true; |
| - } |
| - |
| - void _handleData(S data) { |
| - try { |
| - _transformer.handleData(data, _sink); |
| - } catch (e, s) { |
| - _addError(_asyncError(e, s), s); |
| - } |
| - } |
| - |
| - void _handleError(error, [stackTrace]) { |
| - try { |
| - _transformer.handleError(error, _sink); |
| - } catch (e, s) { |
| - if (identical(e, error)) { |
| - _addError(error, stackTrace); |
| - } else { |
| - _addError(_asyncError(e, s), s); |
| - } |
| - } |
| - } |
| - |
| - void _handleDone() { |
| - try { |
| - _subscription = null; |
| - _transformer.handleDone(_sink); |
| - } catch (e, s) { |
| - _addError(_asyncError(e, s), s); |
| - } |
| - } |
| -} |
| - |
| -class _EventSinkAdapter<T> implements EventSink<T> { |
| - _EventSink _sink; |
| - _EventSinkAdapter(this._sink); |
| + factory StreamTransformer.fromHandlers({ |
| + void handleData(S data, EventSink<T> sink), |
| + void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), |
| + void handleDone(EventSink<T> sink)}) |
| + = _StreamHandlerTransformer; |
| - void add(T data) { _sink._add(data); } |
| - void addError(error, [StackTrace stackTrace]) { |
| - _sink._addError(error, stackTrace); |
| - } |
| - void close() { _sink._close(); } |
| + Stream<T> bind(Stream<S> stream); |
| } |
| - |
| /** |
| * An [Iterable] like interface for the values of a [Stream]. |
| * |