| Index: sdk/lib/async/stream.dart
|
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
|
| index 27c0ff78371de2cbc433f1dccbd837fac9a9f155..74407d76f00c86b224065aaed3571aff02330aad 100644
|
| --- a/sdk/lib/async/stream.dart
|
| +++ b/sdk/lib/async/stream.dart
|
| @@ -1605,79 +1605,92 @@ abstract class StreamSink<S> implements EventSink<S>, StreamConsumer<S> {
|
| }
|
|
|
| /**
|
| - * The target of a [Stream.transform] call.
|
| + * Transforms a Stream.
|
| *
|
| - * The [Stream.transform] call will pass itself to this object and then return
|
| - * the resulting stream.
|
| + * When a stream's [Stream.transform] method is invoked with a
|
| + * [StreamTransformer], the stream calls the [bind] method on the provided
|
| + * transformer. The resulting stream is then returned from the
|
| + * [Stream.transform] method.
|
| + *
|
| + * Conceptually, a transformer is simply a function from [Stream] to [Stream]
|
| + * that is encapsulated into a class.
|
| *
|
| * It is good practice to write transformers that can be used multiple times.
|
| + *
|
| + * All other transforming methods on [Stream], such as [Stream.map],
|
| + * [Stream.where] or [Stream.expand] can be implemented using
|
| + * [Stream.transform]. A [StreamTransformer] is thus very powerful but often
|
| + * also a bit more complicated to use.
|
| */
|
| abstract class StreamTransformer<S, T> {
|
| /**
|
| - * Creates a [StreamTransformer].
|
| - *
|
| - * The returned instance takes responsibility of implementing ([bind]).
|
| - * When the user invokes `bind` it returns a new "bound" stream. Only when
|
| - * the user starts listening to the bound stream, the `listen` method
|
| - * invokes the given closure [transformer].
|
| - *
|
| - * The [transformer] closure receives the stream, that was bound, as argument
|
| - * and returns a [StreamSubscription]. In almost all cases the closure
|
| - * listens itself to the stream that is given as argument.
|
| - *
|
| - * The result of invoking the [transformer] closure is a [StreamSubscription].
|
| - * The bound stream-transformer (created by the `bind` method above) then sets
|
| - * the handlers it received as part of the `listen` call.
|
| - *
|
| - * Conceptually this can be summarized as follows:
|
| - *
|
| - * 1. `var transformer = new StreamTransformer(transformerClosure);`
|
| - * creates a `StreamTransformer` that supports the `bind` method.
|
| - * 2. `var boundStream = stream.transform(transformer);` binds the `stream`
|
| - * and returns a bound stream that has a pointer to `stream`.
|
| - * 3. `boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b)`
|
| - * starts the listening and transformation. This is accomplished
|
| - * in 2 steps: first the `boundStream` invokes the `transformerClosure` with
|
| - * the `stream` it captured: `transformerClosure(stream, b)`.
|
| - * The result `subscription`, a [StreamSubscription], is then
|
| - * updated to receive its handlers: `subscription.onData(f1)`,
|
| - * `subscription.onError(f2)`, `subscription(f3)`. Finally the subscription
|
| - * is returned as result of the `listen` call.
|
| + * Creates a [StreamTransformer] based on the given [onListen] callback.
|
| *
|
| - * There are two common ways to create a StreamSubscription:
|
| + * The returned stream transformer uses the provided [onListen] callback
|
| + * when a transformed stream is listened to. At that time, the callback
|
| + * receives the input stream (the one passed to [bind]) and a
|
| + * boolean flag `cancelOnError` to create a [StreamSubscription].
|
| *
|
| - * 1. by creating a new class that implements [StreamSubscription].
|
| - * Note that the subscription should run callbacks in the [Zone] the
|
| - * stream was listened to.
|
| - * 2. by allocating a [StreamController] and to return the result of
|
| - * listening to its stream.
|
| + * The [onListen] callback does *not* receive the handlers that were passed
|
| + * to [Stream.listen]. These are automatically set after the call to the
|
| + * [onListen] callback (using [StreamSubscription.onData],
|
| + * [StreamSubscription.onError] and [StreamSubscription.onDone]).
|
| *
|
| - * Example use of a duplicating transformer:
|
| + * Most commonly, an [onListen] callback will first call [Stream.listen] on
|
| + * the provided stream (with the corresponding `cancelOnError` flag), and then
|
| + * return a new [StreamSubscription].
|
| *
|
| - * stringStream.transform(new StreamTransformer<String, String>(
|
| - * (Stream<String> input, bool cancelOnError) {
|
| - * StreamController<String> controller;
|
| - * StreamSubscription<String> subscription;
|
| - * controller = new StreamController<String>(
|
| - * onListen: () {
|
| - * subscription = input.listen((data) {
|
| - * // Duplicate the data.
|
| - * controller.add(data);
|
| - * controller.add(data);
|
| - * },
|
| - * onError: controller.addError,
|
| - * onDone: controller.close,
|
| - * cancelOnError: cancelOnError);
|
| - * },
|
| - * onPause: () { subscription.pause(); },
|
| - * onResume: () { subscription.resume(); },
|
| - * onCancel: () => subscription.cancel(),
|
| - * sync: true);
|
| - * return controller.stream.listen(null);
|
| - * });
|
| + * There are two common ways to create a StreamSubscription:
|
| + *
|
| + * 1. by allocating a [StreamController] and to return the result of
|
| + * listening to its stream. It's important to forward pause, resume and
|
| + * cancel events (unless the transformer intentionally wants to change
|
| + * this behavior).
|
| + * 2. by creating a new class that implements [StreamSubscription].
|
| + * Note that the subscription should run callbacks in the [Zone] the
|
| + * stream was listened to (see [Zone] and [Zone.bindCallback]).
|
| + *
|
| + * Example:
|
| + *
|
| + * ```
|
| + * /// Starts listening to [input] and duplicates all non-error events.
|
| + * StreamSubscription<int> _onListen(Stream<int> input, bool cancelOnError) {
|
| + * StreamSubscription<String> subscription;
|
| + * // Create controller that forwards pause, resume and cancel events.
|
| + * var controller = new StreamController<String>(
|
| + * onPause: () {
|
| + * subscription.pause();
|
| + * },
|
| + * onResume: () {
|
| + * subscription.resume();
|
| + * },
|
| + * onCancel: () => subscription.cancel(),
|
| + * sync: true); // "sync" is correct here, since events are forwarded.
|
| + *
|
| + * // Listen to the provided stream using `cancelOnError`.
|
| + * subscription = input.listen((data) {
|
| + * // Duplicate the data.
|
| + * controller.add(data);
|
| + * controller.add(data);
|
| + * },
|
| + * onError: controller.addError,
|
| + * onDone: controller.close,
|
| + * cancelOnError: cancelOnError);
|
| + *
|
| + * // Return a new [StreamSubscription] by listening to the controller's
|
| + * // stream.
|
| + * return controller.stream.listen(null);
|
| + * }
|
| + *
|
| + * // Instantiate a transformer:
|
| + * var duplicator = const StreamTransformer<int, int>(_onListen);
|
| + *
|
| + * // Use as follows:
|
| + * intStream.transform(duplicator);
|
| + * ```
|
| */
|
| const factory StreamTransformer(
|
| - StreamSubscription<T> transformer(
|
| + StreamSubscription<T> onListen(
|
| Stream<S> stream, bool cancelOnError)) =
|
| _StreamSubscriptionTransformer<S, T>;
|
|
|
| @@ -1698,14 +1711,25 @@ abstract class StreamTransformer<S, T> {
|
| void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>;
|
|
|
| /**
|
| - * Transform the incoming [stream]'s events.
|
| + * Transforms the provided [stream].
|
| + *
|
| + * Returns a new stream with events that are computed from events of the
|
| + * provided [stream].
|
| + *
|
| + * Implementors of the [StreamTransformer] interface should document
|
| + * differences from the following expected behavior:
|
| *
|
| - * Creates a new stream.
|
| - * When this stream is listened to, it will start listening on [stream],
|
| - * and generate events on the new stream based on the events from [stream].
|
| + * * When the returned stream is listened to, it starts listening to the
|
| + * input [stream].
|
| + * * Subscriptions of the returned stream forward (in a reasonable time)
|
| + * a [StreamSubscription.pause] call to the subscription of the input
|
| + * [stream].
|
| + * * Similarly, canceling a subscription of the returned stream eventually
|
| + * (in reasonable time) cancels the subscription of the input [stream].
|
| *
|
| - * Subscriptions on the returned stream should propagate pause state
|
| - * to the subscription on [stream].
|
| + * "Reasonable time" depends on the transformer and stream. Some transformers,
|
| + * like a "timeout" transformer, might make these operations depend on a
|
| + * duration. Others might not delay them at all, or just by a microtask.
|
| */
|
| Stream<T> bind(Stream<S> stream);
|
| }
|
|
|