Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 1587 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1598 * Otherwise, the returned future will complete when either: | 1598 * Otherwise, the returned future will complete when either: |
| 1599 * | 1599 * |
| 1600 * * all events have been processed and the sink has been closed, or | 1600 * * all events have been processed and the sink has been closed, or |
| 1601 * * the sink has otherwise been stopped from handling more events | 1601 * * the sink has otherwise been stopped from handling more events |
| 1602 * (for example by cancelling a stream subscription). | 1602 * (for example by cancelling a stream subscription). |
| 1603 */ | 1603 */ |
| 1604 Future get done; | 1604 Future get done; |
| 1605 } | 1605 } |
| 1606 | 1606 |
| 1607 /** | 1607 /** |
| 1608 * The target of a [Stream.transform] call. | 1608 * Transforms a Stream. |
| 1609 * | 1609 * |
| 1610 * The [Stream.transform] call will pass itself to this object and then return | 1610 * When a stream's [Stream.transform] method is invoked with a |
| 1611 * the resulting stream. | 1611 * [StreamTransformer], the stream calls the [bind] method on the provided |
| 1612 * transformer. The resulting stream is then returned from the | |
| 1613 * [Stream.transform] method. | |
| 1614 * | |
| 1615 * Conceptually, a transformer is simply a function from [Stream] to [Stream] | |
| 1616 * that is encapsulated into a class. | |
| 1612 * | 1617 * |
| 1613 * It is good practice to write transformers that can be used multiple times. | 1618 * It is good practice to write transformers that can be used multiple times. |
| 1619 * | |
| 1620 * All other transforming methods on [Stream], such as [Stream.map], | |
| 1621 * [Stream.where] or [Stream.expand] can be implemented using | |
| 1622 * [Stream.transform]. A [StreamTransformer] is thus very powerful but often | |
| 1623 * also a bit more complicated to use. | |
| 1614 */ | 1624 */ |
| 1615 abstract class StreamTransformer<S, T> { | 1625 abstract class StreamTransformer<S, T> { |
| 1616 /** | 1626 /** |
| 1617 * Creates a [StreamTransformer]. | 1627 * Creates a [StreamTransformer] based on the given [onListen] callback. |
| 1618 * | 1628 * |
| 1619 * The returned instance takes responsibility of implementing ([bind]). | 1629 * The returned stream transformer uses the provided [onListen] callback |
| 1620 * When the user invokes `bind` it returns a new "bound" stream. Only when | 1630 * when a transformed stream is listened to. At that time, the callback |
| 1621 * the user starts listening to the bound stream, the `listen` method | 1631 * receives the input stream (the one passed to [bind]) and a |
| 1622 * invokes the given closure [transformer]. | 1632 * boolean flag `cancelOnError` to create a [StreamSubscription]. |
| 1623 * | 1633 * |
| 1624 * The [transformer] closure receives the stream, that was bound, as argument | 1634 * The [onListen] callback does *not* receive the handlers that were passed |
| 1625 * and returns a [StreamSubscription]. In almost all cases the closure | 1635 * to [Stream.listen]. These are automatically set after the call to the |
| 1626 * listens itself to the stream that is given as argument. | 1636 * [onListen] callback (using [StreamSubscription.onData], |
| 1637 * [StreamSubscription.onError] and [StreamSubscription.onDone]). | |
| 1627 * | 1638 * |
| 1628 * The result of invoking the [transformer] closure is a [StreamSubscription]. | 1639 * Most commonly, an [onListen] callback will first call [Stream.listen] on |
| 1629 * The bound stream-transformer (created by the `bind` method above) then sets | 1640 * the provided stream (with the corresponding `cancelOnError` flag), and then |
| 1630 * the handlers it received as part of the `listen` call. | 1641 * return a new [StreamSubscription]. |
| 1631 * | |
| 1632 * Conceptually this can be summarized as follows: | |
| 1633 * | |
| 1634 * 1. `var transformer = new StreamTransformer(transformerClosure);` | |
| 1635 * creates a `StreamTransformer` that supports the `bind` method. | |
| 1636 * 2. `var boundStream = stream.transform(transformer);` binds the `stream` | |
| 1637 * and returns a bound stream that has a pointer to `stream`. | |
| 1638 * 3. `boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b)` | |
| 1639 * starts the listening and transformation. This is accomplished | |
| 1640 * in 2 steps: first the `boundStream` invokes the `transformerClosure` with | |
| 1641 * the `stream` it captured: `transformerClosure(stream, b)`. | |
| 1642 * The result `subscription`, a [StreamSubscription], is then | |
| 1643 * updated to receive its handlers: `subscription.onData(f1)`, | |
| 1644 * `subscription.onError(f2)`, `subscription(f3)`. Finally the subscription | |
| 1645 * is returned as result of the `listen` call. | |
| 1646 * | 1642 * |
| 1647 * There are two common ways to create a StreamSubscription: | 1643 * There are two common ways to create a StreamSubscription: |
| 1648 * | 1644 * |
| 1649 * 1. by creating a new class that implements [StreamSubscription]. | 1645 * 1. by allocating a [StreamController] and to return the result of |
| 1646 * listening to its stream. It's important to forward pause, resume and | |
| 1647 * cancel events (unless the transformer intentionally wants to change | |
| 1648 * this behavior). | |
| 1649 * 2. by creating a new class that implements [StreamSubscription]. | |
| 1650 * Note that the subscription should run callbacks in the [Zone] the | 1650 * Note that the subscription should run callbacks in the [Zone] the |
| 1651 * stream was listened to. | 1651 * stream was listened to (see [Zone] and [Zone.bindCallback]). |
| 1652 * 2. by allocating a [StreamController] and to return the result of | |
| 1653 * listening to its stream. | |
| 1654 * | 1652 * |
| 1655 * Example use of a duplicating transformer: | 1653 * Example use of a duplicating transformer: |
|
Lasse Reichstein Nielsen
2017/05/08 07:08:55
Example use of -> Example of use to create
floitsch
2017/05/08 09:29:50
slightly changed.
done.
| |
| 1656 * | 1654 * |
| 1657 * stringStream.transform(new StreamTransformer<String, String>( | 1655 * ``` |
| 1658 * (Stream<String> input, bool cancelOnError) { | 1656 * StreamSubscription<int> _onListen(Stream<int> input, bool cancelOnError) { |
| 1659 * StreamController<String> controller; | 1657 * StreamSubscription<String> subscription; |
| 1660 * StreamSubscription<String> subscription; | 1658 * // Create controller that forwards pause, resume and cancel events. |
| 1661 * controller = new StreamController<String>( | 1659 * var controller = new StreamController<int>( |
| 1662 * onListen: () { | 1660 * onPause: () { subscription.pause(); }, |
| 1663 * subscription = input.listen((data) { | 1661 * onResume: () { subscription.resume(); }, |
| 1664 * // Duplicate the data. | 1662 * onCancel: () => subscription.cancel(), |
| 1665 * controller.add(data); | 1663 * sync: true); // "sync" is correct here, since events are forwarded. |
| 1666 * controller.add(data); | 1664 * |
| 1667 * }, | 1665 * // Listen to the provided stream using `cancelOnError`. |
| 1668 * onError: controller.addError, | 1666 * subscription = input.listen((data) { |
| 1669 * onDone: controller.close, | 1667 * // Duplicate the data. |
| 1670 * cancelOnError: cancelOnError); | 1668 * controller.add(data); |
| 1671 * }, | 1669 * controller.add(data); |
| 1672 * onPause: () { subscription.pause(); }, | 1670 * }, onError: controller.addError, |
| 1673 * onResume: () { subscription.resume(); }, | 1671 * onDone: controller.close, |
| 1674 * onCancel: () => subscription.cancel(), | 1672 * cancelOnError: cancelOnError); |
|
Lasse Reichstein Nielsen
2017/05/08 07:08:54
I find the formatting slightly confusing.
Try putt
floitsch
2017/05/08 09:29:50
Not much of a difference.
done.
| |
| 1675 * sync: true); | 1673 * |
| 1676 * return controller.stream.listen(null); | 1674 * // Return a new [StreamSubscription] by listening to the controller's |
| 1677 * }); | 1675 * // stream. |
| 1676 * return controller.stream.listen(null); | |
| 1677 * } | |
| 1678 * | |
| 1679 * // Instantiate a transformer: | |
| 1680 * var duplicator = new StreamTransformer<int, int>(_onListen); | |
|
Lasse Reichstein Nielsen
2017/05/08 07:08:54
Consider making it const, just because you can.
Lasse Reichstein Nielsen
2017/05/08 07:08:55
Consider making it const, just because you can.
floitsch
2017/05/08 09:29:50
Acknowledged.
floitsch
2017/05/08 09:29:51
Done.
| |
| 1681 * | |
| 1682 * // Use as follows: | |
| 1683 * intStream.transform(duplicator); | |
| 1684 * ``` | |
| 1678 */ | 1685 */ |
| 1679 const factory StreamTransformer( | 1686 const factory StreamTransformer( |
| 1680 StreamSubscription<T> transformer( | 1687 StreamSubscription<T> onListen( |
| 1681 Stream<S> stream, bool cancelOnError)) = | 1688 Stream<S> stream, bool cancelOnError)) = |
| 1682 _StreamSubscriptionTransformer<S, T>; | 1689 _StreamSubscriptionTransformer<S, T>; |
| 1683 | 1690 |
| 1684 /** | 1691 /** |
| 1685 * Creates a [StreamTransformer] that delegates events to the given functions. | 1692 * Creates a [StreamTransformer] that delegates events to the given functions. |
| 1686 * | 1693 * |
| 1687 * Example use of a duplicating transformer: | 1694 * Example use of a duplicating transformer: |
| 1688 * | 1695 * |
| 1689 * stringStream.transform(new StreamTransformer<String, String>.fromHandle rs( | 1696 * stringStream.transform(new StreamTransformer<String, String>.fromHandle rs( |
| 1690 * handleData: (String value, EventSink<String> sink) { | 1697 * handleData: (String value, EventSink<String> sink) { |
| 1691 * sink.add(value); | 1698 * sink.add(value); |
| 1692 * sink.add(value); // Duplicate the incoming events. | 1699 * sink.add(value); // Duplicate the incoming events. |
| 1693 * })); | 1700 * })); |
| 1694 */ | 1701 */ |
| 1695 factory StreamTransformer.fromHandlers( | 1702 factory StreamTransformer.fromHandlers( |
| 1696 {void handleData(S data, EventSink<T> sink), | 1703 {void handleData(S data, EventSink<T> sink), |
| 1697 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), | 1704 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), |
| 1698 void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>; | 1705 void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>; |
| 1699 | 1706 |
| 1700 /** | 1707 /** |
| 1701 * Transform the incoming [stream]'s events. | 1708 * Transforms the provided [stream]. |
| 1702 * | 1709 * |
| 1703 * Creates a new stream. | 1710 * Returns a replacement stream for the provided input [stream]. |
|
Lasse Reichstein Nielsen
2017/05/08 07:08:55
Returns a new stream with events that are computed
floitsch
2017/05/08 09:29:50
Done.
| |
| 1704 * When this stream is listened to, it will start listening on [stream], | |
| 1705 * and generate events on the new stream based on the events from [stream]. | |
| 1706 * | 1711 * |
| 1707 * Subscriptions on the returned stream should propagate pause state | 1712 * Implementors of the [StreamTransformer] interface should document |
| 1708 * to the subscription on [stream]. | 1713 * differences from the following expected behavior: |
| 1714 * | |
| 1715 * * When the returned stream is listened to, it starts listening to the | |
| 1716 * input [stream]. | |
| 1717 * * Subscriptions of the returned stream forward (in a reasonable time) | |
| 1718 * a [StreamSubscription.pause] call to the subscription of the input | |
| 1719 * [stream]. | |
| 1720 * * Similarly, canceling a subscription of the returned stream eventually | |
| 1721 * (in reasonable time) cancels the subscription of the input [stream]. | |
| 1722 * | |
| 1723 * "Reasonable time" depends on the transformer and stream. Some transformers, | |
| 1724 * like a "timeout" transformer, might make these operations depend on a | |
| 1725 * duration. Others might delay them not at all, or just by a microtask. | |
|
Lasse Reichstein Nielsen
2017/05/08 07:08:55
delay them not at all -> not delay them at all
floitsch
2017/05/08 09:29:50
Done.
| |
| 1709 */ | 1726 */ |
| 1710 Stream<T> bind(Stream<S> stream); | 1727 Stream<T> bind(Stream<S> stream); |
| 1711 } | 1728 } |
| 1712 | 1729 |
| 1713 /** | 1730 /** |
| 1714 * An [Iterator] like interface for the values of a [Stream]. | 1731 * An [Iterator] like interface for the values of a [Stream]. |
| 1715 * | 1732 * |
| 1716 * This wraps a [Stream] and a subscription on the stream. It listens | 1733 * This wraps a [Stream] and a subscription on the stream. It listens |
| 1717 * on the stream, and completes the future returned by [moveNext] when the | 1734 * on the stream, and completes the future returned by [moveNext] when the |
| 1718 * next value becomes available. | 1735 * next value becomes available. |
| (...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1789 } | 1806 } |
| 1790 | 1807 |
| 1791 void addError(error, [StackTrace stackTrace]) { | 1808 void addError(error, [StackTrace stackTrace]) { |
| 1792 _sink.addError(error, stackTrace); | 1809 _sink.addError(error, stackTrace); |
| 1793 } | 1810 } |
| 1794 | 1811 |
| 1795 void close() { | 1812 void close() { |
| 1796 _sink.close(); | 1813 _sink.close(); |
| 1797 } | 1814 } |
| 1798 } | 1815 } |
| OLD | NEW |