Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index 27c0ff78371de2cbc433f1dccbd837fac9a9f155..74407d76f00c86b224065aaed3571aff02330aad 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -1605,79 +1605,92 @@ abstract class StreamSink<S> implements EventSink<S>, StreamConsumer<S> { |
} |
/** |
- * The target of a [Stream.transform] call. |
+ * Transforms a Stream. |
* |
- * The [Stream.transform] call will pass itself to this object and then return |
- * the resulting stream. |
+ * When a stream's [Stream.transform] method is invoked with a |
+ * [StreamTransformer], the stream calls the [bind] method on the provided |
+ * transformer. The resulting stream is then returned from the |
+ * [Stream.transform] method. |
+ * |
+ * Conceptually, a transformer is simply a function from [Stream] to [Stream] |
+ * that is encapsulated into a class. |
* |
* It is good practice to write transformers that can be used multiple times. |
+ * |
+ * All other transforming methods on [Stream], such as [Stream.map], |
+ * [Stream.where] or [Stream.expand] can be implemented using |
+ * [Stream.transform]. A [StreamTransformer] is thus very powerful but often |
+ * also a bit more complicated to use. |
*/ |
abstract class StreamTransformer<S, T> { |
/** |
- * Creates a [StreamTransformer]. |
- * |
- * The returned instance takes responsibility of implementing ([bind]). |
- * When the user invokes `bind` it returns a new "bound" stream. Only when |
- * the user starts listening to the bound stream, the `listen` method |
- * invokes the given closure [transformer]. |
- * |
- * The [transformer] closure receives the stream, that was bound, as argument |
- * and returns a [StreamSubscription]. In almost all cases the closure |
- * listens itself to the stream that is given as argument. |
- * |
- * The result of invoking the [transformer] closure is a [StreamSubscription]. |
- * The bound stream-transformer (created by the `bind` method above) then sets |
- * the handlers it received as part of the `listen` call. |
- * |
- * Conceptually this can be summarized as follows: |
- * |
- * 1. `var transformer = new StreamTransformer(transformerClosure);` |
- * creates a `StreamTransformer` that supports the `bind` method. |
- * 2. `var boundStream = stream.transform(transformer);` binds the `stream` |
- * and returns a bound stream that has a pointer to `stream`. |
- * 3. `boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b)` |
- * starts the listening and transformation. This is accomplished |
- * in 2 steps: first the `boundStream` invokes the `transformerClosure` with |
- * the `stream` it captured: `transformerClosure(stream, b)`. |
- * The result `subscription`, a [StreamSubscription], is then |
- * updated to receive its handlers: `subscription.onData(f1)`, |
- * `subscription.onError(f2)`, `subscription(f3)`. Finally the subscription |
- * is returned as result of the `listen` call. |
+ * Creates a [StreamTransformer] based on the given [onListen] callback. |
* |
- * There are two common ways to create a StreamSubscription: |
+ * The returned stream transformer uses the provided [onListen] callback |
+ * when a transformed stream is listened to. At that time, the callback |
+ * receives the input stream (the one passed to [bind]) and a |
+ * boolean flag `cancelOnError` to create a [StreamSubscription]. |
* |
- * 1. by creating a new class that implements [StreamSubscription]. |
- * Note that the subscription should run callbacks in the [Zone] the |
- * stream was listened to. |
- * 2. by allocating a [StreamController] and to return the result of |
- * listening to its stream. |
+ * The [onListen] callback does *not* receive the handlers that were passed |
+ * to [Stream.listen]. These are automatically set after the call to the |
+ * [onListen] callback (using [StreamSubscription.onData], |
+ * [StreamSubscription.onError] and [StreamSubscription.onDone]). |
* |
- * Example use of a duplicating transformer: |
+ * Most commonly, an [onListen] callback will first call [Stream.listen] on |
+ * the provided stream (with the corresponding `cancelOnError` flag), and then |
+ * return a new [StreamSubscription]. |
* |
- * stringStream.transform(new StreamTransformer<String, String>( |
- * (Stream<String> input, bool cancelOnError) { |
- * StreamController<String> controller; |
- * StreamSubscription<String> subscription; |
- * controller = new StreamController<String>( |
- * onListen: () { |
- * subscription = input.listen((data) { |
- * // Duplicate the data. |
- * controller.add(data); |
- * controller.add(data); |
- * }, |
- * onError: controller.addError, |
- * onDone: controller.close, |
- * cancelOnError: cancelOnError); |
- * }, |
- * onPause: () { subscription.pause(); }, |
- * onResume: () { subscription.resume(); }, |
- * onCancel: () => subscription.cancel(), |
- * sync: true); |
- * return controller.stream.listen(null); |
- * }); |
+ * There are two common ways to create a StreamSubscription: |
+ * |
+ * 1. by allocating a [StreamController] and to return the result of |
+ * listening to its stream. It's important to forward pause, resume and |
+ * cancel events (unless the transformer intentionally wants to change |
+ * this behavior). |
+ * 2. by creating a new class that implements [StreamSubscription]. |
+ * Note that the subscription should run callbacks in the [Zone] the |
+ * stream was listened to (see [Zone] and [Zone.bindCallback]). |
+ * |
+ * Example: |
+ * |
+ * ``` |
+ * /// Starts listening to [input] and duplicates all non-error events. |
+ * StreamSubscription<int> _onListen(Stream<int> input, bool cancelOnError) { |
+ * StreamSubscription<String> subscription; |
+ * // Create controller that forwards pause, resume and cancel events. |
+ * var controller = new StreamController<String>( |
+ * onPause: () { |
+ * subscription.pause(); |
+ * }, |
+ * onResume: () { |
+ * subscription.resume(); |
+ * }, |
+ * onCancel: () => subscription.cancel(), |
+ * sync: true); // "sync" is correct here, since events are forwarded. |
+ * |
+ * // Listen to the provided stream using `cancelOnError`. |
+ * subscription = input.listen((data) { |
+ * // Duplicate the data. |
+ * controller.add(data); |
+ * controller.add(data); |
+ * }, |
+ * onError: controller.addError, |
+ * onDone: controller.close, |
+ * cancelOnError: cancelOnError); |
+ * |
+ * // Return a new [StreamSubscription] by listening to the controller's |
+ * // stream. |
+ * return controller.stream.listen(null); |
+ * } |
+ * |
+ * // Instantiate a transformer: |
+ * var duplicator = const StreamTransformer<int, int>(_onListen); |
+ * |
+ * // Use as follows: |
+ * intStream.transform(duplicator); |
+ * ``` |
*/ |
const factory StreamTransformer( |
- StreamSubscription<T> transformer( |
+ StreamSubscription<T> onListen( |
Stream<S> stream, bool cancelOnError)) = |
_StreamSubscriptionTransformer<S, T>; |
@@ -1698,14 +1711,25 @@ abstract class StreamTransformer<S, T> { |
void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>; |
/** |
- * Transform the incoming [stream]'s events. |
+ * Transforms the provided [stream]. |
+ * |
+ * Returns a new stream with events that are computed from events of the |
+ * provided [stream]. |
+ * |
+ * Implementors of the [StreamTransformer] interface should document |
+ * differences from the following expected behavior: |
* |
- * Creates a new stream. |
- * When this stream is listened to, it will start listening on [stream], |
- * and generate events on the new stream based on the events from [stream]. |
+ * * When the returned stream is listened to, it starts listening to the |
+ * input [stream]. |
+ * * Subscriptions of the returned stream forward (in a reasonable time) |
+ * a [StreamSubscription.pause] call to the subscription of the input |
+ * [stream]. |
+ * * Similarly, canceling a subscription of the returned stream eventually |
+ * (in reasonable time) cancels the subscription of the input [stream]. |
* |
- * Subscriptions on the returned stream should propagate pause state |
- * to the subscription on [stream]. |
+ * "Reasonable time" depends on the transformer and stream. Some transformers, |
+ * like a "timeout" transformer, might make these operations depend on a |
+ * duration. Others might not delay them at all, or just by a microtask. |
*/ |
Stream<T> bind(Stream<S> stream); |
} |