Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index 659360ffd2c094de2fc9bee69614512955e72abb..ab31d16a5dbf6837bf9d648fa6500a7a0ef1bfde 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -147,6 +147,49 @@ abstract class Stream<T> { |
} |
/** |
+ * Creates a stream where all events of an existing stream are piped through |
+ * a sink-transformation. |
+ * |
+ * The given [mapSink] closure is invoked when the returned stream is |
+ * listened to. All events from the [source] are added into the event sink |
+ * that is returned from the invocation. The transformation puts all |
+ * transformed events into the sink the [mapSink] closure received during |
+ * its invocation. Conceptually the [mapSink] creates a transformation pipe |
+ * with the input sink being the returned [EventSink] and the output sink |
+ * being the sink it received. |
+ * |
+ * This constructor is frequently used to build transformers. |
+ * |
+ * Example use for a duplicating transformer: |
+ * |
+ * class DuplicationSink implements EventSink<String> { |
+ * final EventSink<String> _outputSink; |
+ * DuplicationSink(this._outputSink); |
+ * |
+ * void add(String data) { |
+ * _outputSink.add(data); |
+ * _outputSink.add(data); |
+ * } |
+ * |
+ * void addError(e, [st]) => _outputSink(e, st); |
+ * void close() => _outputSink.close(); |
+ * } |
+ * |
+ * class DuplicationTransformer implements StreamTransformer<String, String> { |
+ * // Some generic types ommitted for brevety. |
+ * Stream bind(Stream stream) => new Stream<String>.eventTransform( |
+ * stream, |
+ * (EventSink sink) => new DuplicationSink(sink)); |
+ * } |
+ * |
+ * stringStream.transform(new DuplicationTransformer()); |
+ */ |
+ factory Stream.eventTransformed(Stream source, |
+ EventSink mapSink(EventSink<T> sink)) { |
+ return new _BoundSinkStream(source, mapSink); |
+ } |
+ |
+ /** |
* Reports whether this stream is a broadcast stream. |
*/ |
bool get isBroadcast => false; |
@@ -956,7 +999,7 @@ abstract class EventSink<T> { |
/** Create a data event */ |
void add(T event); |
/** Create an async error. */ |
- void addError(errorEvent); |
+ void addError(errorEvent, [StackTrace stackTrace]); |
/** Request a stream to close. */ |
void close(); |
} |
@@ -1037,222 +1080,98 @@ abstract class StreamSink<S> implements StreamConsumer<S>, EventSink<S> { |
* |
* The [Stream.transform] call will pass itself to this object and then return |
* the resulting stream. |
+ * |
+ * It is good practice to write transformers that can be used multiple times. |
*/ |
abstract class StreamTransformer<S, T> { |
+ |
/** |
- * Create a [StreamTransformer] that delegates events to the given functions. |
+ * Creates a [StreamTransformer]. |
+ * |
+ * The returned instance takes responsibility of implementing ([bind]). |
+ * When the user invokes `bind` it returns a new "bound" stream. Only when |
+ * the user starts listening to the bound stream, the `listen` method |
+ * invokes the given closure [transformer]. |
+ * |
+ * The [transformer] closure receives the stream, that was bound, as argument |
+ * and returns a [StreamSubscription]. In almost all cases the closure |
+ * listens itself to the stream that is given as argument. |
+ * |
+ * The result of invoking the [transformer] closure is a [StreamSubscription]. |
+ * The bound stream-transformer (created by the `bind` method above) then sets |
+ * the handlers it received as part of the `listen` call. |
+ * |
+ * Conceptually this can be summarized as follows: |
+ * |
+ * 1. `var transformer = new StreamTransformer(transformerClosure);` |
+ * creates a `StreamTransformer` that supports the `bind` method. |
+ * 2. `var boundStream = stream.transform(transformer);` binds the `stream` |
+ * and returns a bound stream that has a pointer to `stream`. |
+ * 3. `boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b)` |
+ * starts the listening and transformation. This is accomplished |
+ * in 2 steps: first the `boundStream` invokes the `transformerClosure` with |
+ * the `stream` it captured: `transformerClosure(stream, b)`. |
+ * The result `subscription`, a [StreamSubscription], is then |
+ * updated to receive its handlers: `subscription.onData(f1)`, |
+ * `subscription.onError(f2)`, `subscription(f3)`. Finally the subscription |
+ * is returned as result of the `listen` call. |
+ * |
+ * There are two common ways to create a StreamSubscription: |
+ * |
+ * 1. by creating a new class that implements [StreamSubscription]. |
+ * Note that the subscription should run callbacks in the [Zone] the |
+ * stream was listened to. |
+ * 2. by allocating a [StreamController] and to return the result of |
+ * listening to its stream. |
+ * |
+ * Example use of a duplicating transformer: |
* |
- * This is actually a [StreamEventTransformer] where the event handling is |
- * performed by the function arguments. |
- * If an argument is omitted, it acts as the corresponding default method from |
- * [StreamEventTransformer]. |
+ * stringStream.transform(new StreamTransformer<String, String>( |
+ * (Stream<String> input, bool cancelOnError) { |
+ * StreamController<String> controller; |
+ * StreamSubscription<String> subscription; |
+ * controller = new StreamController<String>( |
+ * onListen: () { |
+ * subscription = input.listen((data) { |
+ * // Duplicate the data. |
+ * controller.add(data); |
+ * controller.add(data); |
+ * }, |
+ * onError: controller.addError, |
+ * onDone: controller.close, |
+ * cancelOnError: cancelOnError); |
+ * }, |
+ * onPause: subscription.pause, |
+ * onResume: subscription.resume, |
+ * onCancel: subscription.cancel, |
+ * sync: true); |
+ * return controller.stream.listen(null); |
+ * }); |
+ */ |
+ const factory StreamTransformer( |
+ StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) |
+ = _StreamSubscriptionTransformer; |
+ |
+ /** |
+ * Creates a [StreamTransformer] that delegates events to the given functions. |
* |
- * Example use: |
+ * Example use of a duplicating transformer: |
* |
- * stringStream.transform(new StreamTransformer<String, String>( |
+ * stringStream.transform(new StreamTransformer<String, String>.fromHandlers( |
* handleData: (String value, EventSink<String> sink) { |
* sink.add(value); |
* sink.add(value); // Duplicate the incoming events. |
* })); |
- * |
*/ |
- factory StreamTransformer({ |
+ factory StreamTransformer.fromHandlers({ |
void handleData(S data, EventSink<T> sink), |
- Function handleError, |
- void handleDone(EventSink<T> sink)}) { |
- return new _StreamTransformerImpl<S, T>(handleData, |
- handleError, |
- handleDone); |
- } |
+ void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), |
+ void handleDone(EventSink<T> sink)}) |
+ = _StreamHandlerTransformer; |
Stream<T> bind(Stream<S> stream); |
} |
- |
-/** |
- * Base class for transformers that modifies stream events. |
- * |
- * A [StreamEventTransformer] transforms incoming Stream |
- * events of one kind into outgoing events of (possibly) another kind. |
- * |
- * Subscribing on the stream returned by [bind] is the same as subscribing on |
- * the source stream, except that events are passed through the [transformer] |
- * before being emitted. The transformer may generate any number and |
- * types of events for each incoming event. Pauses on the returned |
- * subscription are forwarded to this stream. |
- * |
- * An example that duplicates all data events: |
- * |
- * class DoubleTransformer<T> extends StreamEventTransformer<T, T> { |
- * void handleData(T data, EventSink<T> sink) { |
- * sink.add(value); |
- * sink.add(value); |
- * } |
- * } |
- * someTypeStream.transform(new DoubleTransformer<Type>()); |
- * |
- * The default implementations of the "handle" methods forward |
- * the events unmodified. If using the default [handleData] the generic type [T] |
- * needs to be assignable to [S]. |
- */ |
-abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> { |
- const StreamEventTransformer(); |
- |
- Stream<T> bind(Stream<S> source) { |
- return new EventTransformStream<S, T>(source, this); |
- } |
- |
- /** |
- * Act on incoming data event. |
- * |
- * The method may generate any number of events on the sink, but should |
- * not throw. |
- */ |
- void handleData(S event, EventSink<T> sink) { |
- var data = event; |
- sink.add(data); |
- } |
- |
- /** |
- * Act on incoming error event. |
- * |
- * The method may generate any number of events on the sink, but should |
- * not throw. |
- */ |
- void handleError(error, EventSink<T> sink) { |
- sink.addError(error); |
- } |
- |
- /** |
- * Act on incoming done event. |
- * |
- * The method may generate any number of events on the sink, but should |
- * not throw. |
- */ |
- void handleDone(EventSink<T> sink){ |
- sink.close(); |
- } |
-} |
- |
- |
-/** |
- * Stream that transforms another stream by intercepting and replacing events. |
- * |
- * This [Stream] is a transformation of a source stream. Listening on this |
- * stream is the same as listening on the source stream, except that events |
- * are intercepted and modified by a [StreamEventTransformer] before becoming |
- * events on this stream. |
- */ |
-class EventTransformStream<S, T> extends Stream<T> { |
- final Stream<S> _source; |
- final StreamEventTransformer _transformer; |
- EventTransformStream(Stream<S> source, |
- StreamEventTransformer<S, T> transformer) |
- : _source = source, _transformer = transformer; |
- |
- StreamSubscription<T> listen(void onData(T data), |
- { Function onError, |
- void onDone(), |
- bool cancelOnError }) { |
- if (onData == null) onData = _nullDataHandler; |
- if (onError == null) onError = _nullErrorHandler; |
- if (onDone == null) onDone = _nullDoneHandler; |
- cancelOnError = identical(true, cancelOnError); |
- return new _EventTransformStreamSubscription(_source, _transformer, |
- onData, onError, onDone, |
- cancelOnError); |
- } |
-} |
- |
-class _EventTransformStreamSubscription<S, T> |
- extends _BufferingStreamSubscription<T> { |
- /** The transformer used to transform events. */ |
- final StreamEventTransformer<S, T> _transformer; |
- |
- /** Whether this stream has sent a done event. */ |
- bool _isClosed = false; |
- |
- /** Source of incoming events. */ |
- StreamSubscription<S> _subscription; |
- |
- /** Cached EventSink wrapper for this class. */ |
- EventSink<T> _sink; |
- |
- _EventTransformStreamSubscription(Stream<S> source, |
- this._transformer, |
- void onData(T data), |
- Function onError, |
- void onDone(), |
- bool cancelOnError) |
- : super(onData, onError, onDone, cancelOnError) { |
- _sink = new _EventSinkAdapter<T>(this); |
- _subscription = source.listen(_handleData, |
- onError: _handleError, |
- onDone: _handleDone); |
- } |
- |
- /** Whether this subscription is still subscribed to its source. */ |
- bool get _isSubscribed => _subscription != null; |
- |
- void _onPause() { |
- if (_isSubscribed) _subscription.pause(); |
- } |
- |
- void _onResume() { |
- if (_isSubscribed) _subscription.resume(); |
- } |
- |
- void _onCancel() { |
- if (_isSubscribed) { |
- StreamSubscription subscription = _subscription; |
- _subscription = null; |
- subscription.cancel(); |
- } |
- _isClosed = true; |
- } |
- |
- void _handleData(S data) { |
- try { |
- _transformer.handleData(data, _sink); |
- } catch (e, s) { |
- _addError(_asyncError(e, s), s); |
- } |
- } |
- |
- void _handleError(error, [stackTrace]) { |
- try { |
- _transformer.handleError(error, _sink); |
- } catch (e, s) { |
- if (identical(e, error)) { |
- _addError(error, stackTrace); |
- } else { |
- _addError(_asyncError(e, s), s); |
- } |
- } |
- } |
- |
- void _handleDone() { |
- try { |
- _subscription = null; |
- _transformer.handleDone(_sink); |
- } catch (e, s) { |
- _addError(_asyncError(e, s), s); |
- } |
- } |
-} |
- |
-class _EventSinkAdapter<T> implements EventSink<T> { |
- _EventSink _sink; |
- _EventSinkAdapter(this._sink); |
- |
- void add(T data) { _sink._add(data); } |
- void addError(error, [StackTrace stackTrace]) { |
- _sink._addError(error, stackTrace); |
- } |
- void close() { _sink._close(); } |
-} |
- |
- |
/** |
* An [Iterable] like interface for the values of a [Stream]. |
* |