| Index: sdk/lib/async/stream.dart
|
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
|
| index 659360ffd2c094de2fc9bee69614512955e72abb..ab31d16a5dbf6837bf9d648fa6500a7a0ef1bfde 100644
|
| --- a/sdk/lib/async/stream.dart
|
| +++ b/sdk/lib/async/stream.dart
|
| @@ -147,6 +147,49 @@ abstract class Stream<T> {
|
| }
|
|
|
| /**
|
| + * Creates a stream where all events of an existing stream are piped through
|
| + * a sink-transformation.
|
| + *
|
| + * The given [mapSink] closure is invoked when the returned stream is
|
| + * listened to. All events from the [source] are added into the event sink
|
| + * that is returned from the invocation. The transformation puts all
|
| + * transformed events into the sink the [mapSink] closure received during
|
| + * its invocation. Conceptually the [mapSink] creates a transformation pipe
|
| + * with the input sink being the returned [EventSink] and the output sink
|
| + * being the sink it received.
|
| + *
|
| + * This constructor is frequently used to build transformers.
|
| + *
|
| + * Example use for a duplicating transformer:
|
| + *
|
| + * class DuplicationSink implements EventSink<String> {
|
| + * final EventSink<String> _outputSink;
|
| + * DuplicationSink(this._outputSink);
|
| + *
|
| + * void add(String data) {
|
| + * _outputSink.add(data);
|
| + * _outputSink.add(data);
|
| + * }
|
| + *
|
| + * void addError(e, [st]) => _outputSink(e, st);
|
| + * void close() => _outputSink.close();
|
| + * }
|
| + *
|
| + * class DuplicationTransformer implements StreamTransformer<String, String> {
|
| + * // Some generic types ommitted for brevety.
|
| + * Stream bind(Stream stream) => new Stream<String>.eventTransform(
|
| + * stream,
|
| + * (EventSink sink) => new DuplicationSink(sink));
|
| + * }
|
| + *
|
| + * stringStream.transform(new DuplicationTransformer());
|
| + */
|
| + factory Stream.eventTransformed(Stream source,
|
| + EventSink mapSink(EventSink<T> sink)) {
|
| + return new _BoundSinkStream(source, mapSink);
|
| + }
|
| +
|
| + /**
|
| * Reports whether this stream is a broadcast stream.
|
| */
|
| bool get isBroadcast => false;
|
| @@ -956,7 +999,7 @@ abstract class EventSink<T> {
|
| /** Create a data event */
|
| void add(T event);
|
| /** Create an async error. */
|
| - void addError(errorEvent);
|
| + void addError(errorEvent, [StackTrace stackTrace]);
|
| /** Request a stream to close. */
|
| void close();
|
| }
|
| @@ -1037,222 +1080,98 @@ abstract class StreamSink<S> implements StreamConsumer<S>, EventSink<S> {
|
| *
|
| * The [Stream.transform] call will pass itself to this object and then return
|
| * the resulting stream.
|
| + *
|
| + * It is good practice to write transformers that can be used multiple times.
|
| */
|
| abstract class StreamTransformer<S, T> {
|
| +
|
| /**
|
| - * Create a [StreamTransformer] that delegates events to the given functions.
|
| + * Creates a [StreamTransformer].
|
| + *
|
| + * The returned instance takes responsibility of implementing ([bind]).
|
| + * When the user invokes `bind` it returns a new "bound" stream. Only when
|
| + * the user starts listening to the bound stream, the `listen` method
|
| + * invokes the given closure [transformer].
|
| + *
|
| + * The [transformer] closure receives the stream, that was bound, as argument
|
| + * and returns a [StreamSubscription]. In almost all cases the closure
|
| + * listens itself to the stream that is given as argument.
|
| + *
|
| + * The result of invoking the [transformer] closure is a [StreamSubscription].
|
| + * The bound stream-transformer (created by the `bind` method above) then sets
|
| + * the handlers it received as part of the `listen` call.
|
| + *
|
| + * Conceptually this can be summarized as follows:
|
| + *
|
| + * 1. `var transformer = new StreamTransformer(transformerClosure);`
|
| + * creates a `StreamTransformer` that supports the `bind` method.
|
| + * 2. `var boundStream = stream.transform(transformer);` binds the `stream`
|
| + * and returns a bound stream that has a pointer to `stream`.
|
| + * 3. `boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b)`
|
| + * starts the listening and transformation. This is accomplished
|
| + * in 2 steps: first the `boundStream` invokes the `transformerClosure` with
|
| + * the `stream` it captured: `transformerClosure(stream, b)`.
|
| + * The result `subscription`, a [StreamSubscription], is then
|
| + * updated to receive its handlers: `subscription.onData(f1)`,
|
| + * `subscription.onError(f2)`, `subscription(f3)`. Finally the subscription
|
| + * is returned as result of the `listen` call.
|
| + *
|
| + * There are two common ways to create a StreamSubscription:
|
| + *
|
| + * 1. by creating a new class that implements [StreamSubscription].
|
| + * Note that the subscription should run callbacks in the [Zone] the
|
| + * stream was listened to.
|
| + * 2. by allocating a [StreamController] and to return the result of
|
| + * listening to its stream.
|
| + *
|
| + * Example use of a duplicating transformer:
|
| *
|
| - * This is actually a [StreamEventTransformer] where the event handling is
|
| - * performed by the function arguments.
|
| - * If an argument is omitted, it acts as the corresponding default method from
|
| - * [StreamEventTransformer].
|
| + * stringStream.transform(new StreamTransformer<String, String>(
|
| + * (Stream<String> input, bool cancelOnError) {
|
| + * StreamController<String> controller;
|
| + * StreamSubscription<String> subscription;
|
| + * controller = new StreamController<String>(
|
| + * onListen: () {
|
| + * subscription = input.listen((data) {
|
| + * // Duplicate the data.
|
| + * controller.add(data);
|
| + * controller.add(data);
|
| + * },
|
| + * onError: controller.addError,
|
| + * onDone: controller.close,
|
| + * cancelOnError: cancelOnError);
|
| + * },
|
| + * onPause: subscription.pause,
|
| + * onResume: subscription.resume,
|
| + * onCancel: subscription.cancel,
|
| + * sync: true);
|
| + * return controller.stream.listen(null);
|
| + * });
|
| + */
|
| + const factory StreamTransformer(
|
| + StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError))
|
| + = _StreamSubscriptionTransformer;
|
| +
|
| + /**
|
| + * Creates a [StreamTransformer] that delegates events to the given functions.
|
| *
|
| - * Example use:
|
| + * Example use of a duplicating transformer:
|
| *
|
| - * stringStream.transform(new StreamTransformer<String, String>(
|
| + * stringStream.transform(new StreamTransformer<String, String>.fromHandlers(
|
| * handleData: (String value, EventSink<String> sink) {
|
| * sink.add(value);
|
| * sink.add(value); // Duplicate the incoming events.
|
| * }));
|
| - *
|
| */
|
| - factory StreamTransformer({
|
| + factory StreamTransformer.fromHandlers({
|
| void handleData(S data, EventSink<T> sink),
|
| - Function handleError,
|
| - void handleDone(EventSink<T> sink)}) {
|
| - return new _StreamTransformerImpl<S, T>(handleData,
|
| - handleError,
|
| - handleDone);
|
| - }
|
| + void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
|
| + void handleDone(EventSink<T> sink)})
|
| + = _StreamHandlerTransformer;
|
|
|
| Stream<T> bind(Stream<S> stream);
|
| }
|
|
|
| -
|
| -/**
|
| - * Base class for transformers that modifies stream events.
|
| - *
|
| - * A [StreamEventTransformer] transforms incoming Stream
|
| - * events of one kind into outgoing events of (possibly) another kind.
|
| - *
|
| - * Subscribing on the stream returned by [bind] is the same as subscribing on
|
| - * the source stream, except that events are passed through the [transformer]
|
| - * before being emitted. The transformer may generate any number and
|
| - * types of events for each incoming event. Pauses on the returned
|
| - * subscription are forwarded to this stream.
|
| - *
|
| - * An example that duplicates all data events:
|
| - *
|
| - * class DoubleTransformer<T> extends StreamEventTransformer<T, T> {
|
| - * void handleData(T data, EventSink<T> sink) {
|
| - * sink.add(value);
|
| - * sink.add(value);
|
| - * }
|
| - * }
|
| - * someTypeStream.transform(new DoubleTransformer<Type>());
|
| - *
|
| - * The default implementations of the "handle" methods forward
|
| - * the events unmodified. If using the default [handleData] the generic type [T]
|
| - * needs to be assignable to [S].
|
| - */
|
| -abstract class StreamEventTransformer<S, T> implements StreamTransformer<S, T> {
|
| - const StreamEventTransformer();
|
| -
|
| - Stream<T> bind(Stream<S> source) {
|
| - return new EventTransformStream<S, T>(source, this);
|
| - }
|
| -
|
| - /**
|
| - * Act on incoming data event.
|
| - *
|
| - * The method may generate any number of events on the sink, but should
|
| - * not throw.
|
| - */
|
| - void handleData(S event, EventSink<T> sink) {
|
| - var data = event;
|
| - sink.add(data);
|
| - }
|
| -
|
| - /**
|
| - * Act on incoming error event.
|
| - *
|
| - * The method may generate any number of events on the sink, but should
|
| - * not throw.
|
| - */
|
| - void handleError(error, EventSink<T> sink) {
|
| - sink.addError(error);
|
| - }
|
| -
|
| - /**
|
| - * Act on incoming done event.
|
| - *
|
| - * The method may generate any number of events on the sink, but should
|
| - * not throw.
|
| - */
|
| - void handleDone(EventSink<T> sink){
|
| - sink.close();
|
| - }
|
| -}
|
| -
|
| -
|
| -/**
|
| - * Stream that transforms another stream by intercepting and replacing events.
|
| - *
|
| - * This [Stream] is a transformation of a source stream. Listening on this
|
| - * stream is the same as listening on the source stream, except that events
|
| - * are intercepted and modified by a [StreamEventTransformer] before becoming
|
| - * events on this stream.
|
| - */
|
| -class EventTransformStream<S, T> extends Stream<T> {
|
| - final Stream<S> _source;
|
| - final StreamEventTransformer _transformer;
|
| - EventTransformStream(Stream<S> source,
|
| - StreamEventTransformer<S, T> transformer)
|
| - : _source = source, _transformer = transformer;
|
| -
|
| - StreamSubscription<T> listen(void onData(T data),
|
| - { Function onError,
|
| - void onDone(),
|
| - bool cancelOnError }) {
|
| - if (onData == null) onData = _nullDataHandler;
|
| - if (onError == null) onError = _nullErrorHandler;
|
| - if (onDone == null) onDone = _nullDoneHandler;
|
| - cancelOnError = identical(true, cancelOnError);
|
| - return new _EventTransformStreamSubscription(_source, _transformer,
|
| - onData, onError, onDone,
|
| - cancelOnError);
|
| - }
|
| -}
|
| -
|
| -class _EventTransformStreamSubscription<S, T>
|
| - extends _BufferingStreamSubscription<T> {
|
| - /** The transformer used to transform events. */
|
| - final StreamEventTransformer<S, T> _transformer;
|
| -
|
| - /** Whether this stream has sent a done event. */
|
| - bool _isClosed = false;
|
| -
|
| - /** Source of incoming events. */
|
| - StreamSubscription<S> _subscription;
|
| -
|
| - /** Cached EventSink wrapper for this class. */
|
| - EventSink<T> _sink;
|
| -
|
| - _EventTransformStreamSubscription(Stream<S> source,
|
| - this._transformer,
|
| - void onData(T data),
|
| - Function onError,
|
| - void onDone(),
|
| - bool cancelOnError)
|
| - : super(onData, onError, onDone, cancelOnError) {
|
| - _sink = new _EventSinkAdapter<T>(this);
|
| - _subscription = source.listen(_handleData,
|
| - onError: _handleError,
|
| - onDone: _handleDone);
|
| - }
|
| -
|
| - /** Whether this subscription is still subscribed to its source. */
|
| - bool get _isSubscribed => _subscription != null;
|
| -
|
| - void _onPause() {
|
| - if (_isSubscribed) _subscription.pause();
|
| - }
|
| -
|
| - void _onResume() {
|
| - if (_isSubscribed) _subscription.resume();
|
| - }
|
| -
|
| - void _onCancel() {
|
| - if (_isSubscribed) {
|
| - StreamSubscription subscription = _subscription;
|
| - _subscription = null;
|
| - subscription.cancel();
|
| - }
|
| - _isClosed = true;
|
| - }
|
| -
|
| - void _handleData(S data) {
|
| - try {
|
| - _transformer.handleData(data, _sink);
|
| - } catch (e, s) {
|
| - _addError(_asyncError(e, s), s);
|
| - }
|
| - }
|
| -
|
| - void _handleError(error, [stackTrace]) {
|
| - try {
|
| - _transformer.handleError(error, _sink);
|
| - } catch (e, s) {
|
| - if (identical(e, error)) {
|
| - _addError(error, stackTrace);
|
| - } else {
|
| - _addError(_asyncError(e, s), s);
|
| - }
|
| - }
|
| - }
|
| -
|
| - void _handleDone() {
|
| - try {
|
| - _subscription = null;
|
| - _transformer.handleDone(_sink);
|
| - } catch (e, s) {
|
| - _addError(_asyncError(e, s), s);
|
| - }
|
| - }
|
| -}
|
| -
|
| -class _EventSinkAdapter<T> implements EventSink<T> {
|
| - _EventSink _sink;
|
| - _EventSinkAdapter(this._sink);
|
| -
|
| - void add(T data) { _sink._add(data); }
|
| - void addError(error, [StackTrace stackTrace]) {
|
| - _sink._addError(error, stackTrace);
|
| - }
|
| - void close() { _sink._close(); }
|
| -}
|
| -
|
| -
|
| /**
|
| * An [Iterable] like interface for the values of a [Stream].
|
| *
|
|
|