Chromium Code Reviews| Index: sdk/lib/async/stream.dart |
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
| index 27c0ff78371de2cbc433f1dccbd837fac9a9f155..921db1424ba25e95e4474b26729c9c7e23b0b710 100644 |
| --- a/sdk/lib/async/stream.dart |
| +++ b/sdk/lib/async/stream.dart |
| @@ -1605,79 +1605,86 @@ abstract class StreamSink<S> implements EventSink<S>, StreamConsumer<S> { |
| } |
| /** |
| - * The target of a [Stream.transform] call. |
| + * Transforms a Stream. |
| * |
| - * The [Stream.transform] call will pass itself to this object and then return |
| - * the resulting stream. |
| + * When a stream's [Stream.transform] method is invoked with a |
| + * [StreamTransformer], the stream calls the [bind] method on the provided |
| + * transformer. The resulting stream is then returned from the |
| + * [Stream.transform] method. |
| + * |
| + * Conceptually, a transformer is simply a function from [Stream] to [Stream] |
| + * that is encapsulated into a class. |
| * |
| * It is good practice to write transformers that can be used multiple times. |
| + * |
| + * All other transforming methods on [Stream], such as [Stream.map], |
| + * [Stream.where] or [Stream.expand] can be implemented using |
| + * [Stream.transform]. A [StreamTransformer] is thus very powerful but often |
| + * also a bit more complicated to use. |
| */ |
| abstract class StreamTransformer<S, T> { |
| /** |
| - * Creates a [StreamTransformer]. |
| - * |
| - * The returned instance takes responsibility of implementing ([bind]). |
| - * When the user invokes `bind` it returns a new "bound" stream. Only when |
| - * the user starts listening to the bound stream, the `listen` method |
| - * invokes the given closure [transformer]. |
| - * |
| - * The [transformer] closure receives the stream, that was bound, as argument |
| - * and returns a [StreamSubscription]. In almost all cases the closure |
| - * listens itself to the stream that is given as argument. |
| - * |
| - * The result of invoking the [transformer] closure is a [StreamSubscription]. |
| - * The bound stream-transformer (created by the `bind` method above) then sets |
| - * the handlers it received as part of the `listen` call. |
| - * |
| - * Conceptually this can be summarized as follows: |
| - * |
| - * 1. `var transformer = new StreamTransformer(transformerClosure);` |
| - * creates a `StreamTransformer` that supports the `bind` method. |
| - * 2. `var boundStream = stream.transform(transformer);` binds the `stream` |
| - * and returns a bound stream that has a pointer to `stream`. |
| - * 3. `boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b)` |
| - * starts the listening and transformation. This is accomplished |
| - * in 2 steps: first the `boundStream` invokes the `transformerClosure` with |
| - * the `stream` it captured: `transformerClosure(stream, b)`. |
| - * The result `subscription`, a [StreamSubscription], is then |
| - * updated to receive its handlers: `subscription.onData(f1)`, |
| - * `subscription.onError(f2)`, `subscription(f3)`. Finally the subscription |
| - * is returned as result of the `listen` call. |
| + * Creates a [StreamTransformer] based on the given [onListen] callback. |
| + * |
| + * The returned stream transformer uses the provided [onListen] callback |
| + * when a transformed stream is listened to. At that time, the callback |
| + * receives the input stream (the one passed to [bind]) and a |
| + * boolean flag `cancelOnError` to create a [StreamSubscription]. |
| + * |
| + * The [onListen] callback does *not* receive the handlers that were passed |
| + * to [Stream.listen]. These are automatically set after the call to the |
| + * [onListen] callback (using [StreamSubscription.onData], |
| + * [StreamSubscription.onError] and [StreamSubscription.onDone]). |
| + * |
| + * Most commonly, an [onListen] callback will first call [Stream.listen] on |
| + * the provided stream (with the corresponding `cancelOnError` flag), and then |
| + * return a new [StreamSubscription]. |
| * |
| * There are two common ways to create a StreamSubscription: |
| * |
| - * 1. by creating a new class that implements [StreamSubscription]. |
| + * 1. by allocating a [StreamController] and to return the result of |
| + * listening to its stream. It's important to forward pause, resume and |
| + * cancel events (unless the transformer intentionally wants to change |
| + * this behavior). |
| + * 2. by creating a new class that implements [StreamSubscription]. |
| * Note that the subscription should run callbacks in the [Zone] the |
| - * stream was listened to. |
| - * 2. by allocating a [StreamController] and to return the result of |
| - * listening to its stream. |
| + * stream was listened to (see [Zone] and [Zone.bindCallback]). |
| * |
| * Example use of a duplicating transformer: |
|
Lasse Reichstein Nielsen
2017/05/08 07:08:55
Example use of -> Example of use to create
floitsch
2017/05/08 09:29:50
slightly changed.
done.
|
| * |
| - * stringStream.transform(new StreamTransformer<String, String>( |
| - * (Stream<String> input, bool cancelOnError) { |
| - * StreamController<String> controller; |
| - * StreamSubscription<String> subscription; |
| - * controller = new StreamController<String>( |
| - * onListen: () { |
| - * subscription = input.listen((data) { |
| - * // Duplicate the data. |
| - * controller.add(data); |
| - * controller.add(data); |
| - * }, |
| - * onError: controller.addError, |
| - * onDone: controller.close, |
| - * cancelOnError: cancelOnError); |
| - * }, |
| - * onPause: () { subscription.pause(); }, |
| - * onResume: () { subscription.resume(); }, |
| - * onCancel: () => subscription.cancel(), |
| - * sync: true); |
| - * return controller.stream.listen(null); |
| - * }); |
| + * ``` |
| + * StreamSubscription<int> _onListen(Stream<int> input, bool cancelOnError) { |
| + * StreamSubscription<String> subscription; |
| + * // Create controller that forwards pause, resume and cancel events. |
| + * var controller = new StreamController<int>( |
| + * onPause: () { subscription.pause(); }, |
| + * onResume: () { subscription.resume(); }, |
| + * onCancel: () => subscription.cancel(), |
| + * sync: true); // "sync" is correct here, since events are forwarded. |
| + * |
| + * // Listen to the provided stream using `cancelOnError`. |
| + * subscription = input.listen((data) { |
| + * // Duplicate the data. |
| + * controller.add(data); |
| + * controller.add(data); |
| + * }, onError: controller.addError, |
| + * onDone: controller.close, |
| + * cancelOnError: cancelOnError); |
|
Lasse Reichstein Nielsen
2017/05/08 07:08:54
I find the formatting slightly confusing.
Try putt
floitsch
2017/05/08 09:29:50
Not much of a difference.
done.
|
| + * |
| + * // Return a new [StreamSubscription] by listening to the controller's |
| + * // stream. |
| + * return controller.stream.listen(null); |
| + * } |
| + * |
| + * // Instantiate a transformer: |
| + * var duplicator = new StreamTransformer<int, int>(_onListen); |
|
Lasse Reichstein Nielsen
2017/05/08 07:08:54
Consider making it const, just because you can.
Lasse Reichstein Nielsen
2017/05/08 07:08:55
Consider making it const, just because you can.
floitsch
2017/05/08 09:29:50
Acknowledged.
floitsch
2017/05/08 09:29:51
Done.
|
| + * |
| + * // Use as follows: |
| + * intStream.transform(duplicator); |
| + * ``` |
| */ |
| const factory StreamTransformer( |
| - StreamSubscription<T> transformer( |
| + StreamSubscription<T> onListen( |
| Stream<S> stream, bool cancelOnError)) = |
| _StreamSubscriptionTransformer<S, T>; |
| @@ -1698,14 +1705,24 @@ abstract class StreamTransformer<S, T> { |
| void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>; |
| /** |
| - * Transform the incoming [stream]'s events. |
| + * Transforms the provided [stream]. |
| + * |
| + * Returns a replacement stream for the provided input [stream]. |
|
Lasse Reichstein Nielsen
2017/05/08 07:08:55
Returns a new stream with events that are computed
floitsch
2017/05/08 09:29:50
Done.
|
| + * |
| + * Implementors of the [StreamTransformer] interface should document |
| + * differences from the following expected behavior: |
| * |
| - * Creates a new stream. |
| - * When this stream is listened to, it will start listening on [stream], |
| - * and generate events on the new stream based on the events from [stream]. |
| + * * When the returned stream is listened to, it starts listening to the |
| + * input [stream]. |
| + * * Subscriptions of the returned stream forward (in a reasonable time) |
| + * a [StreamSubscription.pause] call to the subscription of the input |
| + * [stream]. |
| + * * Similarly, canceling a subscription of the returned stream eventually |
| + * (in reasonable time) cancels the subscription of the input [stream]. |
| * |
| - * Subscriptions on the returned stream should propagate pause state |
| - * to the subscription on [stream]. |
| + * "Reasonable time" depends on the transformer and stream. Some transformers, |
| + * like a "timeout" transformer, might make these operations depend on a |
| + * duration. Others might delay them not at all, or just by a microtask. |
|
Lasse Reichstein Nielsen
2017/05/08 07:08:55
delay them not at all -> not delay them at all
floitsch
2017/05/08 09:29:50
Done.
|
| */ |
| Stream<T> bind(Stream<S> stream); |
| } |